query/promql/planner.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::function::FunctionContext;
use common_query::prelude::greptime_value;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::{DFSchema, NullEquality};
use datafusion_expr::expr::WindowFunctionParams;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{ExprSchemable, Literal, SortExpr, col, lit};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use once_cell::sync::Lazy;
use promql::extension_plan::{
    Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
    ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
};
use promql::functions::{
    AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
    Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
    quantile_udaf,
};
use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
    Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
    Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
    VectorSelector, token,
};
use regex::{self, Regex};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
    InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
    MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
    Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
    UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
    UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
    ZeroRangeSelectorSnafu,
};
use crate::query_engine::QueryEngineState;

/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `absent()` function in PromQL.
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// `histogram_quantile` function in PromQL.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector` function in PromQL.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// `le` column for conventional histogram.
const LE_COLUMN_NAME: &str = "le";

/// Static regex for validating label names according to Prometheus specification.
/// Label names must match the regex: [a-zA-Z_][a-zA-Z0-9_]*
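/// For example (illustrative): `http_requests_total` and `_internal` are valid,
/// while `2xx_rate` and `http-requests` are not.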
static LABEL_NAME_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());

const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Default value column name for empty metric.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special modifier to project field columns under multi-field mode.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special modifier for cross-schema query.
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

/// Threshold for scatter scan mode.
const MAX_SCATTER_POINTS: i64 = 400;

/// One-hour interval in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;

#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // query parameters
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // planner states
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    /// The matcher for field columns `__field__`.
    field_column_matcher: Option<Vec<Matcher>>,
    /// The matchers for selectors (normal matchers).
    selector_matcher: Vec<Matcher>,
    schema_name: Option<String>,
    /// The range in milliseconds of the range selector. None if there is no range selector.
    range: Option<Millisecond>,
}

impl PromPlannerContext {
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }

    /// Reset all planner states
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.field_column_matcher = None;
        self.selector_matcher.clear();
        self.schema_name = None;
        self.range = None;
    }

    /// Reset table name and schema to empty
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
    }

    /// Check if `le` is present in tag columns
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}

pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}

impl PromPlanner {
    pub async fn stmt_to_plan_with_alias(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        alias: Option<String>,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        let plan = planner
            .prom_expr_to_plan(&stmt.expr, query_engine_state)
            .await?;

        // Apply alias if provided
        if let Some(alias_name) = alias {
            planner.apply_alias_projection(plan, alias_name)
        } else {
            Ok(plan)
        }
    }

    #[cfg(test)]
    async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        Self::stmt_to_plan_with_alias(table_provider, stmt, None, query_engine_state).await
    }

    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
            .await
    }

    /**
    Converts a PromQL expression to a logical plan.

    NOTE:
        The `timestamp_fn` indicates whether the PromQL `timestamp()` function is being evaluated in the current context.
        If `true`, the planner generates a logical plan that projects the timestamp (time index) column
        as the value column for each input row, implementing the PromQL `timestamp()` function semantics.
        If `false`, the planner generates the standard logical plan for the given PromQL expression.
    */
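    // Illustrative example: for `timestamp(up)`, `timestamp_fn` is true while
    // planning the inner `up` selector, so its plan projects the time index as
    // the value column; for a plain `up` query it stays false.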
    #[async_recursion]
    async fn prom_expr_to_plan_inner(
        &mut self,
        prom_expr: &PromExpr,
        timestamp_fn: bool,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => {
                self.prom_aggr_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Unary(expr) => {
                self.prom_unary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Binary(expr) => {
                self.prom_binary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
                    .await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector, timestamp_fn)
                    .await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => {
                self.prom_call_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Extension(expr) => {
                self.prom_ext_expr_to_plan(query_engine_state, expr).await?
            }
        };

        Ok(res)
    }

    async fn prom_subquery_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
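        // Illustrative: with `range = 5m` and `step = 1m`, the inner expression
        // is evaluated from `start - 5m + 1m`, so the first output point at
        // `start` still has a full 5-minute window of inner evaluation points.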
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_aggr_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // calculate columns to group by
                // Need to append time index column into group by columns
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                // convert op and value columns to aggregate exprs
                let (aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // create plan
                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` must be grouped by fields,
                    // and project the fields to the new label.
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }

    /// Create logical plan for PromQL topk and bottomk expr.
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // convert op and value columns to window exprs.
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Create ranks filter with `Operator::Or`.
        // Safety: at least one rank column
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();
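        // Illustrative: for `topk(3, m)` with two field columns this conceptually
        // yields `rank_col_1 <= 3 OR rank_col_2 <= 3`.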

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        // Order by ranks
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    async fn prom_unary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        // A unary expr in PromQL implies the `-` operator
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }

    async fn prom_binary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // If set to true, comparison operators return 0/1 (for true/false)
        // instead of filtering on the result column.
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);
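        // Illustrative: `a > bool b` yields 0/1 values for every row, while
        // `a > b` keeps only the rows where the comparison holds.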

        // Build a filter plan if the op is a comparison op and we don't need
        // to return 0/1. Otherwise, build a projection plan.
        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            // lhs is a literal, rhs is a column
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // lhs is a column, rhs is a literal
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // both are columns. join them on time index
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // TODO(ruihang): avoid join if left and right are the same table

                // set op has "special" join semantics
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                // normal join
                if left_table_ref == right_table_ref {
                    // rename table references to avoid ambiguity
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    // `self.ctx` holds the right plan's context. If the right
                    // plan has no tag columns, use the left plan's context for
                    // subsequent calculations to handle cases like
                    // `host + scalar(...)`: the tag columns of the `host` table
                    // must be preserved in later projections, and they only
                    // appear in the left plan's context.
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
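                // Illustrative: in `m{a="1"} + m{a="2"}` both sides resolve to
                // table `m`, so the two plans are re-qualified as `lhs` and
                // `rhs` before being joined on the time index and tag columns.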
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    // If either side has no tag columns (cases like `scalar(...) + host`
                    // or `host + scalar(...)`), join on the time index only.
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }

    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.clone());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
        timestamp_fn: bool,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;

        let normalize = if timestamp_fn {
            // If evaluating the PromQL `timestamp()` function, project the time index column as the value column
            // before wrapping with [`InstantManipulate`], so the output matches PromQL's `timestamp()` semantics.
            self.create_timestamp_func_plan(normalize)?
        } else {
            normalize
        };

        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    /// Builds a projection plan for the PromQL `timestamp()` function.
    /// Projects the time index column as the value column for each row.
    ///
    /// # Arguments
    /// * `normalize` - Input [`LogicalPlan`] for the normalized series.
    ///
    /// # Returns
    /// Returns a [`Result<LogicalPlan>`] where the resulting logical plan projects the timestamp
    /// column as the value column, along with the original tag and time index columns.
    ///
    /// # Timestamp vs. Time Function
    ///
    /// - **Timestamp Function (`timestamp()`)**: In PromQL, the `timestamp()` function returns the
    ///   timestamp (time index) of each sample as the value column.
    ///
    /// - **Time Function (`time()`)**: The `time()` function returns the evaluation time of the query
    ///   as a scalar value.
    ///
    /// # Side Effects
    /// Updates the planner context's field columns to the timestamp column name.
    ///
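    /// # Example (illustrative)
    ///
    /// For `timestamp(up)`, each output row's value is that sample's own
    /// timestamp, whereas `time()` would return the query evaluation time.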
    fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
        let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
            .alias(DEFAULT_FIELD_COLUMN);
        self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
        let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
        project_exprs.push(self.create_time_index_column_expr()?);
        project_exprs.push(time_expr);
        project_exprs.extend(self.create_tag_column_exprs()?);

        LogicalPlanBuilder::from(normalize)
            .project(project_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        // Some functions like rate may require special fields in the RangeManipulate plan
        // so we can't skip RangeManipulate.
        let normalize = match self.setup_context().await? {
            Some(empty_plan) => empty_plan,
            None => {
                self.selector_to_series_normalize_plan(offset, matchers, true)
                    .await?
            }
        };
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            // TODO(ruihang): convert via Timestamp datatypes to support different time units
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_call_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // Some special functions are not expressions but whole plans
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, query_engine_state).await;
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
            SPECIAL_ABSENT_FUNCTION => {
                return self.create_absent_plan(args, query_engine_state).await;
            }
            _ => {}
        }

        // transform function arguments
        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
                .await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let (mut func_exprs, new_tags) =
            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        // Update context tags after building plan
        // We can't push them before planning, because they won't exist until projection.
        for tag in new_tags {
            self.ctx.tag_columns.push(tag);
        }

        let plan = builder.build().context(DataFusionPlanningSnafu)?;
        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");

        Ok(plan)
    }

    async fn prom_ext_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self
            .prom_expr_to_plan(&children[0], query_engine_state)
            .await?;
        // Wrapper for the explanation/analyze of the existing plan
        // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
        // if `analyze` is true, runs the actual plan and produces
        // information about metrics during run.
        // if `verbose` is true, prints out additional details when VERBOSE keyword is specified
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }

    /// Extract metric name from `__name__` matcher and set it into [PromPlannerContext].
    /// Returns a new [Matchers] that doesn't contain metric name matcher.
    ///
    /// Each call to this function means a new selector is started. Thus, the
    /// context is reset first.
    ///
    /// Name rule:
    /// - if `name` is some, then the matchers MUST NOT contain a `__name__` matcher.
    /// - if `name` is none, then the matchers MUST contain exactly one `__name__`
    ///   matcher, and its op MUST be equality (enforced by the checks below).
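    ///
    /// For example (illustrative): `http_requests{job="api"}` arrives with
    /// `name = Some("http_requests")`, while `{__name__="http_requests", job="api"}`
    /// arrives with `name = None` and a single `__name__` matcher.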
    #[allow(clippy::mutable_key_type)]
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            // TODO(ruihang): support other metric match ops
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.clone(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                self.ctx.selector_matcher.push(matcher.clone());
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(matchers.into_iter().collect()))
    }

    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        // make table scan plan
        let table_ref = self.table_ref()?;
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        // make filter exprs
        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            .filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty.
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make a projection plan if there is any `__field__` matcher
        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            // opt-in set
            let mut result_set = HashSet::new();
            // opt-out set
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            // merge the two sets: an empty opt-in set means "all fields",
            // then the opt-out set is subtracted
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }
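            // Illustrative: `{__field__=~"cpu.*", __field__!="cpu_idle"}` first
            // opts in every field matching `cpu.*`, then removes `cpu_idle`.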

            // mask the field columns in context using computed result set
            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            // reuse this variable for simplicity
            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // make sort plan
        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make divide plan
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                self.ctx.tag_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

        // make series_normalize plan
        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            self.ctx.tag_columns.clone(),
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }

    /// Convert [LabelModifier] to [Column] exprs for aggregation.
    /// Timestamp column and tag columns will be included.
    ///
    /// # Side effect
    ///
    /// This method will also change the tag columns in ctx if `update_ctx` is true.
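    ///
    /// For example (illustrative): `sum by (host) (m)` keeps only `host` plus
    /// the time index as group exprs, while `sum without (host) (m)` keeps
    /// every tag column except `host`, plus the time index.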
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            None => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            Some(LabelModifier::Include(labels)) => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    // nonexistent labels will be ignored
                    if let Some(column_name) = Self::find_case_sensitive_column(input_schema, label)
                    {
                        exprs.push(DfExpr::Column(Column::from_name(column_name.clone())));

                        if update_ctx {
                            // update the tag columns in context
                            self.ctx.tag_columns.push(column_name);
                        }
                    }
                }
                // add timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                // remove "without"-ed fields
                // nonexistent labels will be ignored
                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // remove time index and value fields
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    // change the tag columns in context
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                // collect remaining fields and convert to col expr
                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                // add timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }

    // TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            if matcher.name == SCHEMA_COLUMN_MATCHER
                || matcher.name == DB_COLUMN_MATCHER
                || matcher.name == FIELD_COLUMN_MATCHER
            {
                continue;
            }

            let column_name = Self::find_case_sensitive_column(table_schema, matcher.name.as_str());
            let col = if let Some(column_name) = column_name {
                DfExpr::Column(Column::from_name(column_name))
            } else {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
                    .alias(matcher.name.clone())
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                MatchOp::Re(re) => {
                    // TODO(ruihang): a more programmatic way to handle this in datafusion

                    // This is a hack to handle `.+` and `.*`, and is not
                    // strictly correct: `.` doesn't match newline (`\n`).
                    // Given this is in a PromQL context, it's fine most of the time.
1319                    if re.as_str() == "^(?:.*)$" {
1320                        continue;
1321                    }
1322                    if re.as_str() == "^(?:.+)$" {
1323                        col.not_eq(DfExpr::Literal(
1324                            ScalarValue::Utf8(Some(String::new())),
1325                            None,
1326                        ))
1327                    } else {
1328                        DfExpr::BinaryExpr(BinaryExpr {
1329                            left: Box::new(col),
1330                            op: Operator::RegexMatch,
1331                            right: Box::new(DfExpr::Literal(
1332                                ScalarValue::Utf8(Some(re.as_str().to_string())),
1333                                None,
1334                            )),
1335                        })
1336                    }
1337                }
1338                MatchOp::NotRe(re) => {
1339                    if re.as_str() == "^(?:.*)$" {
1340                        DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
1341                    } else if re.as_str() == "^(?:.+)$" {
1342                        col.eq(DfExpr::Literal(
1343                            ScalarValue::Utf8(Some(String::new())),
1344                            None,
1345                        ))
1346                    } else {
1347                        DfExpr::BinaryExpr(BinaryExpr {
1348                            left: Box::new(col),
1349                            op: Operator::RegexNotMatch,
1350                            right: Box::new(DfExpr::Literal(
1351                                ScalarValue::Utf8(Some(re.as_str().to_string())),
1352                                None,
1353                            )),
1354                        })
1355                    }
1356                }
1357            };
1358            exprs.push(expr);
1359        }
1360
1361        Ok(exprs)
1362    }
1363
1364    fn find_case_sensitive_column(schema: &DFSchemaRef, column: &str) -> Option<String> {
1365        schema
1366            .fields()
1367            .iter()
1368            .find(|field| field.name() == column)
1369            .map(|field| field.name().clone())
1370    }
1371
1372    fn table_ref(&self) -> Result<TableReference> {
1373        let table_name = self
1374            .ctx
1375            .table_name
1376            .clone()
1377            .context(TableNameNotFoundSnafu)?;
1378
1379        // set schema name if `__schema__` is given
1380        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1381            TableReference::partial(schema_name.as_str(), table_name.as_str())
1382        } else {
1383            TableReference::bare(table_name.as_str())
1384        };
1385
1386        Ok(table_ref)
1387    }
1388
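    /// Build a filter expression over the time index for the query's time
    /// range. When the range is dense (more than `MAX_SCATTER_POINTS`
    /// evaluation points, or an interval of at most one hour), a single
    /// continuous window is produced; otherwise one window per evaluation
    /// timestamp is emitted and the windows are OR-ed together.
    ///
    /// A rough sketch of the continuous case (bounds abbreviated):
    ///
    /// ```ignore
    /// time_index
    ///     .gt_eq(lit(start - offset - lookback - range))
    ///     .and(time_index.lt_eq(lit(end - offset + lookback)))
    /// ```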
    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
        let start = self.ctx.start;
        let end = self.ctx.end;
        if end < start {
            return InvalidTimeRangeSnafu { start, end }.fail();
        }
        let lookback_delta = self.ctx.lookback_delta;
        let range = self.ctx.range.unwrap_or_default();
        let interval = self.ctx.interval;
        let time_index_expr = self.create_time_index_column_expr()?;
        let num_points = (end - start) / interval;

        // Scan a continuous time range
        if num_points > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
            let single_time_range = time_index_expr
                .clone()
                .gt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(
                        Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range),
                        None,
                    ),
                    None,
                ))
                .and(time_index_expr.lt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(
                        Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
                        None,
                    ),
                    None,
                )));
            return Ok(Some(single_time_range));
        }

        // Otherwise scan scattered ranges separately
        let mut filters = Vec::with_capacity(num_points as usize);
        for timestamp in (start..end).step_by(interval as usize) {
            filters.push(
                time_index_expr
                    .clone()
                    .gt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp - offset_duration - lookback_delta - range),
                            None,
                        ),
                        None,
                    ))
                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp - offset_duration + lookback_delta),
                            None,
                        ),
                        None,
                    ))),
            )
        }

        Ok(filters.into_iter().reduce(DfExpr::or))
    }

    /// Create a table scan plan for the given table reference, projecting the
    /// time index to millisecond precision when the table stores timestamps
    /// with a different precision.
    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
        let provider = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?;

        let is_time_index_ms = provider
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table()
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .data_type
            == ConcreteDataType::timestamp_millisecond_datatype();

        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        if !is_time_index_ms {
            // cast to ms if the time index is not in millisecond precision
            let expr: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(DfExpr::Alias(Alias {
                    expr: Box::new(DfExpr::Cast(Cast {
                        expr: Box::new(self.create_time_index_column_expr()?),
                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                    })),
                    relation: Some(table_ref.clone()),
                    name: self
                        .ctx
                        .time_index_column
                        .as_ref()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_quoted_string(),
                        })?
                        .clone(),
                    metadata: None,
                })))
                .collect::<Vec<_>>();
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(expr)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        Ok(scan_plan)
    }

    /// Setup [PromPlannerContext]'s state fields.
    ///
    /// Returns a logical plan for an empty metric when the target table
    /// doesn't exist.
    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
        let table_ref = self.table_ref()?;
        let table = match self.table_provider.resolve_table(table_ref.clone()).await {
            Err(e) if e.status_code() == StatusCode::TableNotFound => {
                let plan = self.setup_context_for_empty_metric()?;
                return Ok(Some(plan));
            }
            res => res.context(CatalogSnafu)?,
        };
        let table = table
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table();

        // set time index column name
        let time_index = table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .name
            .clone();
        self.ctx.time_index_column = Some(time_index);

        // set value columns
        let values = table
            .table_info()
            .meta
            .field_column_names()
            .cloned()
            .collect();
        self.ctx.field_columns = values;

        // set primary key (tag) columns
        let tags = table
            .table_info()
            .meta
            .row_key_column_names()
            .filter(|col| {
                // remove metric engine's internal columns
                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
            })
            .cloned()
            .collect();
        self.ctx.tag_columns = tags;

        Ok(None)
    }

    /// Setup [PromPlannerContext]'s state fields for a non-existent table
    /// without any rows.
    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
        self.ctx.reset_table_name_and_schema();
        self.ctx.tag_columns = vec![];
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];

        // The table doesn't have any data, so we set start to 0 and end to -1.
        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    0,
                    -1,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(lit(0.0f64)),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    // TODO(ruihang): insert column expr
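    /// Split PromQL call arguments into at most one vector input plus a list
    /// of literal expressions.
    ///
    /// An illustrative sketch (the function and its values are hypothetical):
    /// for `clamp(metric, 0, 100)` the selector `metric` becomes the vector
    /// input while the scalars are collected as literals:
    ///
    /// ```ignore
    /// let args = self.create_function_args(&call.args.args)?;
    /// assert!(args.input.is_some()); // the `metric` selector
    /// assert_eq!(args.literals.len(), 2); // lit(0.0) and lit(100.0)
    /// ```
    ///
    /// Supplying two vector arguments fails with `MultipleVectorSnafu`.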
    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
        let mut result = FunctionArgs::default();

        for arg in args {
            // First try to parse as a literal expression (including binary
            // expressions over literals like `100.0 + 3.0`)
            if let Some(expr) = Self::try_build_literal_expr(arg) {
                result.literals.push(expr);
            } else {
                // If not a literal, treat as vector input
                match arg.as_ref() {
                    PromExpr::Subquery(_)
                    | PromExpr::VectorSelector(_)
                    | PromExpr::MatrixSelector(_)
                    | PromExpr::Extension(_)
                    | PromExpr::Aggregate(_)
                    | PromExpr::Paren(_)
                    | PromExpr::Call(_)
                    | PromExpr::Binary(_)
                    | PromExpr::Unary(_) => {
                        if result.input.replace(*arg.clone()).is_some() {
                            MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
                        }
                    }

                    _ => {
                        let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
                        result.literals.push(expr);
                    }
                }
            }
        }

        Ok(result)
    }

    /// Creates function expressions for projection and returns the expressions and new tags.
    ///
    /// # Side Effects
    ///
    /// This method will update [PromPlannerContext]'s fields and tags if needed.
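    ///
    /// An illustrative sketch (the UDF name shown is hypothetical): for
    /// `rate(metric[5m])` each field column `v` is rewritten into a UDF call
    /// that also receives the timestamp range, the time index, and the range
    /// length in milliseconds, roughly
    ///
    /// ```ignore
    /// prom_rate(ts_range, v, time_index, lit(300_000i64))
    /// ```
    ///
    /// which is then aliased and recorded back into the context's field
    /// columns.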
    fn create_function_expr(
        &mut self,
        func: &Function,
        other_input_exprs: Vec<DfExpr>,
        query_engine_state: &QueryEngineState,
    ) -> Result<(Vec<DfExpr>, Vec<String>)> {
        // TODO(ruihang): check function args list
        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();

        // TODO(ruihang): set this according to in-param list
        let field_column_pos = 0;
        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
        // New labels after executing the function, e.g. `label_replace` etc.
        let mut new_tags = vec![];
        let scalar_func = match func.name {
            "increase" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Increase::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "rate" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Rate::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "delta" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Delta::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
            "predict_linear" => {
                other_input_exprs[0] = DfExpr::Cast(Cast {
                    expr: Box::new(other_input_exprs[0].clone()),
                    data_type: ArrowDataType::Int64,
                });
                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
            }
            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
            "time" => {
                exprs.push(build_special_time_expr(
                    self.ctx.time_index_column.as_ref().unwrap(),
                ));
                ScalarFunc::GeneratedExpr
            }
            "minute" => {
                // date_part('minute', time_index)
                let expr = self.date_part_on_time_index("minute")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "hour" => {
                // date_part('hour', time_index)
                let expr = self.date_part_on_time_index("hour")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "month" => {
                // date_part('month', time_index)
                let expr = self.date_part_on_time_index("month")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "year" => {
                // date_part('year', time_index)
                let expr = self.date_part_on_time_index("year")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_month" => {
                // date_part('day', time_index)
                let expr = self.date_part_on_time_index("day")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_week" => {
                // date_part('dow', time_index)
                let expr = self.date_part_on_time_index("dow")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_year" => {
                // date_part('doy', time_index)
                let expr = self.date_part_on_time_index("doy")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "days_in_month" => {
                // date_part(
                //     'day',
                //     (date_trunc('month', <TIME INDEX>::date) + interval '1 month - 1 day')
                // );
                let day_lit_expr = "day".lit();
                let month_lit_expr = "month".lit();
                let interval_1month_lit_expr =
                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
                let interval_1day_lit_expr = DfExpr::Literal(
                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
                    None,
                );
                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(interval_1month_lit_expr),
                    op: Operator::Minus,
                    right: Box::new(interval_1day_lit_expr),
                });
                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_trunc(),
                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
                });
                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(date_trunc_expr),
                    op: Operator::Plus,
                    right: Box::new(the_1month_minus_1day_expr),
                });
                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_part(),
                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
                });

                exprs.push(date_part_expr);
                ScalarFunc::GeneratedExpr
            }

            "label_join" => {
                let (concat_expr, dst_label) = Self::build_concat_labels_expr(
                    &mut other_input_exprs,
                    &self.ctx,
                    query_engine_state,
                )?;

                // Preserve the current field columns except the `dst_label`.
                for value in &self.ctx.field_columns {
                    if *value != dst_label {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                // Remove it from tag columns if it exists, to avoid duplicated column names
                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
                new_tags.push(dst_label);
                // Add the new label expr to evaluate
                exprs.push(concat_expr);

                ScalarFunc::GeneratedExpr
            }
            "label_replace" => {
                if let Some((replace_expr, dst_label)) = self
                    .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
                {
                    // Preserve the current field columns except the `dst_label`.
                    for value in &self.ctx.field_columns {
                        if *value != dst_label {
                            let expr = DfExpr::Column(Column::from_name(value));
                            exprs.push(expr);
                        }
                    }

                    ensure!(
                        !self.ctx.tag_columns.contains(&dst_label),
                        SameLabelSetSnafu
                    );
                    new_tags.push(dst_label);
                    // Add the new label expr to evaluate
                    exprs.push(replace_expr);
                } else {
                    // Keep the current field columns
                    for value in &self.ctx.field_columns {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                ScalarFunc::GeneratedExpr
            }
            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
                // These functions are not expressions but part of the plan;
                // they are processed by `prom_call_expr_to_plan`.
                for value in &self.ctx.field_columns {
                    let expr = DfExpr::Column(Column::from_name(value));
                    exprs.push(expr);
                }

                ScalarFunc::GeneratedExpr
            }
            "round" => {
                if other_input_exprs.is_empty() {
                    other_input_exprs.push_front(0.0f64.lit());
                }
                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
            }
            "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
            "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
            "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
            "pi" => {
                // the `pi` function doesn't accept any arguments, so it needs special processing
                let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion::functions::math::pi(),
                    args: vec![],
                });
                exprs.push(fn_expr);

                ScalarFunc::GeneratedExpr
            }
            _ => {
                if let Some(f) = query_engine_state
                    .session_state()
                    .scalar_functions()
                    .get(func.name)
                {
                    ScalarFunc::DataFusionBuiltin(f.clone())
                } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
                    let func_state = query_engine_state.function_state();
                    let query_ctx = self.table_provider.query_ctx();

                    ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
                        state: func_state,
                        query_ctx: query_ctx.clone(),
                    })))
                } else if let Some(f) = datafusion_functions::math::functions()
                    .iter()
                    .find(|f| f.name() == func.name)
                {
                    ScalarFunc::DataFusionUdf(f.clone())
                } else {
                    return UnsupportedExprSnafu {
                        name: func.name.to_string(),
                    }
                    .fail();
                }
            }
        };

        for value in &self.ctx.field_columns {
            let col_expr = DfExpr::Column(Column::from_name(value));

            match scalar_func.clone() {
                ScalarFunc::DataFusionBuiltin(func) => {
                    other_input_exprs.insert(field_column_pos, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::DataFusionUdf(func) => {
                    let args = itertools::chain!(
                        other_input_exprs.iter().take(field_column_pos).cloned(),
                        std::iter::once(col_expr),
                        other_input_exprs.iter().skip(field_column_pos).cloned()
                    )
                    .collect_vec();
                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
                }
                ScalarFunc::Udf(func) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::ExtrapolateUdf(func, range_length) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    other_input_exprs
                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
                    other_input_exprs.push_back(lit(range_length));
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.pop_back();
                    let _ = other_input_exprs.remove(field_column_pos + 2);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::GeneratedExpr => {}
            }
        }

        // Update value columns' names and alias them to remove qualifiers.
        // For label functions such as `label_join`, `label_replace`, etc.,
        // we keep the fields unchanged.
        if !matches!(func.name, "label_join" | "label_replace") {
            let mut new_field_columns = Vec::with_capacity(exprs.len());

            exprs = exprs
                .into_iter()
                .map(|expr| {
                    let display_name = expr.schema_name().to_string();
                    new_field_columns.push(display_name.clone());
                    Ok(expr.alias(display_name))
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(DataFusionPlanningSnafu)?;

            self.ctx.field_columns = new_field_columns;
        }

        Ok((exprs, new_tags))
    }

    /// Validate a label name according to the Prometheus specification.
    /// Label names must match the regex `[a-zA-Z_][a-zA-Z0-9_]*`.
    /// Additionally, label names starting with double underscores are reserved for internal use.
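    ///
    /// A minimal sketch of the accepted and rejected shapes:
    ///
    /// ```ignore
    /// assert!(Self::validate_label_name("instance_id").is_ok());
    /// assert!(Self::validate_label_name("0instance").is_err()); // bad leading character
    /// assert!(Self::validate_label_name("__internal").is_err()); // reserved prefix
    /// ```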
    fn validate_label_name(label_name: &str) -> Result<()> {
        // Check if label name starts with double underscores (reserved)
        if label_name.starts_with("__") {
            return InvalidDestinationLabelNameSnafu { label_name }.fail();
        }
        // Check if label name matches the required pattern
        if !LABEL_NAME_REGEX.is_match(label_name) {
            return InvalidDestinationLabelNameSnafu { label_name }.fail();
        }

        Ok(())
    }

    /// Build expr for `label_replace` function
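    ///
    /// An illustrative sketch (labels and patterns are hypothetical): for
    /// `label_replace(up, "host", "$1", "instance", "(.*):.*")` this returns
    /// roughly
    ///
    /// ```ignore
    /// regexp_replace(col("instance"), lit("^(?s:(.*):.*)$"), lit("$1")).alias("host")
    /// ```
    ///
    /// together with the new tag name `"host"`. The regex is anchored as
    /// `^(?s:...)$` to mirror Prometheus' full-string matching.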
    fn build_regexp_replace_label_expr(
        &self,
        other_input_exprs: &mut VecDeque<DfExpr>,
        query_engine_state: &QueryEngineState,
    ) -> Result<Option<(DfExpr, String)>> {
        // label_replace(vector, dst_label, replacement, src_label, regex)
        let dst_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected dst_label string literal, but found {:?}", other),
            }
            .fail()?,
        };

        // Validate the destination label name
        Self::validate_label_name(&dst_label)?;
        let replacement = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected replacement string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let src_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), _)) => s,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected src_label string literal, but found {:?}", other),
            }
            .fail()?,
        };

        let regex = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected regex string literal, but found {:?}", other),
            }
            .fail()?,
        };

        // Validate the regex before using it
        // doc: https://prometheus.io/docs/prometheus/latest/querying/functions/#label_replace
        regex::Regex::new(&regex).map_err(|_| {
            InvalidRegularExpressionSnafu {
                regex: regex.clone(),
            }
            .build()
        })?;

        // If the src_label exists and the regex is empty, keep everything unchanged.
        if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
            return Ok(None);
        }

        // If the src_label doesn't exist:
        if !self.ctx.tag_columns.contains(&src_label) {
            if replacement.is_empty() {
                // the replacement is empty, keep everything unchanged.
                return Ok(None);
            } else {
                // the replacement is not empty, always add dst_label with the replacement value.
                return Ok(Some((
                    // alias literal `replacement` as dst_label
                    lit(replacement).alias(&dst_label),
                    dst_label,
                )));
            }
        }

        // Preprocess the regex, anchoring it the same way Prometheus does:
        // https://github.com/prometheus/prometheus/blob/d902abc50d6652ba8fe9a81ff8e5cce936114eba/promql/functions.go#L1575C32-L1575C37
        let regex = format!("^(?s:{regex})$");

        let session_state = query_engine_state.session_state();
        let func = session_state
            .scalar_functions()
            .get("regexp_replace")
            .context(UnsupportedExprSnafu {
                name: "regexp_replace",
            })?;

        // regexp_replace(src_label, regex, replacement)
        let args = vec![
            if src_label.is_empty() {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
            } else {
                DfExpr::Column(Column::from_name(src_label))
            },
            DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
            DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
        ];

        Ok(Some((
            DfExpr::ScalarFunction(ScalarFunction {
                func: func.clone(),
                args,
            })
            .alias(&dst_label),
            dst_label,
        )))
    }

    /// Build expr for `label_join` function
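    ///
    /// An illustrative sketch (labels are hypothetical): for
    /// `label_join(up, "endpoint", ":", "host", "port")` this returns roughly
    ///
    /// ```ignore
    /// concat_ws(lit(":"), col("host"), col("port")).alias("endpoint")
    /// ```
    ///
    /// together with the new tag name `"endpoint"`. Source labels missing
    /// from the schema are passed as null literals, so they behave like
    /// empty strings.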
    fn build_concat_labels_expr(
        other_input_exprs: &mut VecDeque<DfExpr>,
        ctx: &PromPlannerContext,
        query_engine_state: &QueryEngineState,
    ) -> Result<(DfExpr, String)> {
        // label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)

        let dst_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected dst_label string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let separator = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected separator string literal, but found {:?}", other),
            }
            .fail()?,
        };

        // Create a set of available columns (tag columns + field columns + time index column)
        let available_columns: HashSet<&str> = ctx
            .tag_columns
            .iter()
            .chain(ctx.field_columns.iter())
            .chain(ctx.time_index_column.as_ref())
            .map(|s| s.as_str())
            .collect();

        let src_labels = other_input_exprs
            .iter()
            .map(|expr| {
                // Cast source label into column or null literal
                match expr {
                    DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
                        if label.is_empty() {
                            Ok(DfExpr::Literal(ScalarValue::Null, None))
                        } else if available_columns.contains(label.as_str()) {
                            // Label exists in the table schema
                            Ok(DfExpr::Column(Column::from_name(label)))
                        } else {
                            // Label doesn't exist, treat as empty string (null)
                            Ok(DfExpr::Literal(ScalarValue::Null, None))
                        }
                    }
                    other => UnexpectedPlanExprSnafu {
                        desc: format!(
                            "expected source label string literal, but found {:?}",
                            other
                        ),
                    }
                    .fail(),
                }
            })
            .collect::<Result<Vec<_>>>()?;
        ensure!(
            !src_labels.is_empty(),
            FunctionInvalidArgumentSnafu {
                fn_name: "label_join"
            }
        );

        let session_state = query_engine_state.session_state();
        let func = session_state
            .scalar_functions()
            .get("concat_ws")
            .context(UnsupportedExprSnafu { name: "concat_ws" })?;

        // concat_ws(separator, src_label_1, src_label_2, ...) as dst_label
        let mut args = Vec::with_capacity(1 + src_labels.len());
        args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
        args.extend(src_labels);

        Ok((
            DfExpr::ScalarFunction(ScalarFunction {
                func: func.clone(),
                args,
            })
            .alias(&dst_label),
            dst_label,
        ))
    }

    fn create_time_index_column_expr(&self) -> Result<DfExpr> {
        Ok(DfExpr::Column(Column::from_name(
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
        )))
    }

    fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
        let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
        for tag in &self.ctx.tag_columns {
            let expr = DfExpr::Column(Column::from_name(tag));
            result.push(expr);
        }
        Ok(result)
    }

    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
        for field in &self.ctx.field_columns {
            let expr = DfExpr::Column(Column::from_name(field));
            result.push(expr);
        }
        Ok(result)
    }

    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
        let mut result = self
            .ctx
            .tag_columns
            .iter()
            .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
            .collect::<Vec<_>>();
        result.push(self.create_time_index_column_expr()?.sort(true, true));
        Ok(result)
    }

    fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
        self.ctx
            .field_columns
            .iter()
            .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
            .collect::<Vec<_>>()
    }

    fn create_sort_exprs_by_tags(
        func: &str,
        tags: Vec<DfExpr>,
        asc: bool,
    ) -> Result<Vec<SortExpr>> {
        ensure!(
            !tags.is_empty(),
            FunctionInvalidArgumentSnafu { fn_name: func }
        );

        tags.iter()
            .map(|col| match col {
                DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
                    Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
                }
                other => UnexpectedPlanExprSnafu {
                    desc: format!("expected label string literal, but found {:?}", other),
                }
                .fail(),
            })
            .collect::<Result<Vec<_>>>()
    }

    fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
        for value in &self.ctx.field_columns {
            let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
            exprs.push(expr);
        }

        conjunction(exprs).context(ValueNotFoundSnafu {
            table: self.table_ref()?.to_quoted_string(),
        })
    }

    /// Creates a set of DataFusion `DfExpr::AggregateFunction` expressions for each value column using the specified aggregate function.
    ///
    /// # Side Effects
    ///
    /// This method modifies the value columns in the context by replacing them with the new columns
    /// created by the aggregate function application.
    ///
    /// # Returns
    ///
    /// Returns a tuple of `(aggregate_expressions, previous_field_expressions)` where:
    /// - `aggregate_expressions`: Expressions that apply the aggregate function to the original fields
    /// - `previous_field_expressions`: Original field expressions before aggregation. This is non-empty
    ///   only when the operation is `count_values`, as this operation requires preserving the original
    ///   values for grouping.
    ///
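    /// An illustrative sketch (names are hypothetical): for
    /// `sum by (job) (metric)` with a single field column `v`, this returns
    /// roughly `[sum(v)]` plus an empty list of previous field expressions,
    /// and the context's field columns are renamed to the aggregate's schema
    /// name. For `quantile`, the quantile literal is prepended to the
    /// argument list:
    ///
    /// ```ignore
    /// quantile_udaf().call(vec![lit(0.9f64), col("v")])
    /// ```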
    fn create_aggregate_exprs(
        &mut self,
        op: TokenType,
        param: &Option<Box<PromExpr>>,
        input_plan: &LogicalPlan,
    ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
        let mut non_col_args = Vec::new();
        let aggr = match op.id() {
            token::T_SUM => sum_udaf(),
            token::T_QUANTILE => {
                let q =
                    Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
                non_col_args.push(q);
                quantile_udaf()
            }
            token::T_AVG => avg_udaf(),
            token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
            token::T_MIN => min_udaf(),
            token::T_MAX => max_udaf(),
            token::T_GROUP => grouping_udaf(),
            token::T_STDDEV => stddev_pop_udaf(),
            token::T_STDVAR => var_pop_udaf(),
            token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
                name: format!("{op:?}"),
            }
            .fail()?,
            _ => UnexpectedTokenSnafu { token: op }.fail()?,
        };

        // perform the aggregate operation on each value column
        let exprs: Vec<DfExpr> = self
            .ctx
            .field_columns
            .iter()
            .map(|col| {
                non_col_args.push(DfExpr::Column(Column::from_name(col)));
                let expr = aggr.call(non_col_args.clone());
                non_col_args.pop();
                expr
            })
            .collect::<Vec<_>>();

        // if the aggregator is `count_values`, it must be grouped by the current fields.
        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
            let prev_field_exprs: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::from_name(col)))
                .collect();

            ensure!(
                self.ctx.field_columns.len() == 1,
                UnsupportedExprSnafu {
                    name: "count_values on multi-value input"
                }
            );

            prev_field_exprs
        } else {
            vec![]
        };

        // update value column names according to the aggregators
        let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());

        let normalized_exprs =
            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
        for expr in normalized_exprs {
            new_field_columns.push(expr.schema_name().to_string());
        }
        self.ctx.field_columns = new_field_columns;

        Ok((exprs, prev_field_exprs))
    }

    fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
        let param = param
            .as_deref()
            .with_context(|| FunctionInvalidArgumentSnafu {
                fn_name: op.to_string(),
            })?;
        let PromExpr::StringLiteral(StringLiteral { val }) = param else {
            return FunctionInvalidArgumentSnafu {
                fn_name: op.to_string(),
            }
            .fail();
        };

        Ok(val)
    }

    fn get_param_as_literal_expr(
        param: &Option<Box<PromExpr>>,
        op: Option<TokenType>,
        expected_type: Option<ArrowDataType>,
    ) -> Result<DfExpr> {
        let prom_param = param.as_deref().with_context(|| {
            if let Some(op) = op {
                FunctionInvalidArgumentSnafu {
                    fn_name: op.to_string(),
                }
            } else {
                FunctionInvalidArgumentSnafu {
                    fn_name: "unknown".to_string(),
                }
            }
        })?;

        let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
            if let Some(op) = op {
                FunctionInvalidArgumentSnafu {
                    fn_name: op.to_string(),
                }
            } else {
                FunctionInvalidArgumentSnafu {
                    fn_name: "unknown".to_string(),
                }
            }
        })?;

        // check if the type is the expected one
        if let Some(expected_type) = expected_type {
            // a literal should not have references to columns
            let expr_type = expr
                .get_type(&DFSchema::empty())
                .context(DataFusionPlanningSnafu)?;
            if expected_type != expr_type {
                return FunctionInvalidArgumentSnafu {
                    fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
                }
                .fail();
            }
        }

        Ok(expr)
    }

    /// Create a [DfExpr::WindowFunction] expr for each value column with the given window function.
    ///
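    /// This backs `topk` and `bottomk`: each field column becomes a
    /// `row_number()` window partitioned by the grouping expressions and
    /// ordered by the value column (descending for `topk`, ascending for
    /// `bottomk`) with tag columns as tie-breakers, so that the caller can
    /// filter on the resulting row number. A rough sketch in SQL terms
    /// (names are hypothetical):
    /// `row_number() OVER (PARTITION BY job ORDER BY v DESC, host)`.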
    fn create_window_exprs(
        &mut self,
        op: TokenType,
        group_exprs: Vec<DfExpr>,
        input_plan: &LogicalPlan,
    ) -> Result<Vec<DfExpr>> {
        ensure!(
            self.ctx.field_columns.len() == 1,
            UnsupportedExprSnafu {
                name: "topk or bottomk on multi-value input"
            }
        );

        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));

        let asc = matches!(op.id(), token::T_BOTTOMK);

        let tag_sort_exprs = self
            .create_tag_column_exprs()?
            .into_iter()
            .map(|expr| expr.sort(asc, true));

        // perform the window operation on each value column
        let exprs: Vec<DfExpr> = self
            .ctx
            .field_columns
            .iter()
            .map(|col| {
                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
                // Order by the value column first,
                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
                // then by tags when values are equal, to keep the output
                // relatively stable.
                sort_exprs.extend(tag_sort_exprs.clone());

                DfExpr::WindowFunction(Box::new(WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
                    params: WindowFunctionParams {
                        args: vec![],
                        partition_by: group_exprs.clone(),
                        order_by: sort_exprs,
                        window_frame: WindowFrame::new(Some(true)),
                        null_treatment: None,
                        distinct: false,
                        filter: None,
                    },
                }))
            })
            .collect();

        let normalized_exprs =
            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
        Ok(normalized_exprs)
    }

    /// Try to build an [f64] from a [PromExpr].
    #[deprecated(
        note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
    )]
    fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
        match expr {
            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
            PromExpr::Unary(UnaryExpr { expr, .. }) => {
                Self::try_build_float_literal(expr).map(|f| -f)
            }
            PromExpr::StringLiteral(_)
            | PromExpr::Binary(_)
            | PromExpr::VectorSelector(_)
            | PromExpr::MatrixSelector(_)
            | PromExpr::Call(_)
            | PromExpr::Extension(_)
            | PromExpr::Aggregate(_)
            | PromExpr::Subquery(_) => None,
        }
    }

    /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
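    ///
    /// An illustrative sketch (the metric name is hypothetical): for
    /// `histogram_quantile(0.9, metric_bucket)` the second argument is
    /// planned as usual, the `le` tag is dropped from the context, and the
    /// input is wrapped in a `HistogramFold` node with `phi = 0.9`. When the
    /// input has no `le` column, an empty relation is returned instead of an
    /// error.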
    async fn create_histogram_plan(
        &mut self,
        args: &PromFunctionArgs,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        if args.args.len() != 2 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
            .fail();
        }
        #[allow(deprecated)]
        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
            FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
        })?;

        let input = args.args[1].as_ref().clone();
        let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;

        if !self.ctx.has_le_tag() {
            // Return an empty result instead of an error when the 'le' column is not found.
            // This handles the case where the histogram metrics don't exist.
            return Ok(LogicalPlan::EmptyRelation(
                datafusion::logical_expr::EmptyRelation {
                    produce_one_row: false,
                    schema: Arc::new(DFSchema::empty()),
                },
            ));
        }
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: self.ctx.table_name.clone().unwrap_or_default(),
                })?;
        // FIXME(ruihang): support multi fields
        let field_column = self
            .ctx
            .field_columns
            .first()
            .with_context(|| FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            })?
            .clone();
        // remove the `le` column from tag columns
        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(
                HistogramFold::new(
                    LE_COLUMN_NAME.to_string(),
                    field_column,
                    time_index_column,
                    phi,
                    input_plan,
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        }))
    }
2554
2555    /// Create a [SPECIAL_VECTOR_FUNCTION] plan
2556    async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
2557        if args.args.len() != 1 {
2558            return FunctionInvalidArgumentSnafu {
2559                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
2560            }
2561            .fail();
2562        }
2563        let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
2564
2565        // reuse `SPECIAL_TIME_FUNCTION` as name of time index column
2566        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2567        self.ctx.reset_table_name_and_schema();
2568        self.ctx.tag_columns = vec![];
2569        self.ctx.field_columns = vec![greptime_value().to_string()];
2570        Ok(LogicalPlan::Extension(Extension {
2571            node: Arc::new(
2572                EmptyMetric::new(
2573                    self.ctx.start,
2574                    self.ctx.end,
2575                    self.ctx.interval,
2576                    SPECIAL_TIME_FUNCTION.to_string(),
2577                    greptime_value().to_string(),
2578                    Some(lit),
2579                )
2580                .context(DataFusionPlanningSnafu)?,
2581            ),
2582        }))
2583    }
2584
2585    /// Create a [SCALAR_FUNCTION] plan
2586    async fn create_scalar_plan(
2587        &mut self,
2588        args: &PromFunctionArgs,
2589        query_engine_state: &QueryEngineState,
2590    ) -> Result<LogicalPlan> {
2591        ensure!(
2592            args.len() == 1,
2593            FunctionInvalidArgumentSnafu {
2594                fn_name: SCALAR_FUNCTION
2595            }
2596        );
2597        let input = self
2598            .prom_expr_to_plan(&args.args[0], query_engine_state)
2599            .await?;
2600        ensure!(
2601            self.ctx.field_columns.len() == 1,
2602            MultiFieldsNotSupportedSnafu {
2603                operator: SCALAR_FUNCTION
2604            },
2605        );
2606        let scalar_plan = LogicalPlan::Extension(Extension {
2607            node: Arc::new(
2608                ScalarCalculate::new(
2609                    self.ctx.start,
2610                    self.ctx.end,
2611                    self.ctx.interval,
2612                    input,
2613                    self.ctx.time_index_column.as_ref().unwrap(),
2614                    &self.ctx.tag_columns,
2615                    &self.ctx.field_columns[0],
2616                    self.ctx.table_name.as_deref(),
2617                )
2618                .context(PromqlPlanNodeSnafu)?,
2619            ),
2620        });
2621        // scalar plan have no tag columns
        self.ctx.tag_columns.clear();
        self.ctx.field_columns.clear();
        self.ctx
            .field_columns
            .push(scalar_plan.schema().field(1).name().clone());
        Ok(scalar_plan)
    }
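
    // Illustrative sketch (an added note): `scalar(some_metric)` wraps the
    // input plan in a `ScalarCalculate` node and clears all tag columns from
    // the context, so downstream operators see a single (time index, value)
    // pair. Per PromQL semantics, if the input does not have exactly one
    // series at an instant the scalar result is NaN; that evaluation detail
    // lives inside `ScalarCalculate`.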

    /// Create a [SPECIAL_ABSENT_FUNCTION] plan
    async fn create_absent_plan(
        &mut self,
        args: &PromFunctionArgs,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        if args.args.len() != 1 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
            }
            .fail();
        }
        let input = self
            .prom_expr_to_plan(&args.args[0], query_engine_state)
            .await?;

        let time_index_expr = self.create_time_index_column_expr()?;
        let first_field_expr =
            self.create_field_column_exprs()?
                .pop()
                .with_context(|| ValueNotFoundSnafu {
                    table: self.ctx.table_name.clone().unwrap_or_default(),
                })?;
        let first_value_expr = first_value(first_field_expr, vec![]);

        let ordered_aggregated_input = LogicalPlanBuilder::from(input)
            .aggregate(
                vec![time_index_expr.clone()],
                vec![first_value_expr.clone()],
            )
            .context(DataFusionPlanningSnafu)?
            .sort(vec![time_index_expr.sort(true, false)])
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        let fake_labels = self
            .ctx
            .selector_matcher
            .iter()
            .filter_map(|matcher| match matcher.op {
                MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
                _ => None,
            })
            .collect::<Vec<_>>();

        // Create the absent plan
        let absent_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                Absent::try_new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    self.ctx.time_index_column.as_ref().unwrap().clone(),
                    self.ctx.field_columns[0].clone(),
                    fake_labels,
                    ordered_aggregated_input,
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });

        Ok(absent_plan)
    }
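
    // Illustrative sketch (an added note): for
    // `absent(up{job="prometheus", instance=~".*"})` only the `=` matcher
    // survives into `fake_labels`, so when the input is entirely absent the
    // generated series is labeled {job="prometheus"} with value 1. This
    // mirrors PromQL's documented `absent()` labeling behavior.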

    /// Try to build a DataFusion literal expression from a PromQL expr; returns
    /// `None` if the input is not a literal expression.
    fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
        match expr {
            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
            PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
            PromExpr::VectorSelector(_)
            | PromExpr::MatrixSelector(_)
            | PromExpr::Extension(_)
            | PromExpr::Aggregate(_)
            | PromExpr::Subquery(_) => None,
            // Calls are not treated as literals; in particular, time() is
            // handled as a regular function call rather than folded here.
            PromExpr::Call(_) => None,
            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
            // TODO(ruihang): support Unary operator
            PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
            PromExpr::Binary(PromBinaryExpr {
                lhs,
                rhs,
                op,
                modifier,
            }) => {
                let lhs = Self::try_build_literal_expr(lhs)?;
                let rhs = Self::try_build_literal_expr(rhs)?;
                let is_comparison_op = Self::is_token_a_comparison_op(*op);
                let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
                let expr = expr_builder(lhs, rhs).ok()?;

                let should_return_bool = if let Some(m) = modifier {
                    m.return_bool
                } else {
                    false
                };
                if is_comparison_op && should_return_bool {
                    Some(DfExpr::Cast(Cast {
                        expr: Box::new(expr),
                        data_type: ArrowDataType::Float64,
                    }))
                } else {
                    Some(expr)
                }
            }
        }
    }
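
    // Illustrative sketch (an added note): literal-only expressions fold into
    // a single DataFusion expression here, e.g. `1 + 2` becomes
    // `lit(1.0) + lit(2.0)`, and `1 < bool 2` additionally casts the boolean
    // comparison result to Float64 so it behaves like a PromQL scalar.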

    fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
        match expr {
            PromExpr::Call(Call { func, .. }) => {
                if func.name == SPECIAL_TIME_FUNCTION
                    && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
                {
                    Some(build_special_time_expr(time_index_col))
                } else {
                    None
                }
            }
            _ => None,
        }
    }

    /// Return a lambda that builds a binary expression from a token. This is
    /// needed because some binary operators are functions in DataFusion, like
    /// `atan2` or `^`.
    #[allow(clippy::type_complexity)]
    fn prom_token_to_binary_expr_builder(
        token: TokenType,
    ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
        match token.id() {
            token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
            token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
            token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
            token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
            token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
            token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
            token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
            token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
            token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
            token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
            token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
            token::T_POW => Ok(Box::new(|lhs, rhs| {
                Ok(DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::math::power(),
                    args: vec![lhs, rhs],
                }))
            })),
            token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
                Ok(DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::math::atan2(),
                    args: vec![lhs, rhs],
                }))
            })),
            _ => UnexpectedTokenSnafu { token }.fail(),
        }
    }
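
    // Illustrative usage sketch (hypothetical caller):
    //
    //   let build = Self::prom_token_to_binary_expr_builder(op)?;
    //   let expr = build(lhs, rhs)?;
    //
    // Arithmetic and comparison tokens map onto native DataFusion binary
    // expressions, while `^` and `atan2` become `power`/`atan2` scalar
    // function calls because DataFusion has no operator form for them.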

    /// Check if the given op is a [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators).
    fn is_token_a_comparison_op(token: TokenType) -> bool {
        matches!(
            token.id(),
            token::T_EQLC
                | token::T_NEQ
                | token::T_GTR
                | token::T_LSS
                | token::T_GTE
                | token::T_LTE
        )
    }

    /// Check if the given op is a set operator (UNION, INTERSECT and EXCEPT in SQL).
    fn is_token_a_set_op(token: TokenType) -> bool {
        matches!(
            token.id(),
            token::T_LAND // INTERSECT
                | token::T_LOR // UNION
                | token::T_LUNLESS // EXCEPT
        )
    }

    /// Build an inner join on the time index column and tag columns to concatenate two logical plans.
    /// When `only_join_time_index == true`, join only on the time index, because the two plans may not have the same tag columns.
    #[allow(clippy::too_many_arguments)]
    fn join_on_non_field_columns(
        &self,
        left: LogicalPlan,
        right: LogicalPlan,
        left_table_ref: TableReference,
        right_table_ref: TableReference,
        left_time_index_column: Option<String>,
        right_time_index_column: Option<String>,
        only_join_time_index: bool,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        let mut left_tag_columns = if only_join_time_index {
            BTreeSet::new()
        } else {
            self.ctx
                .tag_columns
                .iter()
                .cloned()
                .collect::<BTreeSet<_>>()
        };
        let mut right_tag_columns = left_tag_columns.clone();

        // apply modifier
        if let Some(modifier) = modifier {
            // apply label modifier
            if let Some(matching) = &modifier.matching {
                match matching {
                    // keeps columns mentioned in `on`
                    LabelModifier::Include(on) => {
                        let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
                        left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
                        right_tag_columns =
                            right_tag_columns.intersection(&mask).cloned().collect();
                    }
                    // removes columns mentioned in `ignoring`
                    LabelModifier::Exclude(ignoring) => {
                        // doesn't check existence of label
                        for label in &ignoring.labels {
                            let _ = left_tag_columns.remove(label);
                            let _ = right_tag_columns.remove(label);
                        }
                    }
                }
            }
        }

        // push time index column if it exists
        if let (Some(left_time_index_column), Some(right_time_index_column)) =
            (left_time_index_column, right_time_index_column)
        {
            left_tag_columns.insert(left_time_index_column);
            right_tag_columns.insert(right_time_index_column);
        }

        let right = LogicalPlanBuilder::from(right)
            .alias(right_table_ref)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // Inner join on the time index column to concatenate the two operators
        LogicalPlanBuilder::from(left)
            .alias(left_table_ref)
            .context(DataFusionPlanningSnafu)?
            .join_detailed(
                right,
                JoinType::Inner,
                (
                    left_tag_columns
                        .into_iter()
                        .map(Column::from_name)
                        .collect::<Vec<_>>(),
                    right_tag_columns
                        .into_iter()
                        .map(Column::from_name)
                        .collect::<Vec<_>>(),
                ),
                None,
                NullEquality::NullEqualsNull,
            )
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }
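
    // Illustrative sketch (an added note): for `a + on(host) b` only `host`
    // (plus the time index) ends up in the join keys, while for
    // `a + ignoring(host) b` every tag except `host` is joined on.
    // `only_join_time_index` is for callers that cannot assume the two sides
    // share the same tag set.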

    /// Build a set operator (AND/OR/UNLESS)
    fn set_op_on_non_field_columns(
        &mut self,
        left: LogicalPlan,
        mut right: LogicalPlan,
        left_context: PromPlannerContext,
        right_context: PromPlannerContext,
        op: TokenType,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        let mut left_tag_col_set = left_context
            .tag_columns
            .iter()
            .cloned()
            .collect::<HashSet<_>>();
        let mut right_tag_col_set = right_context
            .tag_columns
            .iter()
            .cloned()
            .collect::<HashSet<_>>();

        if matches!(op.id(), token::T_LOR) {
            return self.or_operator(
                left,
                right,
                left_tag_col_set,
                right_tag_col_set,
                left_context,
                right_context,
                modifier,
            );
        }

        // apply modifier
        if let Some(modifier) = modifier {
            // one-to-many and many-to-one are not supported
            ensure!(
                matches!(
                    modifier.card,
                    VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
                ),
                UnsupportedVectorMatchSnafu {
                    name: modifier.card.clone(),
                },
            );
            // apply label modifier
            if let Some(matching) = &modifier.matching {
                match matching {
                    // keeps columns mentioned in `on`
                    LabelModifier::Include(on) => {
                        let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
                        left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
                        right_tag_col_set =
                            right_tag_col_set.intersection(&mask).cloned().collect();
                    }
                    // removes columns mentioned in `ignoring`
                    LabelModifier::Exclude(ignoring) => {
                        // doesn't check existence of label
                        for label in &ignoring.labels {
                            let _ = left_tag_col_set.remove(label);
                            let _ = right_tag_col_set.remove(label);
                        }
                    }
                }
            }
        }
        // ensure two sides have the same tag columns
        if !matches!(op.id(), token::T_LOR) {
            ensure!(
                left_tag_col_set == right_tag_col_set,
                CombineTableColumnMismatchSnafu {
                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
                }
            )
        };
        let left_time_index = left_context.time_index_column.clone().unwrap();
        let right_time_index = right_context.time_index_column.clone().unwrap();
        let join_keys = left_tag_col_set
            .iter()
            .cloned()
            .chain([left_time_index.clone()])
            .collect::<Vec<_>>();
        self.ctx.time_index_column = Some(left_time_index.clone());

        // alias right time index column if necessary
        if left_context.time_index_column != right_context.time_index_column {
            let right_project_exprs = right
                .schema()
                .fields()
                .iter()
                .map(|field| {
                    if field.name() == &right_time_index {
                        DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
                    } else {
                        DfExpr::Column(Column::from_name(field.name()))
                    }
                })
                .collect::<Vec<_>>();

            right = LogicalPlanBuilder::from(right)
                .project(right_project_exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        ensure!(
            left_context.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: "AND operator"
            }
        );
        // Update the field column in the context.
        // The AND/UNLESS operator only keeps the field column of the left input.
        let left_field_col = left_context.field_columns.first().unwrap();
        self.ctx.field_columns = vec![left_field_col.clone()];

        // Generate join plan.
        // All set operations in PromQL are "distinct"
        match op.id() {
            token::T_LAND => LogicalPlanBuilder::from(left)
                .distinct()
                .context(DataFusionPlanningSnafu)?
                .join_detailed(
                    right,
                    JoinType::LeftSemi,
                    (join_keys.clone(), join_keys),
                    None,
                    NullEquality::NullEqualsNull,
                )
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu),
            token::T_LUNLESS => LogicalPlanBuilder::from(left)
                .distinct()
                .context(DataFusionPlanningSnafu)?
                .join_detailed(
                    right,
                    JoinType::LeftAnti,
                    (join_keys.clone(), join_keys),
                    None,
                    NullEquality::NullEqualsNull,
                )
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu),
            token::T_LOR => {
                // OR is handled at the beginning of this function, as it cannot
                // be expressed using JOIN like AND and UNLESS.
                unreachable!()
            }
            _ => UnexpectedTokenSnafu { token: op }.fail(),
        }
    }

    // TODO(ruihang): change function name
    #[allow(clippy::too_many_arguments)]
    fn or_operator(
        &mut self,
        left: LogicalPlan,
        right: LogicalPlan,
        left_tag_cols_set: HashSet<String>,
        right_tag_cols_set: HashSet<String>,
        left_context: PromPlannerContext,
        right_context: PromPlannerContext,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        // checks
        ensure!(
            left_context.field_columns.len() == right_context.field_columns.len(),
            CombineTableColumnMismatchSnafu {
                left: left_context.field_columns.clone(),
                right: right_context.field_columns.clone()
            }
        );
        ensure!(
            left_context.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: "OR operator"
            }
        );

        // prepare hash sets
        let all_tags = left_tag_cols_set
            .union(&right_tag_cols_set)
            .cloned()
            .collect::<HashSet<_>>();
        let tags_not_in_left = all_tags
            .difference(&left_tag_cols_set)
            .cloned()
            .collect::<Vec<_>>();
        let tags_not_in_right = all_tags
            .difference(&right_tag_cols_set)
            .cloned()
            .collect::<Vec<_>>();
        let left_qualifier = left.schema().qualified_field(0).0.cloned();
        let right_qualifier = right.schema().qualified_field(0).0.cloned();
        let left_qualifier_string = left_qualifier
            .as_ref()
            .map(|l| l.to_string())
            .unwrap_or_default();
        let right_qualifier_string = right_qualifier
            .as_ref()
            .map(|r| r.to_string())
            .unwrap_or_default();
        let left_time_index_column =
            left_context
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: left_qualifier_string.clone(),
                })?;
        let right_time_index_column =
            right_context
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: right_qualifier_string.clone(),
                })?;
        // Take the name of the first field column; the length is checked above.
        let left_field_col = left_context.field_columns.first().unwrap();
        let right_field_col = right_context.field_columns.first().unwrap();

        // step 0: fill all columns in output schema
        let mut all_columns_set = left
            .schema()
            .fields()
            .iter()
            .chain(right.schema().fields().iter())
            .map(|field| field.name().clone())
            .collect::<HashSet<_>>();
        // remove time index column
        all_columns_set.remove(&left_time_index_column);
        all_columns_set.remove(&right_time_index_column);
        // remove field column in the right
        if left_field_col != right_field_col {
            all_columns_set.remove(right_field_col);
        }
        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
        // sort to ensure the generated schema is not volatile
        all_columns.sort_unstable();
        // use left time index column name as the result time index column name
        all_columns.insert(0, left_time_index_column.clone());
        // step 1: align schema using project, fill non-existent columns with null
        let left_proj_exprs = all_columns.iter().map(|col| {
            if tags_not_in_left.contains(col) {
                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
            } else {
                DfExpr::Column(Column::new(None::<String>, col))
            }
        });
        let right_time_index_expr = DfExpr::Column(Column::new(
            right_qualifier.clone(),
            right_time_index_column,
        ))
        .alias(left_time_index_column.clone());
        // The field column on the right side may not have a qualifier (it may
        // have been removed by a join operation), so we need to find it from the schema.
        let right_qualifier_for_field = right
            .schema()
            .iter()
            .find(|(_, f)| f.name() == right_field_col)
            .map(|(q, _)| q)
            .with_context(|| ColumnNotFoundSnafu {
                col: right_field_col.clone(),
            })?
            .cloned();

        // `skip(1)` to skip the time index column
        let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
            if col == left_field_col && left_field_col != right_field_col {
                // qualify field in right side if necessary to handle different field name
                DfExpr::Column(Column::new(
                    right_qualifier_for_field.clone(),
                    right_field_col,
                ))
            } else if tags_not_in_right.contains(col) {
                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
            } else {
                DfExpr::Column(Column::new(None::<String>, col))
            }
        });
        let right_proj_exprs = [right_time_index_expr]
            .into_iter()
            .chain(right_proj_exprs_without_time_index);

        let left_projected = LogicalPlanBuilder::from(left)
            .project(left_proj_exprs)
            .context(DataFusionPlanningSnafu)?
            .alias(left_qualifier_string.clone())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;
        let right_projected = LogicalPlanBuilder::from(right)
            .project(right_proj_exprs)
            .context(DataFusionPlanningSnafu)?
            .alias(right_qualifier_string.clone())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // step 2: compute match columns
        let mut match_columns = if let Some(modifier) = modifier
            && let Some(matching) = &modifier.matching
        {
            match matching {
                // keeps columns mentioned in `on`
                LabelModifier::Include(on) => on.labels.clone(),
                // removes columns mentioned in `ignoring`
                LabelModifier::Exclude(ignoring) => {
                    let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
                    all_tags.difference(&ignoring).cloned().collect()
                }
            }
        } else {
            all_tags.iter().cloned().collect()
        };
        // sort to ensure the generated plan is not volatile
        match_columns.sort_unstable();
        // step 3: build `UnionDistinctOn` plan
        let schema = left_projected.schema().clone();
        let union_distinct_on = UnionDistinctOn::new(
            left_projected,
            right_projected,
            match_columns,
            left_time_index_column.clone(),
            schema,
        );
        let result = LogicalPlan::Extension(Extension {
            node: Arc::new(union_distinct_on),
        });

        // step 4: update context
        self.ctx.time_index_column = Some(left_time_index_column);
        self.ctx.tag_columns = all_tags.into_iter().collect();
        self.ctx.field_columns = vec![left_field_col.clone()];

        Ok(result)
    }
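
    // Illustrative sketch (an added note): for `up{job="a"} or other_metric`,
    // tags present on only one side are padded with NULL string literals so
    // both inputs project to one shared schema. `UnionDistinctOn` then keeps
    // left-side rows plus the right-side rows whose (match columns, time
    // index) combination does not appear on the left, which is roughly
    // PromQL's `or` semantics.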

    /// Build a projection that projects and performs an operation expr on every value column.
    /// Non-value columns (tag and timestamp) will be preserved in the projection.
    ///
    /// # Side effect
    ///
    /// This function will update the value columns in the context. Those new column names
    /// don't contain a qualifier.
    fn projection_for_each_field_column<F>(
        &mut self,
        input: LogicalPlan,
        name_to_expr: F,
    ) -> Result<LogicalPlan>
    where
        F: FnMut(&String) -> Result<DfExpr>,
    {
        let non_field_columns_iter = self
            .ctx
            .tag_columns
            .iter()
            .chain(self.ctx.time_index_column.iter())
            .map(|col| {
                Ok(DfExpr::Column(Column::new(
                    self.ctx.table_name.clone().map(TableReference::bare),
                    col,
                )))
            });

        // build computation exprs
        let result_field_columns = self
            .ctx
            .field_columns
            .iter()
            .map(name_to_expr)
            .collect::<Result<Vec<_>>>()?;

        // alias the computation exprs to remove qualifier
        self.ctx.field_columns = result_field_columns
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();
        let field_columns_iter = result_field_columns
            .into_iter()
            .zip(self.ctx.field_columns.iter())
            .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));

        // chain non-field columns (unchanged) and field columns (applied computation then alias)
        let project_fields = non_field_columns_iter
            .chain(field_columns_iter)
            .collect::<Result<Vec<_>>>()?;

        LogicalPlanBuilder::from(input)
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }
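
    // Illustrative usage sketch (hypothetical caller, mirroring how instant
    // functions like `abs()` are planned): apply a scalar function to every
    // field column while tags and the time index pass through unchanged.
    //
    //   self.projection_for_each_field_column(input, |col| {
    //       Ok(DfExpr::ScalarFunction(ScalarFunction {
    //           func: datafusion_functions::math::abs(),
    //           args: vec![DfExpr::Column(Column::from_name(col))],
    //       }))
    //   })?;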

    /// Build a filter plan that filters on the value column. Notice that only one value column
    /// is expected.
    fn filter_on_field_column<F>(
        &self,
        input: LogicalPlan,
        mut name_to_expr: F,
    ) -> Result<LogicalPlan>
    where
        F: FnMut(&String) -> Result<DfExpr>,
    {
        ensure!(
            self.ctx.field_columns.len() == 1,
            UnsupportedExprSnafu {
                name: "filter on multi-value input"
            }
        );

        let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;

        LogicalPlanBuilder::from(input)
            .filter(field_column_filter)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    /// Generate an expr like `date_part("hour", <TIME_INDEX>)`. The caller should ensure the
    /// time index column in the context is set.
    fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
        let input_expr = datafusion::logical_expr::col(
            self.ctx
                .time_index_column
                .as_ref()
                // the table name doesn't matter here
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: "<doesn't matter>",
                })?,
        );
        let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
            func: datafusion_functions::datetime::date_part(),
            args: vec![date_part.lit(), input_expr],
        });
        Ok(fn_expr)
    }
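
    // Illustrative sketch (an added note): with the time index column named
    // `ts`, `self.date_part_on_time_index("hour")?` yields the expression
    // `date_part("hour", ts)`, which backs PromQL's `hour()`, `minute()` and
    // the other date/time extractors.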

    /// Apply an alias to the query result by adding a projection with the alias name
    fn apply_alias_projection(
        &mut self,
        plan: LogicalPlan,
        alias_name: String,
    ) -> Result<LogicalPlan> {
        let fields_expr = self.create_field_column_exprs()?;

        // TODO(dennis): how to support multi-value aliasing?
        ensure!(
            fields_expr.len() == 1,
            UnsupportedExprSnafu {
                name: "alias on multi-value result"
            }
        );

        let project_fields = fields_expr
            .into_iter()
            .map(|expr| expr.alias(&alias_name))
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(plan)
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }
}

#[derive(Default, Debug)]
struct FunctionArgs {
    input: Option<PromExpr>,
    literals: Vec<DfExpr>,
}

/// Represents different types of scalar functions supported in PromQL expressions.
/// Each variant defines how the function should be processed and what arguments it expects.
#[derive(Debug, Clone)]
enum ScalarFunc {
    /// DataFusion's registered (including built-in) scalar functions (e.g., abs, sqrt, round, clamp).
    /// These are passed through directly to DataFusion's execution engine.
    /// Processing: Simple argument insertion at the specified position.
    DataFusionBuiltin(Arc<ScalarUdfDef>),
    /// User-defined functions registered in DataFusion's function registry.
    /// Similar to DataFusionBuiltin but for custom functions not built into DataFusion.
    /// Processing: Direct pass-through with argument positioning.
    DataFusionUdf(Arc<ScalarUdfDef>),
    /// PromQL-specific functions that operate on time series data with temporal context.
    /// These functions require both timestamp ranges and values to perform calculations.
    /// Processing: Automatically injects timestamp_range and value columns as first arguments.
    /// Examples: idelta, irate, resets, changes, deriv, *_over_time functions
    Udf(Arc<ScalarUdfDef>),
    /// PromQL functions requiring extrapolation calculations with explicit range information.
    /// These functions need to know the time range length to perform rate calculations.
    /// The second field contains the range length in milliseconds.
    /// Processing: Injects timestamp_range, value, time_index columns and appends range_length.
    /// Examples: increase, rate, delta
    // TODO(ruihang): maybe merge with Udf later
    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
    /// Functions that generate expressions directly without external UDF calls.
    /// The expression is constructed during function matching and requires no additional processing.
    /// Examples: time(), minute(), hour(), month(), year() and other date/time extractors
    GeneratedExpr,
}
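
// Illustrative mapping sketch (an added note; variants as documented above):
//
//   "abs"    => ScalarFunc::DataFusionBuiltin(..)         // plain per-row math
//   "idelta" => ScalarFunc::Udf(..)                       // needs ts range + values
//   "rate"   => ScalarFunc::ExtrapolateUdf(.., range_ms)  // also needs range length
//   "hour"   => ScalarFunc::GeneratedExpr                 // expression built inline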

#[cfg(test)]
mod test {
    use std::time::{Duration, UNIX_EPOCH};

    use catalog::RegisterTableRequest;
    use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
    use common_base::Plugins;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_query::prelude::greptime_timestamp;
    use common_query::test_util::DummyDecoder;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use promql_parser::label::Labels;
    use promql_parser::parser;
    use session::context::QueryContext;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
    use table::test_util::EmptyTable;

    use super::*;
    use crate::options::QueryOptions;

    fn build_query_engine_state() -> QueryEngineState {
        QueryEngineState::new(
            new_memory_catalog_manager().unwrap(),
            None,
            None,
            None,
            None,
            None,
            false,
            Plugins::default(),
            QueryOptions::default(),
        )
    }

    async fn build_test_table_provider(
        table_name_tuples: &[(String, String)],
        num_tag: usize,
        num_field: usize,
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (schema_name, table_name) in table_name_tuples {
            let mut columns = vec![];
            for i in 0..num_tag {
                columns.push(ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            for i in 0..num_field {
                columns.push(ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::float64_datatype(),
                    true,
                ));
            }
            let schema = Arc::new(Schema::new(columns));
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .name(table_name.clone())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: schema_name.clone(),
                        table_name: table_name.clone(),
                        table_id: 1024,
                        table,
                    })
                    .is_ok()
            );
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }

    async fn build_test_table_provider_with_fields(
        table_name_tuples: &[(String, String)],
        tags: &[&str],
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (schema_name, table_name) in table_name_tuples {
            let mut columns = vec![];
            let num_tag = tags.len();
            for tag in tags {
                columns.push(ColumnSchema::new(
                    tag.to_string(),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    greptime_timestamp().to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            columns.push(ColumnSchema::new(
                greptime_value().to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ));
            let schema = Arc::new(Schema::new(columns));
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .name(table_name.clone())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: schema_name.clone(),
                        table_name: table_name.clone(),
                        table_id: 1024,
                        table,
                    })
                    .is_ok()
            );
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }

    // {
    //     input: `abs(some_metric{foo!="bar"})`,
    //     expected: &Call{
    //         Func: MustGetFunction("abs"),
    //         Args: Expressions{
    //             &VectorSelector{
    //                 Name: "some_metric",
    //                 LabelMatchers: []*labels.Matcher{
    //                     MustLabelMatcher(labels.MatchNotEqual, "foo", "bar"),
    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
    //                 },
    //             },
    //         },
    //     },
    // },
    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
        let prom_expr =
            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = String::from(
            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        ).replace("TEMPLATE", plan_name);

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

    #[tokio::test]
    async fn single_abs() {
        do_single_instant_function_call("abs", "abs").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_absent() {
        do_single_instant_function_call("absent", "").await;
    }

    #[tokio::test]
    async fn single_ceil() {
        do_single_instant_function_call("ceil", "ceil").await;
    }

    #[tokio::test]
    async fn single_exp() {
        do_single_instant_function_call("exp", "exp").await;
    }

    #[tokio::test]
    async fn single_ln() {
        do_single_instant_function_call("ln", "ln").await;
    }

    #[tokio::test]
    async fn single_log2() {
        do_single_instant_function_call("log2", "log2").await;
    }

    #[tokio::test]
    async fn single_log10() {
        do_single_instant_function_call("log10", "log10").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_scalar() {
        do_single_instant_function_call("scalar", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sgn() {
        do_single_instant_function_call("sgn", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort() {
        do_single_instant_function_call("sort", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort_desc() {
        do_single_instant_function_call("sort_desc", "").await;
    }

    #[tokio::test]
    async fn single_sqrt() {
        do_single_instant_function_call("sqrt", "sqrt").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_timestamp() {
        do_single_instant_function_call("timestamp", "").await;
    }

    #[tokio::test]
    async fn single_acos() {
        do_single_instant_function_call("acos", "acos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_acosh() {
        do_single_instant_function_call("acosh", "").await;
    }

    #[tokio::test]
    async fn single_asin() {
        do_single_instant_function_call("asin", "asin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_asinh() {
        do_single_instant_function_call("asinh", "").await;
    }

    #[tokio::test]
    async fn single_atan() {
        do_single_instant_function_call("atan", "atan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_atanh() {
        do_single_instant_function_call("atanh", "").await;
    }

    #[tokio::test]
    async fn single_cos() {
        do_single_instant_function_call("cos", "cos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_cosh() {
        do_single_instant_function_call("cosh", "").await;
    }

    #[tokio::test]
    async fn single_sin() {
        do_single_instant_function_call("sin", "sin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sinh() {
        do_single_instant_function_call("sinh", "").await;
    }

    #[tokio::test]
    async fn single_tan() {
        do_single_instant_function_call("tan", "tan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_tanh() {
        do_single_instant_function_call("tanh", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_deg() {
        do_single_instant_function_call("deg", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_rad() {
        do_single_instant_function_call("rad", "").await;
    }
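
    // A planning smoke test for `vector()`, added as an illustrative sketch:
    // it only asserts that planning succeeds rather than pinning the exact
    // `EmptyMetric` plan text, so it stays robust to display changes.
    #[tokio::test]
    async fn single_vector_literal() {
        let prom_expr = parser::parse("vector(1)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;
        assert!(plan.is_ok());
    }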

    // {
    //     input: "avg by (foo)(some_metric)",
    //     expected: &AggregateExpr{
    //         Op: AVG,
    //         Expr: &VectorSelector{
    //             Name: "some_metric",
    //             LabelMatchers: []*labels.Matcher{
    //                 MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
    //             },
    //             PosRange: PositionRange{
    //                 Start: 13,
    //                 End:   24,
    //             },
    //         },
    //         Grouping: []string{"foo"},
    //         PosRange: PositionRange{
    //             Start: 0,
    //             End:   25,
    //         },
    //     },
    // },
    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
        let prom_expr = parser::parse(&format!(
            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
        ))
        .unwrap();
        let mut eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // test group by
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            2,
            2,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected_no_without = String::from(
            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
        ).replace("TEMPLATE", plan_name);
        assert_eq!(
            plan.display_indent_schema().to_string(),
            expected_no_without
        );

        // test group without
        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
            *modifier = Some(LabelModifier::Exclude(Labels {
                labels: vec![String::from("tag_1")].into_iter().collect(),
            }));
        }
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            2,
            2,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected_without = String::from(
            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
        ).replace("TEMPLATE", plan_name);
        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
    }

    #[tokio::test]
    async fn aggregate_sum() {
        do_aggregate_expr_plan("sum", "sum").await;
    }

    #[tokio::test]
    async fn aggregate_avg() {
        do_aggregate_expr_plan("avg", "avg").await;
    }

    #[tokio::test]
    #[should_panic] // output type doesn't match
    async fn aggregate_count() {
        do_aggregate_expr_plan("count", "count").await;
    }

    #[tokio::test]
    async fn aggregate_min() {
        do_aggregate_expr_plan("min", "min").await;
    }

    #[tokio::test]
    async fn aggregate_max() {
        do_aggregate_expr_plan("max", "max").await;
    }

    #[tokio::test]
    #[should_panic] // output type doesn't match
    async fn aggregate_group() {
        do_aggregate_expr_plan("grouping", "GROUPING").await;
    }

    #[tokio::test]
    async fn aggregate_stddev() {
        do_aggregate_expr_plan("stddev", "stddev_pop").await;
    }

    #[tokio::test]
    async fn aggregate_stdvar() {
        do_aggregate_expr_plan("stdvar", "var_pop").await;
    }

    // TODO(ruihang): add range fn tests once exprs are ready.

    // {
    //     input: "some_metric{tag_0="foo"} + some_metric{tag_0="bar"}",
    //     expected: &BinaryExpr{
    //         Op: ADD,
    //         LHS: &VectorSelector{
    //             Name: "some_metric",
    //             LabelMatchers: []*labels.Matcher{
    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "foo"),
    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
    //             },
    //         },
    //         RHS: &VectorSelector{
    //             Name: "some_metric",
    //             LabelMatchers: []*labels.Matcher{
    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "bar"),
    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
    //             },
    //         },
    //         VectorMatching: &VectorMatching{},
    //     },
    // },
3939    #[tokio::test]
3940    async fn binary_op_column_column() {
3941        let prom_expr =
3942            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
3943        let eval_stmt = EvalStmt {
3944            expr: prom_expr,
3945            start: UNIX_EPOCH,
3946            end: UNIX_EPOCH
3947                .checked_add(Duration::from_secs(100_000))
3948                .unwrap(),
3949            interval: Duration::from_secs(5),
3950            lookback_delta: Duration::from_secs(1),
3951        };
3952
3953        let table_provider = build_test_table_provider(
3954            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3955            1,
3956            1,
3957        )
3958        .await;
3959        let plan =
3960            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3961                .await
3962                .unwrap();
3963
3964        let expected = String::from(
3965            "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
3966            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3967            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3968            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3969            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3970            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3971            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3972            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3973            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3974            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3975            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3976            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3977            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3978            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
3979        );
3980
3981        assert_eq!(plan.display_indent_schema().to_string(), expected);
3982    }
3983
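    // Plans `query` against a provider holding `some_metric` and
    // `greptime_private.some_alt_metric`, then asserts the indented-schema
    // display of the resulting plan equals `expected`.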
3984    async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
3985        let prom_expr = parser::parse(query).unwrap();
3986        let eval_stmt = EvalStmt {
3987            expr: prom_expr,
3988            start: UNIX_EPOCH,
3989            end: UNIX_EPOCH
3990                .checked_add(Duration::from_secs(100_000))
3991                .unwrap(),
3992            interval: Duration::from_secs(5),
3993            lookback_delta: Duration::from_secs(1),
3994        };
3995
3996        let table_provider = build_test_table_provider(
3997            &[
3998                (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
3999                (
4000                    "greptime_private".to_string(),
4001                    "some_alt_metric".to_string(),
4002                ),
4003            ],
4004            1,
4005            1,
4006        )
4007        .await;
4008        let plan =
4009            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4010                .await
4011                .unwrap();
4012
4013        assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
4014    }
4015
4016    #[tokio::test]
4017    async fn binary_op_literal_column() {
4018        let query = r#"1 + some_metric{tag_0="bar"}"#;
4019        let expected = String::from(
4020            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
4021            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4022            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4023            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4024            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4025            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4026        );
4027
4028        indie_query_plan_compare(query, expected).await;
4029    }
4030
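    // A literal-only expression needs no table at all: the planner emits an
    // EmptyMetric node over a dummy scan that synthesizes timestamps for the
    // query range and interval.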
4031    #[tokio::test]
4032    async fn binary_op_literal_literal() {
4033        let query = r#"1 + 1"#;
4034        let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
4035  TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;
4036        indie_query_plan_compare(query, expected).await;
4037    }
4038
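    // The `bool` modifier turns a comparison into a 0/1 value instead of a
    // filter, which shows up below as a CAST of the boolean result to
    // Float64.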
4039    #[tokio::test]
4040    async fn simple_bool_grammar() {
4041        let query = "some_metric != bool 1.2345";
4042        let expected = String::from(
4043            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
4044            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4045            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4046            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4047            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4048            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4049        );
4050
4051        indie_query_plan_compare(query, expected).await;
4052    }
4053
4054    #[tokio::test]
4055    async fn bool_with_additional_arithmetic() {
4056        let query = "some_metric + (1 == bool 2)";
4057        let expected = String::from(
4058            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
4059            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4060            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4061            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4062            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4063            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4064        );
4065
4066        indie_query_plan_compare(query, expected).await;
4067    }
4068
4069    #[tokio::test]
4070    async fn simple_unary() {
4071        let query = "-some_metric";
4072        let expected = String::from(
4073            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
4074            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4075            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4076            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4077            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4078            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4079        );
4080
4081        indie_query_plan_compare(query, expected).await;
4082    }
4083
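    // Range functions are planned through PromRangeManipulate; the `[5m]`
    // selector becomes a 300000 ms eval range, and the trailing
    // `IS NOT NULL` filter drops windows that produced no value.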
4084    #[tokio::test]
4085    async fn increase_aggr() {
4086        let query = "increase(some_metric[5m])";
4087        let expected = String::from(
4088            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
4089            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
4090            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
4091            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4092            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4093            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4094            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4095            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4096        );
4097
4098        indie_query_plan_compare(query, expected).await;
4099    }
4100
4101    #[tokio::test]
4102    async fn less_filter_on_value() {
4103        let query = "some_metric < 1.2345";
4104        let expected = String::from(
4105            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4106            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4107            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4108            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4109            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4110            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4111        );
4112
4113        indie_query_plan_compare(query, expected).await;
4114    }
4115
4116    #[tokio::test]
4117    async fn count_over_time() {
4118        let query = "count_over_time(some_metric[5m])";
4119        let expected = String::from(
4120            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
4121            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
4122            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
4123            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4124            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4125            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4126            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4127            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4128        );
4129
4130        indie_query_plan_compare(query, expected).await;
4131    }
4132
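    // `ignoring(...)` removes the listed labels from the join keys, so the
    // join below matches only on `uri` and the time index.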
4133    #[tokio::test]
4134    async fn test_hash_join() {
4135        let mut eval_stmt = EvalStmt {
4136            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4137            start: UNIX_EPOCH,
4138            end: UNIX_EPOCH
4139                .checked_add(Duration::from_secs(100_000))
4140                .unwrap(),
4141            interval: Duration::from_secs(5),
4142            lookback_delta: Duration::from_secs(1),
4143        };
4144
4145        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
4146
4147        let prom_expr = parser::parse(case).unwrap();
4148        eval_stmt.expr = prom_expr;
4149        let table_provider = build_test_table_provider_with_fields(
4150            &[
4151                (
4152                    DEFAULT_SCHEMA_NAME.to_string(),
4153                    "http_server_requests_seconds_sum".to_string(),
4154                ),
4155                (
4156                    DEFAULT_SCHEMA_NAME.to_string(),
4157                    "http_server_requests_seconds_count".to_string(),
4158                ),
4159            ],
4160            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
4161        )
4162        .await;
4163        // Planning should succeed.
4164        let plan =
4165            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4166                .await
4167                .unwrap();
4168        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
4169            \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
4170            \n    SubqueryAlias: http_server_requests_seconds_sum\
4171            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
4172            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
4173            \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
4174            \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
4175            \n              TableScan: http_server_requests_seconds_sum\
4176            \n    SubqueryAlias: http_server_requests_seconds_count\
4177            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
4178            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
4179            \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
4180            \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
4181            \n              TableScan: http_server_requests_seconds_count";
4182        assert_eq!(plan.to_string(), expected);
4183    }
4184
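    // Plannability-only check: histogram_quantile nested inside
    // label_replace. The quantile itself is expected to be planned with the
    // HistogramFold extension node.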
4185    #[tokio::test]
4186    async fn test_nested_histogram_quantile() {
4187        let mut eval_stmt = EvalStmt {
4188            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4189            start: UNIX_EPOCH,
4190            end: UNIX_EPOCH
4191                .checked_add(Duration::from_secs(100_000))
4192                .unwrap(),
4193            interval: Duration::from_secs(5),
4194            lookback_delta: Duration::from_secs(1),
4195        };
4196
4197        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;
4198
4199        let prom_expr = parser::parse(case).unwrap();
4200        eval_stmt.expr = prom_expr;
4201        let table_provider = build_test_table_provider_with_fields(
4202            &[(
4203                DEFAULT_SCHEMA_NAME.to_string(),
4204                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
4205            )],
4206            &["pod", "le", "path", "code", "container"],
4207        )
4208        .await;
4209        // Planning should succeed.
4210        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4211            .await
4212            .unwrap();
4213    }
4214
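    // Plannability-only checks for the `and`/`unless` set operators combined
    // with comparison filters and an `or vector(0)` fallback.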
4215    #[tokio::test]
4216    async fn test_parse_and_operator() {
4217        let mut eval_stmt = EvalStmt {
4218            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4219            start: UNIX_EPOCH,
4220            end: UNIX_EPOCH
4221                .checked_add(Duration::from_secs(100_000))
4222                .unwrap(),
4223            interval: Duration::from_secs(5),
4224            lookback_delta: Duration::from_secs(1),
4225        };
4226
4227        let cases = [
4228            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
4229            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
4230        ];
4231
4232        for case in cases {
4233            let prom_expr = parser::parse(case).unwrap();
4234            eval_stmt.expr = prom_expr;
4235            let table_provider = build_test_table_provider_with_fields(
4236                &[
4237                    (
4238                        DEFAULT_SCHEMA_NAME.to_string(),
4239                        "kubelet_volume_stats_used_bytes".to_string(),
4240                    ),
4241                    (
4242                        DEFAULT_SCHEMA_NAME.to_string(),
4243                        "kubelet_volume_stats_capacity_bytes".to_string(),
4244                    ),
4245                ],
4246                &["namespace", "persistentvolumeclaim"],
4247            )
4248            .await;
4249            // Planning should succeed.
4250            let _ =
4251                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4252                    .await
4253                    .unwrap();
4254        }
4255    }
4256
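    // Plannability-only check: a vector subtraction whose right-hand side
    // falls back to `or vector(0)`.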
4257    #[tokio::test]
4258    async fn test_nested_binary_op() {
4259        let mut eval_stmt = EvalStmt {
4260            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4261            start: UNIX_EPOCH,
4262            end: UNIX_EPOCH
4263                .checked_add(Duration::from_secs(100_000))
4264                .unwrap(),
4265            interval: Duration::from_secs(5),
4266            lookback_delta: Duration::from_secs(1),
4267        };
4268
4269        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
4270        (
4271            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
4272            or
4273            vector(0)
4274        )"#;
4275
4276        let prom_expr = parser::parse(case).unwrap();
4277        eval_stmt.expr = prom_expr;
4278        let table_provider = build_test_table_provider_with_fields(
4279            &[(
4280                DEFAULT_SCHEMA_NAME.to_string(),
4281                "nginx_ingress_controller_requests".to_string(),
4282            )],
4283            &["namespace", "job"],
4284        )
4285        .await;
4286        // Planning should succeed.
4287        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4288            .await
4289            .unwrap();
4290    }
4291
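    // Plannability-only checks for `or` between differently shaped operands;
    // `or` matching relies on the UnionDistinctOn extension node.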
4292    #[tokio::test]
4293    async fn test_parse_or_operator() {
4294        let mut eval_stmt = EvalStmt {
4295            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4296            start: UNIX_EPOCH,
4297            end: UNIX_EPOCH
4298                .checked_add(Duration::from_secs(100_000))
4299                .unwrap(),
4300            interval: Duration::from_secs(5),
4301            lookback_delta: Duration::from_secs(1),
4302        };
4303
4304        let case = r#"
4305        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
4306        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
4307            or
4308        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
4309        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;
4310
4311        let table_provider = build_test_table_provider_with_fields(
4312            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
4313            &["tenant_name", "cluster_name"],
4314        )
4315        .await;
4316        eval_stmt.expr = parser::parse(case).unwrap();
4317        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4318            .await
4319            .unwrap();
4320
4321        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
4322            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
4323            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
4324            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
4325            or
4326            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
4327            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
4328            or
4329            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
4330            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
4331        let table_provider = build_test_table_provider_with_fields(
4332            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
4333            &["tenant_name", "cluster_name"],
4334        )
4335        .await;
4336        eval_stmt.expr = parser::parse(case).unwrap();
4337        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4338            .await
4339            .unwrap();
4340
4341        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
4342            sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
4343            (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
4344            (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
4345        let table_provider = build_test_table_provider_with_fields(
4346            &[
4347                (
4348                    DEFAULT_SCHEMA_NAME.to_string(),
4349                    "background_waitevent_cnt".to_string(),
4350                ),
4351                (
4352                    DEFAULT_SCHEMA_NAME.to_string(),
4353                    "foreground_waitevent_cnt".to_string(),
4354                ),
4355            ],
4356            &["tenant_name", "cluster_name"],
4357        )
4358        .await;
4359        eval_stmt.expr = parser::parse(case).unwrap();
4360        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4361            .await
4362            .unwrap();
4363
4364        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
4365        let table_provider = build_test_table_provider_with_fields(
4366            &[
4367                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
4368                (
4369                    DEFAULT_SCHEMA_NAME.to_string(),
4370                    "container_cpu_load_average_10s".to_string(),
4371                ),
4372                (
4373                    DEFAULT_SCHEMA_NAME.to_string(),
4374                    "container_spec_cpu_quota".to_string(),
4375                ),
4376            ],
4377            &["cluster_name", "host_name"],
4378        )
4379        .await;
4380        eval_stmt.expr = parser::parse(case).unwrap();
4381        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4382            .await
4383            .unwrap();
4384    }
4385
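    // `__field__` is a pseudo matcher that selects value columns: equal
    // matchers keep the named fields, not-equal matchers drop them, and
    // regex matchers filter by pattern. Naming a nonexistent field is an
    // error (see `bad_cases` at the end).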
4386    #[tokio::test]
4387    async fn value_matcher() {
4388        // Template EvalStmt; each case below swaps in its own expression.
4389        let mut eval_stmt = EvalStmt {
4390            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4391            start: UNIX_EPOCH,
4392            end: UNIX_EPOCH
4393                .checked_add(Duration::from_secs(100_000))
4394                .unwrap(),
4395            interval: Duration::from_secs(5),
4396            lookback_delta: Duration::from_secs(1),
4397        };
4398
4399        let cases = [
4400            // single equal matcher
4401            (
4402                r#"some_metric{__field__="field_1"}"#,
4403                vec![
4404                    "some_metric.field_1",
4405                    "some_metric.tag_0",
4406                    "some_metric.tag_1",
4407                    "some_metric.tag_2",
4408                    "some_metric.timestamp",
4409                ],
4410            ),
4411            // two equal matchers
4412            (
4413                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
4414                vec![
4415                    "some_metric.field_0",
4416                    "some_metric.field_1",
4417                    "some_metric.tag_0",
4418                    "some_metric.tag_1",
4419                    "some_metric.tag_2",
4420                    "some_metric.timestamp",
4421                ],
4422            ),
4423            // single not_eq matcher
4424            (
4425                r#"some_metric{__field__!="field_1"}"#,
4426                vec![
4427                    "some_metric.field_0",
4428                    "some_metric.field_2",
4429                    "some_metric.tag_0",
4430                    "some_metric.tag_1",
4431                    "some_metric.tag_2",
4432                    "some_metric.timestamp",
4433                ],
4434            ),
4435            // two not_eq matchers
4436            (
4437                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
4438                vec![
4439                    "some_metric.field_0",
4440                    "some_metric.tag_0",
4441                    "some_metric.tag_1",
4442                    "some_metric.tag_2",
4443                    "some_metric.timestamp",
4444                ],
4445            ),
4446            // equal and not_eq matchers (no conflict)
4447            (
4448                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
4449                vec![
4450                    "some_metric.field_1",
4451                    "some_metric.tag_0",
4452                    "some_metric.tag_1",
4453                    "some_metric.tag_2",
4454                    "some_metric.timestamp",
4455                ],
4456            ),
4457            // equal and not_eq matchers (conflict)
4458            (
4459                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
4460                vec![
4461                    "some_metric.tag_0",
4462                    "some_metric.tag_1",
4463                    "some_metric.tag_2",
4464                    "some_metric.timestamp",
4465                ],
4466            ),
4467            // single regex eq matcher
4468            (
4469                r#"some_metric{__field__=~"field_1|field_2"}"#,
4470                vec![
4471                    "some_metric.field_1",
4472                    "some_metric.field_2",
4473                    "some_metric.tag_0",
4474                    "some_metric.tag_1",
4475                    "some_metric.tag_2",
4476                    "some_metric.timestamp",
4477                ],
4478            ),
4479            // single regex not_eq matcher
4480            (
4481                r#"some_metric{__field__!~"field_1|field_2"}"#,
4482                vec![
4483                    "some_metric.field_0",
4484                    "some_metric.tag_0",
4485                    "some_metric.tag_1",
4486                    "some_metric.tag_2",
4487                    "some_metric.timestamp",
4488                ],
4489            ),
4490        ];
4491
4492        for case in cases {
4493            let prom_expr = parser::parse(case.0).unwrap();
4494            eval_stmt.expr = prom_expr;
4495            let table_provider = build_test_table_provider(
4496                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4497                3,
4498                3,
4499            )
4500            .await;
4501            let plan =
4502                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4503                    .await
4504                    .unwrap();
4505            let mut fields = plan.schema().field_names();
4506            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
4507            fields.sort();
4508            expected.sort();
4509            assert_eq!(fields, expected, "case: {:?}", case.0);
4510        }
4511
4512        let bad_cases = [
4513            r#"some_metric{__field__="nonexistent"}"#,
4514            r#"some_metric{__field__!="nonexistent"}"#,
4515        ];
4516
4517        for case in bad_cases {
4518            let prom_expr = parser::parse(case).unwrap();
4519            eval_stmt.expr = prom_expr;
4520            let table_provider = build_test_table_provider(
4521                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4522                3,
4523                3,
4524            )
4525            .await;
4526            let plan =
4527                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4528                    .await;
4529            assert!(plan.is_err(), "case: {:?}", case);
4530        }
4531    }
4532
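    // `__schema__` and `__database__` are treated as equivalent ways to
    // qualify the metric with a schema; both queries below produce the same
    // plan.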
4533    #[tokio::test]
4534    async fn custom_schema() {
4535        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
4536        let expected = String::from(
4537            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4538            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4539            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4540            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4541            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4542        );
4543
4544        indie_query_plan_compare(query, expected).await;
4545
4546        let query = "some_alt_metric{__database__=\"greptime_private\"}";
4547        let expected = String::from(
4548            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4549            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4550            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4551            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4552            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4553        );
4554
4555        indie_query_plan_compare(query, expected).await;
4556
4557        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
4558        let expected = String::from(
4559            "Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
4560        \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4561        \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4562        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4563        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4564        \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4565        \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4566        \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4567        \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4568        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4569        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4570        \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4571        \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4572        \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4573        );
4574
4575        indie_query_plan_compare(query, expected).await;
4576    }
4577
4578    #[tokio::test]
4579    async fn only_equals_is_supported_for_special_matcher() {
4580        let queries = &[
4581            "some_alt_metric{__schema__!=\"greptime_private\"}",
4582            "some_alt_metric{__schema__=~\"lalala\"}",
4583            "some_alt_metric{__database__!=\"greptime_private\"}",
4584            "some_alt_metric{__database__=~\"lalala\"}",
4585        ];
4586
4587        for query in queries {
4588            let prom_expr = parser::parse(query).unwrap();
4589            let eval_stmt = EvalStmt {
4590                expr: prom_expr,
4591                start: UNIX_EPOCH,
4592                end: UNIX_EPOCH
4593                    .checked_add(Duration::from_secs(100_000))
4594                    .unwrap(),
4595                interval: Duration::from_secs(5),
4596                lookback_delta: Duration::from_secs(1),
4597            };
4598
4599            let table_provider = build_test_table_provider(
4600                &[
4601                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
4602                    (
4603                        "greptime_private".to_string(),
4604                        "some_alt_metric".to_string(),
4605                    ),
4606                ],
4607                1,
4608                1,
4609            )
4610            .await;
4611
4612            let plan =
4613                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4614                    .await;
4615            assert!(plan.is_err(), "query: {:?}", query);
4616        }
4617    }
4618
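    // Tables with a non-millisecond time index get an extra Projection that
    // casts the timestamp to Timestamp(Millisecond, None) before any
    // Prom-specific plan nodes, which operate on millisecond timestamps.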
4619    #[tokio::test]
4620    async fn test_non_ms_precision() {
4621        let catalog_list = MemoryCatalogManager::with_default_setup();
4622        let columns = vec![
4623            ColumnSchema::new(
4624                "tag".to_string(),
4625                ConcreteDataType::string_datatype(),
4626                false,
4627            ),
4628            ColumnSchema::new(
4629                "timestamp".to_string(),
4630                ConcreteDataType::timestamp_nanosecond_datatype(),
4631                false,
4632            )
4633            .with_time_index(true),
4634            ColumnSchema::new(
4635                "field".to_string(),
4636                ConcreteDataType::float64_datatype(),
4637                true,
4638            ),
4639        ];
4640        let schema = Arc::new(Schema::new(columns));
4641        let table_meta = TableMetaBuilder::empty()
4642            .schema(schema)
4643            .primary_key_indices(vec![0])
4644            .value_indices(vec![2])
4645            .next_column_id(1024)
4646            .build()
4647            .unwrap();
4648        let table_info = TableInfoBuilder::default()
4649            .name("metrics".to_string())
4650            .meta(table_meta)
4651            .build()
4652            .unwrap();
4653        let table = EmptyTable::from_table_info(&table_info);
4654        assert!(
4655            catalog_list
4656                .register_table_sync(RegisterTableRequest {
4657                    catalog: DEFAULT_CATALOG_NAME.to_string(),
4658                    schema: DEFAULT_SCHEMA_NAME.to_string(),
4659                    table_name: "metrics".to_string(),
4660                    table_id: 1024,
4661                    table,
4662                })
4663                .is_ok()
4664        );
4665
4666        let plan = PromPlanner::stmt_to_plan(
4667            DfTableSourceProvider::new(
4668                catalog_list.clone(),
4669                false,
4670                QueryContext::arc(),
4671                DummyDecoder::arc(),
4672                true,
4673            ),
4674            &EvalStmt {
4675                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
4676                start: UNIX_EPOCH,
4677                end: UNIX_EPOCH
4678                    .checked_add(Duration::from_secs(100_000))
4679                    .unwrap(),
4680                interval: Duration::from_secs(5),
4681                lookback_delta: Duration::from_secs(1),
4682            },
4683            &build_query_engine_state(),
4684        )
4685        .await
4686        .unwrap();
4687        assert_eq!(
4688            plan.display_indent_schema().to_string(),
4689            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4690        \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4691        \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4692        \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4693        \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4694        \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
4695        );
4696        let plan = PromPlanner::stmt_to_plan(
4697            DfTableSourceProvider::new(
4698                catalog_list.clone(),
4699                false,
4700                QueryContext::arc(),
4701                DummyDecoder::arc(),
4702                true,
4703            ),
4704            &EvalStmt {
4705                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
4706                start: UNIX_EPOCH,
4707                end: UNIX_EPOCH
4708                    .checked_add(Duration::from_secs(100_000))
4709                    .unwrap(),
4710                interval: Duration::from_secs(5),
4711                lookback_delta: Duration::from_secs(1),
4712            },
4713            &build_query_engine_state(),
4714        )
4715        .await
4716        .unwrap();
4717        assert_eq!(
4718            plan.display_indent_schema().to_string(),
4719            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
4720        \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
4721        \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
4722        \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4723        \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4724        \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4725        \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4726        \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
4727        \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
4728        );
4729    }
4730
4731    #[tokio::test]
4732    async fn test_nonexistent_label() {
4733        // Template EvalStmt; the expression is filled in below.
4734        let mut eval_stmt = EvalStmt {
4735            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
4736            start: UNIX_EPOCH,
4737            end: UNIX_EPOCH
4738                .checked_add(Duration::from_secs(100_000))
4739                .unwrap(),
4740            interval: Duration::from_secs(5),
4741            lookback_delta: Duration::from_secs(1),
4742        };
4743
4744        let case = r#"some_metric{nonexistent="hi"}"#;
4745        let prom_expr = parser::parse(case).unwrap();
4746        eval_stmt.expr = prom_expr;
4747        let table_provider = build_test_table_provider(
4748            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4749            3,
4750            3,
4751        )
4752        .await;
4753        // Planning should succeed.
4754        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4755            .await
4756            .unwrap();
4757    }
4758
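    // label_join is planned as a concat_ws over the source labels, followed
    // by an `IS NOT NULL` filter on the value column.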
4759    #[tokio::test]
4760    async fn test_label_join() {
4761        let prom_expr = parser::parse(
4762            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
4763        )
4764        .unwrap();
4765        let eval_stmt = EvalStmt {
4766            expr: prom_expr,
4767            start: UNIX_EPOCH,
4768            end: UNIX_EPOCH
4769                .checked_add(Duration::from_secs(100_000))
4770                .unwrap(),
4771            interval: Duration::from_secs(5),
4772            lookback_delta: Duration::from_secs(1),
4773        };
4774
4775        let table_provider =
4776            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
4777                .await;
4778        let plan =
4779            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4780                .await
4781                .unwrap();
4782
4783        let expected = r#"
4784Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
4785  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
4786    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4787      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4788        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4789          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4790            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;
4791
4792        let ret = plan.display_indent_schema().to_string();
4793        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
4794    }
4795
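    // label_replace compiles to regexp_replace, with the pattern wrapped as
    // ^(?s:...)$ to match Prometheus' full-string regex semantics.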
4796    #[tokio::test]
4797    async fn test_label_replace() {
4798        let prom_expr = parser::parse(
4799            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
4800        )
4801        .unwrap();
4802        let eval_stmt = EvalStmt {
4803            expr: prom_expr,
4804            start: UNIX_EPOCH,
4805            end: UNIX_EPOCH
4806                .checked_add(Duration::from_secs(100_000))
4807                .unwrap(),
4808            interval: Duration::from_secs(5),
4809            lookback_delta: Duration::from_secs(1),
4810        };
4811
4812        let table_provider =
4813            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
4814                .await;
4815        let plan =
4816            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4817                .await
4818                .unwrap();
4819
4820        let expected = r#"
4821Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
4822  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
4823    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4824      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4825        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4826          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
4827            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;
4828
4829        let ret = plan.display_indent_schema().to_string();
4830        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
4831    }
4832
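    // Regex label matchers compile to the `~` operator with the pattern
    // anchored as ^(?:...)$, again following Prometheus' full-string
    // matching semantics.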
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

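    // A hedged companion sketch (added for illustration, not part of the
    // original suite): it reuses the helpers above and only asserts that a
    // negative regex matcher (`!~`) plans successfully, without pinning the
    // exact plan text.
    #[tokio::test]
    async fn test_neg_regex_matcher_smoke() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        eval_stmt.expr = parser::parse(
            r#"sum(prometheus_tsdb_head_series{tag_1!~"(10.0.160.237:8080|10.0.160.237:9090)"})"#,
        )
        .unwrap();
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;
        assert!(result.is_ok(), "planning `!~` failed: {:?}", result.err());
    }

    // `topk` has no plain aggregate equivalent, so the planner expresses it as a
    // `row_number()` window partitioned by timestamp and ordered by the
    // aggregated value (descending), followed by a `row_number() <= k` filter.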
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

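    // A hedged companion sketch (not from the original suite): `bottomk` is
    // expected to plan like `topk` with the opposite sort direction; this smoke
    // test only asserts that planning succeeds instead of pinning the full
    // window-plan text.
    #[tokio::test]
    async fn test_bottomk_expr_smoke() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"bottomk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;
        eval_stmt.expr = parser::parse(case).unwrap();
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            &["ip"],
        )
        .await;
        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;
        assert!(result.is_ok(), "planning `bottomk` failed: {:?}", result.err());
    }

    // `count_values(dst, metric)` groups by the sample value itself (alongside
    // the `by` labels and the timestamp) and re-exposes that value under the
    // destination label, projected below as `series`.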
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

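    // Same `count_values` query, planned via `stmt_to_plan_with_alias`: the
    // outermost projection renames the count column to the requested alias
    // (`my_series`) while the rest of the plan is unchanged.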
    #[tokio::test]
    async fn test_value_alias() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let alias = Some("my_series".to_string());
        let plan = PromPlanner::stmt_to_plan_with_alias(
            table_provider,
            &eval_stmt,
            alias,
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        let expected = r#"
Projection: count(prometheus_tsdb_head_series.greptime_value) AS my_series, prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [my_series:Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]
  Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]
    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
      Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
        Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: prometheus_tsdb_head_series.ip ~ Utf8("^(?:(10.0.160.237:8080|10.0.160.237:9090))$") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
        assert_eq!(format!("\n{}", plan.display_indent_schema()), expected);
    }

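    // `quantile(0.3, sum(...) by (ip))` becomes a second Aggregate keyed only
    // by the timestamp, with the literal quantile passed as the first argument
    // of the `quantile` aggregate over the inner sums.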
    #[tokio::test]
    async fn test_quantile_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

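    // When one side of `or` references a table that does not exist, the planner
    // substitutes an EmptyMetric source (projecting NULL for the missing labels)
    // and combines both sides with UnionDistinctOn instead of erroring out.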
    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-1000, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100001000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
  SubqueryAlias:  [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
            TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

    #[tokio::test]
    async fn test_histogram_quantile_missing_le_column() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // Test case: histogram_quantile over a table that has no 'le' column.
        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;

        // Create a table provider whose table lacks the 'le' column.
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "non_existent_histogram_bucket".to_string(),
            )],
            &["pod", "instance"], // Note: no 'le' column
        )
        .await;

        // Planning should return an empty result instead of an error.
        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;

        // Planning succeeds (yielding an empty result) rather than failing with
        // "Cannot find column le".
        assert!(
            result.is_ok(),
            "Expected successful plan creation with empty result, but got error: {:?}",
            result.err()
        );

        // Verify that the result is an EmptyRelation.
        let plan = result.unwrap();
        assert!(
            matches!(plan, LogicalPlan::EmptyRelation(_)),
            "Expected EmptyRelation, but got: {plan:?}"
        );
    }
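
    // A hedged counterpart sketch (not from the original suite, with a
    // hypothetical table name `some_histogram_bucket`): when the bucket table
    // does expose an `le` label, planning the same `histogram_quantile` query
    // is expected to succeed with a non-empty plan; only that expectation is
    // asserted, since the exact folded-plan text is not pinned here.
    #[tokio::test]
    async fn test_histogram_quantile_with_le_column_smoke() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(some_histogram_bucket{instance=~"xxx"}[1m])))"#;
        eval_stmt.expr = parser::parse(case).unwrap();
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "some_histogram_bucket".to_string(),
            )],
            &["pod", "instance", "le"], // 'le' is present this time
        )
        .await;
        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;
        assert!(
            result.is_ok(),
            "Expected planning to succeed when 'le' exists, but got error: {:?}",
            result.err()
        );
        // With 'le' available, the plan should not degenerate to EmptyRelation.
        assert!(!matches!(result.unwrap(), LogicalPlan::EmptyRelation(_)));
    }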
}