Skip to main content

query/promql/
planner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashSet, VecDeque};
16use std::sync::Arc;
17use std::time::UNIX_EPOCH;
18
19use arrow::datatypes::IntervalDayTime;
20use async_recursion::async_recursion;
21use catalog::table_source::DfTableSourceProvider;
22use common_error::ext::ErrorExt;
23use common_error::status_code::StatusCode;
24use common_function::function::FunctionContext;
25use common_query::prelude::greptime_value;
26use datafusion::common::DFSchemaRef;
27use datafusion::datasource::DefaultTableSource;
28use datafusion::functions_aggregate::average::avg_udaf;
29use datafusion::functions_aggregate::count::count_udaf;
30use datafusion::functions_aggregate::expr_fn::first_value;
31use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
32use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
33use datafusion::functions_aggregate::sum::sum_udaf;
34use datafusion::functions_aggregate::variance::var_pop_udaf;
35use datafusion::functions_window::row_number::RowNumber;
36use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
37use datafusion::logical_expr::expr_rewriter::normalize_cols;
38use datafusion::logical_expr::{
39    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
40    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
41};
42use datafusion::prelude as df_prelude;
43use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
44use datafusion::scalar::ScalarValue;
45use datafusion::sql::TableReference;
46use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
47use datafusion_common::{DFSchema, NullEquality};
48use datafusion_expr::expr::WindowFunctionParams;
49use datafusion_expr::utils::conjunction;
50use datafusion_expr::{
51    ExprSchemable, Literal, Projection, SortExpr, TableScan, TableSource, col, lit,
52};
53use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
54use datatypes::data_type::ConcreteDataType;
55use itertools::Itertools;
56use once_cell::sync::Lazy;
57use promql::extension_plan::{
58    Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
59    ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
60};
61use promql::functions::{
62    AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, DoubleExponentialSmoothing,
63    IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
64    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
65    quantile_udaf,
66};
67use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
68use promql_parser::parser::token::TokenType;
69use promql_parser::parser::value::ValueType;
70use promql_parser::parser::{
71    AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
72    Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
73    Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
74    VectorSelector, token,
75};
76use regex::{self, Regex};
77use snafu::{OptionExt, ResultExt, ensure};
78use store_api::metric_engine_consts::{
79    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
80    METRIC_ENGINE_NAME, is_metric_engine_internal_column,
81};
82use table::table::adapter::DfTableProviderAdapter;
83
84use crate::parser::{
85    ALIAS_NODE_NAME, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME, AliasExpr, EXPLAIN_NODE_NAME,
86    EXPLAIN_VERBOSE_NODE_NAME,
87};
88use crate::promql::error::{
89    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
90    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
91    InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
92    MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
93    Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
94    UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
95    UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
96    ZeroRangeSelectorSnafu,
97};
98use crate::query_engine::QueryEngineState;
99
/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `absent()` function in PromQL
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// `histogram_quantile` function in PromQL
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector` function in PromQL
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// `le` column for conventional histogram.
const LE_COLUMN_NAME: &str = "le";

/// Static regex for validating label names according to Prometheus specification.
/// Label names must match the regex: [a-zA-Z_][a-zA-Z0-9_]*
static LABEL_NAME_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());

/// Fallback time index column name, used when no real table supplies one
/// (e.g. when both operands of a binary expression are literals).
const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// default value column name for empty metric
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special modifier to project field columns under multi-field mode
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special modifier for cross schema query
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
/// Special modifier for cross schema query, `__database__` variant.
const DB_COLUMN_MATCHER: &str = "__database__";

/// Threshold for scatter scan mode
const MAX_SCATTER_POINTS: i64 = 400;

/// Interval 1 hour in millisecond
const INTERVAL_1H: i64 = 60 * 60 * 1000;
135
#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // query parameters
    /// Evaluation range start, in milliseconds since the Unix epoch.
    start: Millisecond,
    /// Evaluation range end, in milliseconds since the Unix epoch.
    end: Millisecond,
    /// Evaluation step between output points, in milliseconds.
    interval: Millisecond,
    /// Lookback window for instant selectors, in milliseconds.
    lookback_delta: Millisecond,

    // planner states
    /// Name of the table currently being planned, if any.
    table_name: Option<String>,
    /// Time index column of the current table, once resolved.
    time_index_column: Option<String>,
    /// Value (field) columns of the current table.
    field_columns: Vec<String>,
    /// Tag (label) columns of the current table.
    tag_columns: Vec<String>,
    /// Use metric engine internal series identifier column (`__tsid`) as series key.
    ///
    /// This is enabled only when the underlying scan can provide `__tsid` (`UInt64`). The planner
    /// uses it internally (e.g. as the series key for [`SeriesDivide`]) and strips it from the
    /// final output.
    use_tsid: bool,
    /// The matcher for field columns `__field__`.
    field_column_matcher: Option<Vec<Matcher>>,
    /// The matcher for selectors (normal matchers).
    selector_matcher: Vec<Matcher>,
    /// Schema (database) of the current table, if specified by a matcher.
    schema_name: Option<String>,
    /// The range in millisecond of range selector. None if there is no range selector.
    range: Option<Millisecond>,
}
163
164impl PromPlannerContext {
165    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
166        Self {
167            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
168            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
169            interval: stmt.interval.as_millis() as _,
170            lookback_delta: stmt.lookback_delta.as_millis() as _,
171            ..Default::default()
172        }
173    }
174
175    /// Reset all planner states
176    fn reset(&mut self) {
177        self.table_name = None;
178        self.time_index_column = None;
179        self.field_columns = vec![];
180        self.tag_columns = vec![];
181        self.use_tsid = false;
182        self.field_column_matcher = None;
183        self.selector_matcher.clear();
184        self.schema_name = None;
185        self.range = None;
186    }
187
188    /// Reset table name and schema to empty
189    fn reset_table_name_and_schema(&mut self) {
190        self.table_name = Some(String::new());
191        self.schema_name = None;
192        self.use_tsid = false;
193    }
194
195    /// Check if `le` is present in tag columns
196    fn has_le_tag(&self) -> bool {
197        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
198    }
199}
200
/// Planner that converts a PromQL [`EvalStmt`] into a DataFusion [`LogicalPlan`].
pub struct PromPlanner {
    /// Provider used to resolve table references during planning.
    table_provider: DfTableSourceProvider,
    /// Mutable planning state shared across the recursive planning passes.
    ctx: PromPlannerContext,
}
205
206impl PromPlanner {
207    pub async fn stmt_to_plan(
208        table_provider: DfTableSourceProvider,
209        stmt: &EvalStmt,
210        query_engine_state: &QueryEngineState,
211    ) -> Result<LogicalPlan> {
212        let mut planner = Self {
213            table_provider,
214            ctx: PromPlannerContext::from_eval_stmt(stmt),
215        };
216
217        let plan = planner
218            .prom_expr_to_plan(&stmt.expr, query_engine_state)
219            .await?;
220
221        // Never leak internal series identifier to output.
222        planner.strip_tsid_column(plan)
223    }
224
    /// Convert a PromQL expression into a logical plan.
    ///
    /// Thin wrapper over [`Self::prom_expr_to_plan_inner`] with `timestamp_fn`
    /// disabled, i.e. the expression is not being evaluated inside `timestamp()`.
    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
            .await
    }
233
234    /**
235    Converts a PromQL expression to a logical plan.
236
237    NOTE:
238        The `timestamp_fn` indicates whether the PromQL `timestamp()` function is being evaluated in the current context.
239        If `true`, the planner generates a logical plan that projects the timestamp (time index) column
240        as the value column for each input row, implementing the PromQL `timestamp()` function semantics.
241        If `false`, the planner generates the standard logical plan for the given PromQL expression.
242    */
243    #[async_recursion]
244    async fn prom_expr_to_plan_inner(
245        &mut self,
246        prom_expr: &PromExpr,
247        timestamp_fn: bool,
248        query_engine_state: &QueryEngineState,
249    ) -> Result<LogicalPlan> {
250        let res = match prom_expr {
251            PromExpr::Aggregate(expr) => {
252                self.prom_aggr_expr_to_plan(query_engine_state, expr)
253                    .await?
254            }
255            PromExpr::Unary(expr) => {
256                self.prom_unary_expr_to_plan(query_engine_state, expr)
257                    .await?
258            }
259            PromExpr::Binary(expr) => {
260                self.prom_binary_expr_to_plan(query_engine_state, expr)
261                    .await?
262            }
263            PromExpr::Paren(ParenExpr { expr }) => {
264                self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
265                    .await?
266            }
267            PromExpr::Subquery(expr) => {
268                self.prom_subquery_expr_to_plan(query_engine_state, expr)
269                    .await?
270            }
271            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
272            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
273            PromExpr::VectorSelector(selector) => {
274                self.prom_vector_selector_to_plan(selector, timestamp_fn)
275                    .await?
276            }
277            PromExpr::MatrixSelector(selector) => {
278                self.prom_matrix_selector_to_plan(selector).await?
279            }
280            PromExpr::Call(expr) => {
281                self.prom_call_expr_to_plan(query_engine_state, expr)
282                    .await?
283            }
284            PromExpr::Extension(expr) => {
285                self.prom_ext_expr_to_plan(query_engine_state, expr).await?
286            }
287        };
288
289        Ok(res)
290    }
291
    /// Plan a PromQL subquery expression (`expr[range:step]`).
    ///
    /// The inner expression is planned under a temporarily adjusted context:
    /// `interval` is overridden by `step` (when given) and `start` is pushed back
    /// by `range - interval`, so the inner plan produces data covering the first
    /// output window. Both are restored before wrapping the result in a
    /// [`RangeManipulate`] node that materializes the per-point ranges.
    async fn prom_subquery_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        // Save outer evaluation parameters, plan the inner expression with the
        // subquery's step and extended start, then restore them.
        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        // A zero-length range is invalid in PromQL.
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }
333
    /// Plan a PromQL aggregate expression (`sum`, `avg`, `count_values`, `topk`, ...).
    ///
    /// Plans the inner expression, makes sure every tag referenced by the
    /// `by`/`without` modifier is available in the input, then either delegates
    /// `topk`/`bottomk` to [`Self::prom_topk_bottomk_to_plan`] or builds an
    /// aggregate-then-sort plan.
    async fn prom_aggr_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let mut input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        // A `__tsid` column of type UInt64 in the input schema means the scan is
        // backed by the metric engine's internal series identifier.
        let input_has_tsid = input.schema().fields().iter().any(|field| {
            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
                && field.data_type() == &ArrowDataType::UInt64
        });

        // `__tsid` based scan projection may prune tag columns. Ensure tags referenced in
        // aggregation modifiers (`by`/`without`) are available before planning group keys.
        let required_group_tags = match modifier {
            None => BTreeSet::new(),
            Some(LabelModifier::Include(labels)) => labels
                .labels
                .iter()
                .filter(|label| !is_metric_engine_internal_column(label.as_str()))
                .cloned()
                .collect(),
            Some(LabelModifier::Exclude(labels)) => {
                // `without`: start from the full row-key tag set and drop excluded labels.
                let mut all_tags = self.collect_row_key_tag_columns_from_plan(&input)?;
                for label in &labels.labels {
                    let _ = all_tags.remove(label);
                }
                all_tags
            }
        };

        if !required_group_tags.is_empty()
            && required_group_tags
                .iter()
                .any(|tag| Self::find_case_sensitive_column(input.schema(), tag.as_str()).is_none())
        {
            // Some required tags are missing from the input schema: extend the plan so
            // they are projected, and refresh the cached tag columns to match.
            input = self.ensure_tag_columns_available(input, &required_group_tags)?;
            self.refresh_tag_columns_from_schema(input.schema());
        }

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // When `__tsid` is available, tag columns may have been pruned from the input plan.
                // For `keep_tsid` decision we should compare against the full row-key label set,
                // otherwise we may incorrectly treat label-reducing aggregates as preserving labels.
                let input_tag_columns = if input_has_tsid {
                    self.collect_row_key_tag_columns_from_plan(&input)?
                        .into_iter()
                        .collect::<Vec<_>>()
                } else {
                    self.ctx.tag_columns.clone()
                };
                // calculate columns to group by
                // Need to append time index column into group by columns
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                // convert op and value columns to aggregate exprs
                let (mut aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // Keep `__tsid` only when the aggregation preserves the full label set
                // (i.e. it groups by all of the input's tags), never for `count_values`.
                let keep_tsid = op.id() != token::T_COUNT_VALUES
                    && input_has_tsid
                    && input_tag_columns.iter().collect::<HashSet<_>>()
                        == self.ctx.tag_columns.iter().collect::<HashSet<_>>();

                if keep_tsid {
                    // Carry `__tsid` through the aggregation via `first_value`.
                    aggr_exprs.push(
                        first_value(
                            DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)),
                            vec![],
                        )
                        .alias(DATA_SCHEMA_TSID_COLUMN_NAME),
                    );
                }
                self.ctx.use_tsid = keep_tsid;

                // create plan
                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` must be grouped by fields,
                    // and project the fields to the new label.
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                // Sort the aggregate output by the group-by columns (time index included).
                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }
453
    /// Create logical plan for PromQL topk and bottomk expr.
    ///
    /// Ranks rows with window expressions partitioned by the group keys, keeps
    /// rows whose rank is `<=` the parameter `k`, then sorts by group keys plus
    /// ranks and projects the rank columns away.
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        // `__tsid` (UInt64), when present in the input, is carried through the plan.
        let input_has_tsid = input.schema().fields().iter().any(|field| {
            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
                && field.data_type() == &ArrowDataType::UInt64
        });
        self.ctx.use_tsid = input_has_tsid;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        // The `k` parameter, coerced to a Float64 literal expression.
        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // convert op and value columns to window exprs.
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        // Schema names of the rank columns produced by the window expressions.
        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Create ranks filter with `Operator::Or`.
        // Safety: at least one rank column
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                // Keep rows whose rank is within `k`.
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        // Order by ranks
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        // Final projection: fields, tags, optional `__tsid`, then time index.
        // The rank columns are dropped here.
        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(
                self.ctx
                    .use_tsid
                    .then_some(DfExpr::Column(Column::from_name(
                        DATA_SCHEMA_TSID_COLUMN_NAME,
                    ))),
            )
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }
542
543    async fn prom_unary_expr_to_plan(
544        &mut self,
545        query_engine_state: &QueryEngineState,
546        unary_expr: &UnaryExpr,
547    ) -> Result<LogicalPlan> {
548        let UnaryExpr { expr } = unary_expr;
549        // Unary Expr in PromQL implys the `-` operator
550        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
551        self.projection_for_each_field_column(input, |col| {
552            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
553        })
554    }
555
556    async fn prom_binary_expr_to_plan(
557        &mut self,
558        query_engine_state: &QueryEngineState,
559        binary_expr: &PromBinaryExpr,
560    ) -> Result<LogicalPlan> {
561        let PromBinaryExpr {
562            lhs,
563            rhs,
564            op,
565            modifier,
566        } = binary_expr;
567
568        // if set to true, comparison operator will return 0/1 (for true/false) instead of
569        // filter on the result column
570        let should_return_bool = if let Some(m) = modifier {
571            m.return_bool
572        } else {
573            false
574        };
575        let is_comparison_op = Self::is_token_a_comparison_op(*op);
576
577        // we should build a filter plan here if the op is comparison op and need not
578        // to return 0/1. Otherwise, we should build a projection plan
579        match (
580            Self::try_build_literal_expr(lhs),
581            Self::try_build_literal_expr(rhs),
582        ) {
583            (Some(lhs), Some(rhs)) => {
584                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
585                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
586                self.ctx.reset_table_name_and_schema();
587                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
588                let mut field_expr = field_expr_builder(lhs, rhs)?;
589
590                if is_comparison_op && should_return_bool {
591                    field_expr = DfExpr::Cast(Cast {
592                        expr: Box::new(field_expr),
593                        data_type: ArrowDataType::Float64,
594                    });
595                }
596
597                Ok(LogicalPlan::Extension(Extension {
598                    node: Arc::new(
599                        EmptyMetric::new(
600                            self.ctx.start,
601                            self.ctx.end,
602                            self.ctx.interval,
603                            SPECIAL_TIME_FUNCTION.to_string(),
604                            DEFAULT_FIELD_COLUMN.to_string(),
605                            Some(field_expr),
606                        )
607                        .context(DataFusionPlanningSnafu)?,
608                    ),
609                }))
610            }
611            // lhs is a literal, rhs is a column
612            (Some(mut expr), None) => {
613                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
614                // check if the literal is a special time expr
615                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
616                    expr = time_expr
617                }
618                let bin_expr_builder = |col: &String| {
619                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
620                    let mut binary_expr =
621                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;
622
623                    if is_comparison_op && should_return_bool {
624                        binary_expr = DfExpr::Cast(Cast {
625                            expr: Box::new(binary_expr),
626                            data_type: ArrowDataType::Float64,
627                        });
628                    }
629                    Ok(binary_expr)
630                };
631                if is_comparison_op && !should_return_bool {
632                    self.filter_on_field_column(input, bin_expr_builder)
633                } else {
634                    self.projection_for_each_field_column(input, bin_expr_builder)
635                }
636            }
637            // lhs is a column, rhs is a literal
638            (None, Some(mut expr)) => {
639                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
640                // check if the literal is a special time expr
641                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
642                    expr = time_expr
643                }
644                let bin_expr_builder = |col: &String| {
645                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
646                    let mut binary_expr =
647                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;
648
649                    if is_comparison_op && should_return_bool {
650                        binary_expr = DfExpr::Cast(Cast {
651                            expr: Box::new(binary_expr),
652                            data_type: ArrowDataType::Float64,
653                        });
654                    }
655                    Ok(binary_expr)
656                };
657                if is_comparison_op && !should_return_bool {
658                    self.filter_on_field_column(input, bin_expr_builder)
659                } else {
660                    self.projection_for_each_field_column(input, bin_expr_builder)
661                }
662            }
663            // both are columns. join them on time index
664            (None, None) => {
665                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
666                let left_field_columns = self.ctx.field_columns.clone();
667                let left_time_index_column = self.ctx.time_index_column.clone();
668                let mut left_table_ref = self
669                    .table_ref()
670                    .unwrap_or_else(|_| TableReference::bare(""));
671                let left_context = self.ctx.clone();
672
673                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
674                let right_field_columns = self.ctx.field_columns.clone();
675                let right_time_index_column = self.ctx.time_index_column.clone();
676                let mut right_table_ref = self
677                    .table_ref()
678                    .unwrap_or_else(|_| TableReference::bare(""));
679                let right_context = self.ctx.clone();
680
681                // TODO(ruihang): avoid join if left and right are the same table
682
683                // set op has "special" join semantics
684                if Self::is_token_a_set_op(*op) {
685                    return self.set_op_on_non_field_columns(
686                        left_input,
687                        right_input,
688                        left_context,
689                        right_context,
690                        *op,
691                        modifier,
692                    );
693                }
694
695                // normal join
696                if left_table_ref == right_table_ref {
697                    // rename table references to avoid ambiguity
698                    left_table_ref = TableReference::bare("lhs");
699                    right_table_ref = TableReference::bare("rhs");
700                    // `self.ctx` have ctx in right plan, if right plan have no tag,
701                    // we use left plan ctx as the ctx for subsequent calculations,
702                    // to avoid case like `host + scalar(...)`
703                    // we need preserve tag column on `host` table in subsequent projection,
704                    // which only show in left plan ctx.
705                    if self.ctx.tag_columns.is_empty() {
706                        self.ctx = left_context.clone();
707                        self.ctx.table_name = Some("lhs".to_string());
708                    } else {
709                        self.ctx.table_name = Some("rhs".to_string());
710                    }
711                }
712                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());
713
714                let join_plan = self.join_on_non_field_columns(
715                    left_input,
716                    right_input,
717                    left_table_ref.clone(),
718                    right_table_ref.clone(),
719                    left_time_index_column,
720                    right_time_index_column,
721                    // if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)`
722                    // under this case we only join on time index
723                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
724                    modifier,
725                )?;
726                let join_plan_schema = join_plan.schema().clone();
727
728                let bin_expr_builder = |_: &String| {
729                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
730                    let left_col = join_plan_schema
731                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
732                        .context(DataFusionPlanningSnafu)?
733                        .into();
734                    let right_col = join_plan_schema
735                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
736                        .context(DataFusionPlanningSnafu)?
737                        .into();
738
739                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
740                    let mut binary_expr =
741                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
742                    if is_comparison_op && should_return_bool {
743                        binary_expr = DfExpr::Cast(Cast {
744                            expr: Box::new(binary_expr),
745                            data_type: ArrowDataType::Float64,
746                        });
747                    }
748                    Ok(binary_expr)
749                };
750                if is_comparison_op && !should_return_bool {
751                    // PromQL comparison operators without `bool` are filters:
752                    //   - keep the instant-vector side sample values
753                    //   - drop samples where the comparison is false
754                    //
755                    // So we filter on the join result and then project only the side that should
756                    // be preserved according to PromQL semantics.
757                    let filtered = self.filter_on_field_column(join_plan, bin_expr_builder)?;
758                    let (project_table_ref, project_context) =
759                        match (lhs.value_type(), rhs.value_type()) {
760                            (ValueType::Scalar, ValueType::Vector) => {
761                                (&right_table_ref, &right_context)
762                            }
763                            _ => (&left_table_ref, &left_context),
764                        };
765                    self.project_binary_join_side(filtered, project_table_ref, project_context)
766                } else {
767                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
768                }
769            }
770        }
771    }
772
773    fn project_binary_join_side(
774        &mut self,
775        input: LogicalPlan,
776        table_ref: &TableReference,
777        context: &PromPlannerContext,
778    ) -> Result<LogicalPlan> {
779        let schema = input.schema();
780
781        let mut project_exprs =
782            Vec::with_capacity(context.tag_columns.len() + context.field_columns.len() + 2);
783
784        // Project time index from the chosen side.
785        if let Some(time_index_column) = &context.time_index_column {
786            let time_index_col = schema
787                .qualified_field_with_name(Some(table_ref), time_index_column)
788                .context(DataFusionPlanningSnafu)?
789                .into();
790            project_exprs.push(DfExpr::Column(time_index_col));
791        }
792
793        // Project field columns from the chosen side.
794        for field_column in &context.field_columns {
795            let field_col = schema
796                .qualified_field_with_name(Some(table_ref), field_column)
797                .context(DataFusionPlanningSnafu)?
798                .into();
799            project_exprs.push(DfExpr::Column(field_col));
800        }
801
802        // Project tag columns from the chosen side.
803        for tag_column in &context.tag_columns {
804            let tag_col = schema
805                .qualified_field_with_name(Some(table_ref), tag_column)
806                .context(DataFusionPlanningSnafu)?
807                .into();
808            project_exprs.push(DfExpr::Column(tag_col));
809        }
810
811        // Preserve `__tsid` if present, so it can still be used internally downstream. It's
812        // stripped from the final output anyway.
813        if context.use_tsid
814            && let Ok(tsid_col) =
815                schema.qualified_field_with_name(Some(table_ref), DATA_SCHEMA_TSID_COLUMN_NAME)
816        {
817            project_exprs.push(DfExpr::Column(tsid_col.into()));
818        }
819
820        let plan = LogicalPlanBuilder::from(input)
821            .project(project_exprs)
822            .context(DataFusionPlanningSnafu)?
823            .build()
824            .context(DataFusionPlanningSnafu)?;
825
826        // Update context to reflect the projected schema. Don't keep a table qualifier since
827        // the result is a derived expression.
828        self.ctx = context.clone();
829        self.ctx.table_name = None;
830        self.ctx.schema_name = None;
831
832        Ok(plan)
833    }
834
835    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
836        let NumberLiteral { val } = number_literal;
837        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
838        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
839        self.ctx.reset_table_name_and_schema();
840        let literal_expr = df_prelude::lit(*val);
841
842        let plan = LogicalPlan::Extension(Extension {
843            node: Arc::new(
844                EmptyMetric::new(
845                    self.ctx.start,
846                    self.ctx.end,
847                    self.ctx.interval,
848                    SPECIAL_TIME_FUNCTION.to_string(),
849                    DEFAULT_FIELD_COLUMN.to_string(),
850                    Some(literal_expr),
851                )
852                .context(DataFusionPlanningSnafu)?,
853            ),
854        });
855        Ok(plan)
856    }
857
858    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
859        let StringLiteral { val } = string_literal;
860        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
861        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
862        self.ctx.reset_table_name_and_schema();
863        let literal_expr = df_prelude::lit(val.clone());
864
865        let plan = LogicalPlan::Extension(Extension {
866            node: Arc::new(
867                EmptyMetric::new(
868                    self.ctx.start,
869                    self.ctx.end,
870                    self.ctx.interval,
871                    SPECIAL_TIME_FUNCTION.to_string(),
872                    DEFAULT_FIELD_COLUMN.to_string(),
873                    Some(literal_expr),
874                )
875                .context(DataFusionPlanningSnafu)?,
876            ),
877        });
878        Ok(plan)
879    }
880
881    async fn prom_vector_selector_to_plan(
882        &mut self,
883        vector_selector: &VectorSelector,
884        timestamp_fn: bool,
885    ) -> Result<LogicalPlan> {
886        let VectorSelector {
887            name,
888            offset,
889            matchers,
890            at: _,
891        } = vector_selector;
892        let matchers = self.preprocess_label_matchers(matchers, name)?;
893        if let Some(empty_plan) = self.setup_context().await? {
894            return Ok(empty_plan);
895        }
896        let normalize = self
897            .selector_to_series_normalize_plan(offset, matchers, false)
898            .await?;
899
900        let normalize = if timestamp_fn {
901            // If evaluating the PromQL `timestamp()` function, project the time index column as the value column
902            // before wrapping with [`InstantManipulate`], so the output matches PromQL's `timestamp()` semantics.
903            self.create_timestamp_func_plan(normalize)?
904        } else {
905            normalize
906        };
907
908        let manipulate = InstantManipulate::new(
909            self.ctx.start,
910            self.ctx.end,
911            self.ctx.lookback_delta,
912            self.ctx.interval,
913            self.ctx
914                .time_index_column
915                .clone()
916                .expect("time index should be set in `setup_context`"),
917            self.ctx.field_columns.first().cloned(),
918            normalize,
919        );
920        Ok(LogicalPlan::Extension(Extension {
921            node: Arc::new(manipulate),
922        }))
923    }
924
925    /// Builds a projection plan for the PromQL `timestamp()` function.
926    /// Projects the time index column as the value column for each row.
927    ///
928    /// # Arguments
929    /// * `normalize` - Input [`LogicalPlan`] for the normalized series.
930    ///
931    /// # Returns
932    /// Returns a [`Result<LogicalPlan>`] where the resulting logical plan projects the timestamp
933    /// column as the value column, along with the original tag and time index columns.
934    ///
935    /// # Timestamp vs. Time Function
936    ///
937    /// - **Timestamp Function (`timestamp()`)**: In PromQL, the `timestamp()` function returns the
938    ///   timestamp (time index) of each sample as the value column.
939    ///
940    /// - **Time Function (`time()`)**: The `time()` function returns the evaluation time of the query
941    ///   as a scalar value.
942    ///
943    /// # Side Effects
944    /// Updates the planner context's field columns to the timestamp column name.
945    ///
946    fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
947        let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
948            .alias(DEFAULT_FIELD_COLUMN);
949        self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
950        let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
951        project_exprs.push(self.create_time_index_column_expr()?);
952        project_exprs.push(time_expr);
953        project_exprs.extend(self.create_tag_column_exprs()?);
954
955        LogicalPlanBuilder::from(normalize)
956            .project(project_exprs)
957            .context(DataFusionPlanningSnafu)?
958            .build()
959            .context(DataFusionPlanningSnafu)
960    }
961
962    async fn prom_matrix_selector_to_plan(
963        &mut self,
964        matrix_selector: &MatrixSelector,
965    ) -> Result<LogicalPlan> {
966        let MatrixSelector { vs, range } = matrix_selector;
967        let VectorSelector {
968            name,
969            offset,
970            matchers,
971            ..
972        } = vs;
973        let matchers = self.preprocess_label_matchers(matchers, name)?;
974        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
975        let range_ms = range.as_millis() as _;
976        self.ctx.range = Some(range_ms);
977
978        // Some functions like rate may require special fields in the RangeManipulate plan
979        // so we can't skip RangeManipulate.
980        let normalize = match self.setup_context().await? {
981            Some(empty_plan) => empty_plan,
982            None => {
983                self.selector_to_series_normalize_plan(offset, matchers, true)
984                    .await?
985            }
986        };
987        let manipulate = RangeManipulate::new(
988            self.ctx.start,
989            self.ctx.end,
990            self.ctx.interval,
991            // TODO(ruihang): convert via Timestamp datatypes to support different time units
992            range_ms,
993            self.ctx
994                .time_index_column
995                .clone()
996                .expect("time index should be set in `setup_context`"),
997            self.ctx.field_columns.clone(),
998            normalize,
999        )
1000        .context(DataFusionPlanningSnafu)?;
1001
1002        Ok(LogicalPlan::Extension(Extension {
1003            node: Arc::new(manipulate),
1004        }))
1005    }
1006
    /// Plan a PromQL function call expression.
    ///
    /// Special functions (`histogram_quantile`, `vector`, `scalar`, `absent`) are
    /// planned as dedicated plans and return early. All other functions become a
    /// projection (plus filter, and for `sort*` a sort) over their input plan.
    async fn prom_call_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // some special functions that are not expression but a plan
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, query_engine_state).await;
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
            SPECIAL_ABSENT_FUNCTION => {
                return self.create_absent_plan(args, query_engine_state).await;
            }
            _ => {}
        }

        // transform function arguments
        let args = self.create_function_args(&args.args)?;
        // Functions with a vector input are planned over that input; niladic
        // functions (e.g. `time()`) are planned over a synthetic `EmptyMetric`
        // whose context uses the special time/value column names.
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
                .await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let (mut func_exprs, new_tags) =
            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
        // Prepend the time index and append tag columns so they survive the projection.
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        // The `sort*` family additionally orders the output by value or by tags.
        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        // Update context tags after building plan
        // We can't push them before planning, because they won't exist until projection.
        for tag in new_tags {
            self.ctx.tag_columns.push(tag);
        }

        let plan = builder.build().context(DataFusionPlanningSnafu)?;
        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");

        Ok(plan)
    }
1097
1098    async fn prom_ext_expr_to_plan(
1099        &mut self,
1100        query_engine_state: &QueryEngineState,
1101        ext_expr: &promql_parser::parser::ast::Extension,
1102    ) -> Result<LogicalPlan> {
1103        // let promql_parser::parser::ast::Extension { expr } = ext_expr;
1104        let expr = &ext_expr.expr;
1105        let children = expr.children();
1106        let plan = self
1107            .prom_expr_to_plan(&children[0], query_engine_state)
1108            .await?;
1109        // Wrapper for the explanation/analyze of the existing plan
1110        // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
1111        // if `analyze` is true, runs the actual plan and produces
1112        // information about metrics during run.
1113        // if `verbose` is true, prints out additional details when VERBOSE keyword is specified
1114        match expr.name() {
1115            ANALYZE_NODE_NAME => LogicalPlanBuilder::from(plan)
1116                .explain(false, true)
1117                .unwrap()
1118                .build()
1119                .context(DataFusionPlanningSnafu),
1120            ANALYZE_VERBOSE_NODE_NAME => LogicalPlanBuilder::from(plan)
1121                .explain(true, true)
1122                .unwrap()
1123                .build()
1124                .context(DataFusionPlanningSnafu),
1125            EXPLAIN_NODE_NAME => LogicalPlanBuilder::from(plan)
1126                .explain(false, false)
1127                .unwrap()
1128                .build()
1129                .context(DataFusionPlanningSnafu),
1130            EXPLAIN_VERBOSE_NODE_NAME => LogicalPlanBuilder::from(plan)
1131                .explain(true, false)
1132                .unwrap()
1133                .build()
1134                .context(DataFusionPlanningSnafu),
1135            ALIAS_NODE_NAME => {
1136                let alias = expr
1137                    .as_any()
1138                    .downcast_ref::<AliasExpr>()
1139                    .context(UnexpectedPlanExprSnafu {
1140                        desc: "Expected AliasExpr",
1141                    })?
1142                    .alias
1143                    .clone();
1144                self.apply_alias(plan, alias)
1145            }
1146            _ => LogicalPlanBuilder::empty(true)
1147                .build()
1148                .context(DataFusionPlanningSnafu),
1149        }
1150    }
1151
1152    /// Extract metric name from `__name__` matcher and set it into [PromPlannerContext].
1153    /// Returns a new [Matchers] that doesn't contain metric name matcher.
1154    ///
1155    /// Each call to this function means new selector is started. Thus, the context will be reset
1156    /// at first.
1157    ///
1158    /// Name rule:
1159    /// - if `name` is some, then the matchers MUST NOT contain `__name__` matcher.
1160    /// - if `name` is none, then the matchers MAY contain NONE OR MULTIPLE `__name__` matchers.
1161    #[allow(clippy::mutable_key_type)]
1162    fn preprocess_label_matchers(
1163        &mut self,
1164        label_matchers: &Matchers,
1165        name: &Option<String>,
1166    ) -> Result<Matchers> {
1167        self.ctx.reset();
1168
1169        let metric_name;
1170        if let Some(name) = name.clone() {
1171            metric_name = Some(name);
1172            ensure!(
1173                label_matchers.find_matchers(METRIC_NAME).is_empty(),
1174                MultipleMetricMatchersSnafu
1175            );
1176        } else {
1177            let mut matches = label_matchers.find_matchers(METRIC_NAME);
1178            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
1179            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
1180            ensure!(
1181                matches[0].op == MatchOp::Equal,
1182                UnsupportedMatcherOpSnafu {
1183                    matcher_op: matches[0].op.to_string(),
1184                    matcher: METRIC_NAME
1185                }
1186            );
1187            metric_name = matches.pop().map(|m| m.value);
1188        }
1189
1190        self.ctx.table_name = metric_name;
1191
1192        let mut matchers = HashSet::new();
1193        for matcher in &label_matchers.matchers {
1194            // TODO(ruihang): support other metric match ops
1195            if matcher.name == FIELD_COLUMN_MATCHER {
1196                self.ctx
1197                    .field_column_matcher
1198                    .get_or_insert_default()
1199                    .push(matcher.clone());
1200            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
1201                ensure!(
1202                    matcher.op == MatchOp::Equal,
1203                    UnsupportedMatcherOpSnafu {
1204                        matcher: matcher.name.clone(),
1205                        matcher_op: matcher.op.to_string(),
1206                    }
1207                );
1208                self.ctx.schema_name = Some(matcher.value.clone());
1209            } else if matcher.name != METRIC_NAME {
1210                self.ctx.selector_matcher.push(matcher.clone());
1211                let _ = matchers.insert(matcher.clone());
1212            }
1213        }
1214
1215        Ok(Matchers::new(matchers.into_iter().collect()))
1216    }
1217
1218    async fn selector_to_series_normalize_plan(
1219        &mut self,
1220        offset: &Option<Offset>,
1221        label_matchers: Matchers,
1222        is_range_selector: bool,
1223    ) -> Result<LogicalPlan> {
1224        // make table scan plan
1225        let table_ref = self.table_ref()?;
1226        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
1227        let table_schema = table_scan.schema();
1228
1229        // make filter exprs
1230        let offset_duration = match offset {
1231            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
1232            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
1233            None => 0,
1234        };
1235        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
1236        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
1237            scan_filters.push(time_index_filter);
1238        }
1239        table_scan = LogicalPlanBuilder::from(table_scan)
1240            .filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty.
1241            .context(DataFusionPlanningSnafu)?
1242            .build()
1243            .context(DataFusionPlanningSnafu)?;
1244
1245        // make a projection plan if there is any `__field__` matcher
1246        if let Some(field_matchers) = &self.ctx.field_column_matcher {
1247            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
1248            // opt-in set
1249            let mut result_set = HashSet::new();
1250            // opt-out set
1251            let mut reverse_set = HashSet::new();
1252            for matcher in field_matchers {
1253                match &matcher.op {
1254                    MatchOp::Equal => {
1255                        if col_set.contains(&matcher.value) {
1256                            let _ = result_set.insert(matcher.value.clone());
1257                        } else {
1258                            return Err(ColumnNotFoundSnafu {
1259                                col: matcher.value.clone(),
1260                            }
1261                            .build());
1262                        }
1263                    }
1264                    MatchOp::NotEqual => {
1265                        if col_set.contains(&matcher.value) {
1266                            let _ = reverse_set.insert(matcher.value.clone());
1267                        } else {
1268                            return Err(ColumnNotFoundSnafu {
1269                                col: matcher.value.clone(),
1270                            }
1271                            .build());
1272                        }
1273                    }
1274                    MatchOp::Re(regex) => {
1275                        for col in &self.ctx.field_columns {
1276                            if regex.is_match(col) {
1277                                let _ = result_set.insert(col.clone());
1278                            }
1279                        }
1280                    }
1281                    MatchOp::NotRe(regex) => {
1282                        for col in &self.ctx.field_columns {
1283                            if regex.is_match(col) {
1284                                let _ = reverse_set.insert(col.clone());
1285                            }
1286                        }
1287                    }
1288                }
1289            }
1290            // merge two set
1291            if result_set.is_empty() {
1292                result_set = col_set.into_iter().cloned().collect();
1293            }
1294            for col in reverse_set {
1295                let _ = result_set.remove(&col);
1296            }
1297
1298            // mask the field columns in context using computed result set
1299            self.ctx.field_columns = self
1300                .ctx
1301                .field_columns
1302                .drain(..)
1303                .filter(|col| result_set.contains(col))
1304                .collect();
1305
1306            let exprs = result_set
1307                .into_iter()
1308                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
1309                .chain(self.create_tag_column_exprs()?)
1310                .chain(
1311                    self.ctx
1312                        .use_tsid
1313                        .then_some(DfExpr::Column(Column::new_unqualified(
1314                            DATA_SCHEMA_TSID_COLUMN_NAME,
1315                        ))),
1316                )
1317                .chain(Some(self.create_time_index_column_expr()?))
1318                .collect::<Vec<_>>();
1319
1320            // reuse this variable for simplicity
1321            table_scan = LogicalPlanBuilder::from(table_scan)
1322                .project(exprs)
1323                .context(DataFusionPlanningSnafu)?
1324                .build()
1325                .context(DataFusionPlanningSnafu)?;
1326        }
1327
1328        // make sort plan
1329        let series_key_columns = if self.ctx.use_tsid {
1330            vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
1331        } else {
1332            self.ctx.tag_columns.clone()
1333        };
1334
1335        let sort_exprs = if self.ctx.use_tsid {
1336            vec![
1337                DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)).sort(true, true),
1338                self.create_time_index_column_expr()?.sort(true, true),
1339            ]
1340        } else {
1341            self.create_tag_and_time_index_column_sort_exprs()?
1342        };
1343
1344        let sort_plan = LogicalPlanBuilder::from(table_scan)
1345            .sort(sort_exprs)
1346            .context(DataFusionPlanningSnafu)?
1347            .build()
1348            .context(DataFusionPlanningSnafu)?;
1349
1350        // make divide plan
1351        let time_index_column =
1352            self.ctx
1353                .time_index_column
1354                .clone()
1355                .with_context(|| TimeIndexNotFoundSnafu {
1356                    table: table_ref.to_string(),
1357                })?;
1358        let divide_plan = LogicalPlan::Extension(Extension {
1359            node: Arc::new(SeriesDivide::new(
1360                series_key_columns.clone(),
1361                time_index_column,
1362                sort_plan,
1363            )),
1364        });
1365
1366        // make series_normalize plan
1367        if !is_range_selector && offset_duration == 0 {
1368            return Ok(divide_plan);
1369        }
1370        let series_normalize = SeriesNormalize::new(
1371            offset_duration,
1372            self.ctx
1373                .time_index_column
1374                .clone()
1375                .with_context(|| TimeIndexNotFoundSnafu {
1376                    table: table_ref.to_quoted_string(),
1377                })?,
1378            is_range_selector,
1379            series_key_columns,
1380            divide_plan,
1381        );
1382        let logical_plan = LogicalPlan::Extension(Extension {
1383            node: Arc::new(series_normalize),
1384        });
1385
1386        Ok(logical_plan)
1387    }
1388
    /// Convert [LabelModifier] to [Column] exprs for aggregation.
    /// Timestamp column and tag columns will be included.
    ///
    /// - `None` (no `by`/`without`): group by the time index only.
    /// - `Include` (`by (...)`): group by the listed labels plus the time index.
    /// - `Exclude` (`without (...)`): group by every input column except the
    ///   listed labels, the time index, the value columns, and metric engine
    ///   internal columns.
    ///
    /// # Side effect
    ///
    /// This method will also change the tag columns in ctx if `update_ctx` is true.
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            None => {
                // No grouping labels: collapse all series, keep only the time index.
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            Some(LabelModifier::Include(labels)) => {
                // `by (...)` — keep only the listed labels.
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    // Internal columns are never valid PromQL grouping labels.
                    if is_metric_engine_internal_column(label) {
                        continue;
                    }
                    // nonexistence label will be ignored
                    if let Some(column_name) = Self::find_case_sensitive_column(input_schema, label)
                    {
                        exprs.push(DfExpr::Column(Column::from_name(column_name.clone())));

                        if update_ctx {
                            // update the tag columns in context
                            self.ctx.tag_columns.push(column_name);
                        }
                    }
                }
                // add timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            Some(LabelModifier::Exclude(labels)) => {
                // `without (...)` — start from every input column, then subtract.
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                // Exclude metric engine internal columns (not PromQL labels) from the implicit
                // "without" label set.
                all_fields.retain(|col| !is_metric_engine_internal_column(col.as_str()));

                // remove "without"-ed fields
                // nonexistence label will be ignored
                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // remove time index and value fields
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    // change the tag columns in context
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                // collect remaining fields and convert to col expr
                // (BTreeSet keeps the grouping columns in a deterministic order)
                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                // add timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }
1476
1477    // TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
1478    pub fn matchers_to_expr(
1479        label_matchers: Matchers,
1480        table_schema: &DFSchemaRef,
1481    ) -> Result<Vec<DfExpr>> {
1482        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
1483        for matcher in label_matchers.matchers {
1484            if matcher.name == SCHEMA_COLUMN_MATCHER
1485                || matcher.name == DB_COLUMN_MATCHER
1486                || matcher.name == FIELD_COLUMN_MATCHER
1487            {
1488                continue;
1489            }
1490
1491            let column_name = Self::find_case_sensitive_column(table_schema, matcher.name.as_str());
1492            let col = if let Some(column_name) = column_name {
1493                DfExpr::Column(Column::from_name(column_name))
1494            } else {
1495                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
1496                    .alias(matcher.name.clone())
1497            };
1498            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
1499            let expr = match matcher.op {
1500                MatchOp::Equal => col.eq(lit),
1501                MatchOp::NotEqual => col.not_eq(lit),
1502                MatchOp::Re(re) => {
1503                    // TODO(ruihang): a more programmatic way to handle this in datafusion
1504
1505                    // This is a hack to handle `.+` and `.*`, and is not strictly correct
1506                    // `.` doesn't match newline (`\n`). Given this is in PromQL context,
1507                    // most of the time it's fine.
1508                    if re.as_str() == "^(?:.*)$" {
1509                        continue;
1510                    }
1511                    if re.as_str() == "^(?:.+)$" {
1512                        col.not_eq(DfExpr::Literal(
1513                            ScalarValue::Utf8(Some(String::new())),
1514                            None,
1515                        ))
1516                    } else {
1517                        DfExpr::BinaryExpr(BinaryExpr {
1518                            left: Box::new(col),
1519                            op: Operator::RegexMatch,
1520                            right: Box::new(DfExpr::Literal(
1521                                ScalarValue::Utf8(Some(re.as_str().to_string())),
1522                                None,
1523                            )),
1524                        })
1525                    }
1526                }
1527                MatchOp::NotRe(re) => {
1528                    if re.as_str() == "^(?:.*)$" {
1529                        DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
1530                    } else if re.as_str() == "^(?:.+)$" {
1531                        col.eq(DfExpr::Literal(
1532                            ScalarValue::Utf8(Some(String::new())),
1533                            None,
1534                        ))
1535                    } else {
1536                        DfExpr::BinaryExpr(BinaryExpr {
1537                            left: Box::new(col),
1538                            op: Operator::RegexNotMatch,
1539                            right: Box::new(DfExpr::Literal(
1540                                ScalarValue::Utf8(Some(re.as_str().to_string())),
1541                                None,
1542                            )),
1543                        })
1544                    }
1545                }
1546            };
1547            exprs.push(expr);
1548        }
1549
1550        Ok(exprs)
1551    }
1552
1553    fn find_case_sensitive_column(schema: &DFSchemaRef, column: &str) -> Option<String> {
1554        if is_metric_engine_internal_column(column) {
1555            return None;
1556        }
1557        schema
1558            .fields()
1559            .iter()
1560            .find(|field| field.name() == column)
1561            .map(|field| field.name().clone())
1562    }
1563
1564    fn table_from_source(&self, source: &Arc<dyn TableSource>) -> Result<table::TableRef> {
1565        Ok(source
1566            .as_any()
1567            .downcast_ref::<DefaultTableSource>()
1568            .context(UnknownTableSnafu)?
1569            .table_provider
1570            .as_any()
1571            .downcast_ref::<DfTableProviderAdapter>()
1572            .context(UnknownTableSnafu)?
1573            .table())
1574    }
1575
1576    fn table_ref(&self) -> Result<TableReference> {
1577        let table_name = self
1578            .ctx
1579            .table_name
1580            .clone()
1581            .context(TableNameNotFoundSnafu)?;
1582
1583        // set schema name if `__schema__` is given
1584        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1585            TableReference::partial(schema_name.as_str(), table_name.as_str())
1586        } else {
1587            TableReference::bare(table_name.as_str())
1588        };
1589
1590        Ok(table_ref)
1591    }
1592
1593    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
1594        let start = self.ctx.start;
1595        let end = self.ctx.end;
1596        if end < start {
1597            return InvalidTimeRangeSnafu { start, end }.fail();
1598        }
1599        let lookback_delta = self.ctx.lookback_delta;
1600        let range = self.ctx.range.unwrap_or_default();
1601        let interval = self.ctx.interval;
1602        let time_index_expr = self.create_time_index_column_expr()?;
1603        let num_points = (end - start) / interval;
1604
1605        // Prometheus semantics:
1606        // - Instant selector lookback: (eval_ts - lookback_delta, eval_ts]
1607        // - Range selector:           (eval_ts - range, eval_ts]
1608        //
1609        // So samples positioned exactly at the lower boundary must be excluded. We align the scan
1610        // lower bound with Prometheus by shifting it forward by 1ms (millisecond granularity),
1611        // while still using a `>=` filter.
1612        let selector_window = if range == 0 { lookback_delta } else { range };
1613        let lower_exclusive_adjustment = if selector_window > 0 { 1 } else { 0 };
1614
1615        // Scan a continuous time range
1616        if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
1617            let single_time_range = time_index_expr
1618                .clone()
1619                .gt_eq(DfExpr::Literal(
1620                    ScalarValue::TimestampMillisecond(
1621                        Some(
1622                            self.ctx.start - offset_duration - selector_window
1623                                + lower_exclusive_adjustment,
1624                        ),
1625                        None,
1626                    ),
1627                    None,
1628                ))
1629                .and(time_index_expr.lt_eq(DfExpr::Literal(
1630                    ScalarValue::TimestampMillisecond(Some(self.ctx.end - offset_duration), None),
1631                    None,
1632                )));
1633            return Ok(Some(single_time_range));
1634        }
1635
1636        // Otherwise scan scatter ranges separately
1637        let mut filters = Vec::with_capacity(num_points as usize + 1);
1638        for timestamp in (start..=end).step_by(interval as usize) {
1639            filters.push(
1640                time_index_expr
1641                    .clone()
1642                    .gt_eq(DfExpr::Literal(
1643                        ScalarValue::TimestampMillisecond(
1644                            Some(
1645                                timestamp - offset_duration - selector_window
1646                                    + lower_exclusive_adjustment,
1647                            ),
1648                            None,
1649                        ),
1650                        None,
1651                    ))
1652                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
1653                        ScalarValue::TimestampMillisecond(Some(timestamp - offset_duration), None),
1654                        None,
1655                    ))),
1656            )
1657        }
1658
1659        Ok(filters.into_iter().reduce(DfExpr::or))
1660    }
1661
1662    /// Create a table scan plan and a filter plan with given filter.
1663    ///
1664    /// # Panic
1665    /// If the filter is empty
1666    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
1667        let provider = self
1668            .table_provider
1669            .resolve_table(table_ref.clone())
1670            .await
1671            .context(CatalogSnafu)?;
1672
1673        let logical_table = self.table_from_source(&provider)?;
1674
1675        // Try to rewrite the table scan to physical table scan if possible.
1676        let mut maybe_phy_table_ref = table_ref.clone();
1677        let mut scan_provider = provider;
1678        let mut table_id_filter: Option<u32> = None;
1679
1680        // If it's a metric engine logical table, scan its physical table directly and filter by
1681        // `__table_id = logical_table_id` to get access to internal columns like `__tsid`.
1682        if logical_table.table_info().meta.engine == METRIC_ENGINE_NAME
1683            && let Some(physical_table_name) = logical_table
1684                .table_info()
1685                .meta
1686                .options
1687                .extra_options
1688                .get(LOGICAL_TABLE_METADATA_KEY)
1689        {
1690            let physical_table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1691                TableReference::partial(schema_name.as_str(), physical_table_name.as_str())
1692            } else {
1693                TableReference::bare(physical_table_name.as_str())
1694            };
1695
1696            let physical_provider = match self
1697                .table_provider
1698                .resolve_table(physical_table_ref.clone())
1699                .await
1700            {
1701                Ok(provider) => provider,
1702                Err(e) if e.status_code() == StatusCode::TableNotFound => {
1703                    // Fall back to scanning the logical table. It still works, but without
1704                    // `__tsid` optimization.
1705                    scan_provider.clone()
1706                }
1707                Err(e) => return Err(e).context(CatalogSnafu),
1708            };
1709
1710            if !Arc::ptr_eq(&physical_provider, &scan_provider) {
1711                // Only rewrite when internal columns exist in physical schema.
1712                let physical_table = self.table_from_source(&physical_provider)?;
1713
1714                let has_table_id = physical_table
1715                    .schema()
1716                    .column_schema_by_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
1717                    .is_some();
1718                let has_tsid = physical_table
1719                    .schema()
1720                    .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
1721                    .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
1722
1723                if has_table_id && has_tsid {
1724                    scan_provider = physical_provider;
1725                    maybe_phy_table_ref = physical_table_ref;
1726                    table_id_filter = Some(logical_table.table_info().ident.table_id);
1727                }
1728            }
1729        }
1730
1731        let scan_table = self.table_from_source(&scan_provider)?;
1732
1733        let use_tsid = table_id_filter.is_some()
1734            && scan_table
1735                .schema()
1736                .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
1737                .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
1738        self.ctx.use_tsid = use_tsid;
1739
1740        let all_table_tags = self.ctx.tag_columns.clone();
1741
1742        let scan_tag_columns = if use_tsid {
1743            let mut scan_tags = self.ctx.tag_columns.clone();
1744            for matcher in &self.ctx.selector_matcher {
1745                if is_metric_engine_internal_column(&matcher.name) {
1746                    continue;
1747                }
1748                if all_table_tags.iter().any(|tag| tag == &matcher.name) {
1749                    scan_tags.push(matcher.name.clone());
1750                }
1751            }
1752            scan_tags.sort_unstable();
1753            scan_tags.dedup();
1754            scan_tags
1755        } else {
1756            self.ctx.tag_columns.clone()
1757        };
1758
1759        let is_time_index_ms = scan_table
1760            .schema()
1761            .timestamp_column()
1762            .with_context(|| TimeIndexNotFoundSnafu {
1763                table: maybe_phy_table_ref.to_quoted_string(),
1764            })?
1765            .data_type
1766            == ConcreteDataType::timestamp_millisecond_datatype();
1767
1768        let scan_projection = if table_id_filter.is_some() {
1769            let mut required_columns = HashSet::new();
1770            required_columns.insert(DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string());
1771            required_columns.insert(self.ctx.time_index_column.clone().with_context(|| {
1772                TimeIndexNotFoundSnafu {
1773                    table: maybe_phy_table_ref.to_quoted_string(),
1774                }
1775            })?);
1776            for col in &scan_tag_columns {
1777                required_columns.insert(col.clone());
1778            }
1779            for col in &self.ctx.field_columns {
1780                required_columns.insert(col.clone());
1781            }
1782            if use_tsid {
1783                required_columns.insert(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
1784            }
1785
1786            let arrow_schema = scan_table.schema().arrow_schema().clone();
1787            Some(
1788                arrow_schema
1789                    .fields()
1790                    .iter()
1791                    .enumerate()
1792                    .filter(|(_, field)| required_columns.contains(field.name().as_str()))
1793                    .map(|(idx, _)| idx)
1794                    .collect::<Vec<_>>(),
1795            )
1796        } else {
1797            None
1798        };
1799
1800        let mut scan_plan =
1801            LogicalPlanBuilder::scan(maybe_phy_table_ref.clone(), scan_provider, scan_projection)
1802                .context(DataFusionPlanningSnafu)?
1803                .build()
1804                .context(DataFusionPlanningSnafu)?;
1805
1806        if let Some(table_id) = table_id_filter {
1807            scan_plan = LogicalPlanBuilder::from(scan_plan)
1808                .filter(
1809                    DfExpr::Column(Column::from_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME))
1810                        .eq(lit(table_id)),
1811                )
1812                .context(DataFusionPlanningSnafu)?
1813                .alias(table_ref.clone()) // rename the relation back to logical table's name after filtering
1814                .context(DataFusionPlanningSnafu)?
1815                .build()
1816                .context(DataFusionPlanningSnafu)?;
1817        }
1818
1819        if !is_time_index_ms {
1820            // cast to ms if time_index not in Millisecond precision
1821            let expr: Vec<_> = self
1822                .create_field_column_exprs()?
1823                .into_iter()
1824                .chain(
1825                    scan_tag_columns
1826                        .iter()
1827                        .map(|tag| DfExpr::Column(Column::from_name(tag))),
1828                )
1829                .chain(self.ctx.use_tsid.then_some(DfExpr::Column(Column::new(
1830                    Some(table_ref.clone()),
1831                    DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
1832                ))))
1833                .chain(Some(DfExpr::Alias(Alias {
1834                    expr: Box::new(DfExpr::Cast(Cast {
1835                        expr: Box::new(self.create_time_index_column_expr()?),
1836                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
1837                    })),
1838                    relation: Some(table_ref.clone()),
1839                    name: self
1840                        .ctx
1841                        .time_index_column
1842                        .as_ref()
1843                        .with_context(|| TimeIndexNotFoundSnafu {
1844                            table: table_ref.to_quoted_string(),
1845                        })?
1846                        .clone(),
1847                    metadata: None,
1848                })))
1849                .collect::<Vec<_>>();
1850            scan_plan = LogicalPlanBuilder::from(scan_plan)
1851                .project(expr)
1852                .context(DataFusionPlanningSnafu)?
1853                .build()
1854                .context(DataFusionPlanningSnafu)?;
1855        } else if table_id_filter.is_some() {
1856            // Drop the internal `__table_id` column after filtering.
1857            let project_exprs = self
1858                .create_field_column_exprs()?
1859                .into_iter()
1860                .chain(
1861                    scan_tag_columns
1862                        .iter()
1863                        .map(|tag| DfExpr::Column(Column::from_name(tag))),
1864                )
1865                .chain(
1866                    self.ctx
1867                        .use_tsid
1868                        .then_some(DfExpr::Column(Column::from_name(
1869                            DATA_SCHEMA_TSID_COLUMN_NAME,
1870                        ))),
1871                )
1872                .chain(Some(self.create_time_index_column_expr()?))
1873                .collect::<Vec<_>>();
1874
1875            scan_plan = LogicalPlanBuilder::from(scan_plan)
1876                .project(project_exprs)
1877                .context(DataFusionPlanningSnafu)?
1878                .build()
1879                .context(DataFusionPlanningSnafu)?;
1880        }
1881
1882        let result = LogicalPlanBuilder::from(scan_plan)
1883            .build()
1884            .context(DataFusionPlanningSnafu)?;
1885        Ok(result)
1886    }
1887
1888    fn collect_row_key_tag_columns_from_plan(
1889        &self,
1890        plan: &LogicalPlan,
1891    ) -> Result<BTreeSet<String>> {
1892        fn walk(
1893            planner: &PromPlanner,
1894            plan: &LogicalPlan,
1895            out: &mut BTreeSet<String>,
1896        ) -> Result<()> {
1897            if let LogicalPlan::TableScan(scan) = plan {
1898                let table = planner.table_from_source(&scan.source)?;
1899                for col in table.table_info().meta.row_key_column_names() {
1900                    if col != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
1901                        && col != DATA_SCHEMA_TSID_COLUMN_NAME
1902                        && !is_metric_engine_internal_column(col)
1903                    {
1904                        out.insert(col.clone());
1905                    }
1906                }
1907            }
1908
1909            for input in plan.inputs() {
1910                walk(planner, input, out)?;
1911            }
1912            Ok(())
1913        }
1914
1915        let mut out = BTreeSet::new();
1916        walk(self, plan, &mut out)?;
1917        Ok(out)
1918    }
1919
    /// Make sure every tag in `required_tags` appears in the output schema of
    /// `plan`, rewriting `TableScan` projections and `Projection` expression
    /// lists in place when a required tag exists upstream but was projected out.
    ///
    /// Tags that don't exist in the underlying schema are silently skipped;
    /// other plan node kinds are left untouched.
    fn ensure_tag_columns_available(
        &self,
        plan: LogicalPlan,
        required_tags: &BTreeSet<String>,
    ) -> Result<LogicalPlan> {
        if required_tags.is_empty() {
            return Ok(plan);
        }

        // Rewrites plan nodes bottom-up to expose the required tag columns.
        struct Rewriter {
            required_tags: BTreeSet<String>,
        }

        impl TreeNodeRewriter for Rewriter {
            type Node = LogicalPlan;

            fn f_up(
                &mut self,
                node: Self::Node,
            ) -> datafusion_common::Result<Transformed<Self::Node>> {
                match node {
                    LogicalPlan::TableScan(scan) => {
                        let schema = scan.source.schema();
                        let mut projection = match scan.projection.clone() {
                            Some(p) => p,
                            None => {
                                // Scanning all columns already covers required tags.
                                return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
                            }
                        };

                        // Append the source-schema index of each required tag
                        // that exists but is missing from the projection.
                        let mut changed = false;
                        for tag in &self.required_tags {
                            if let Some((idx, _)) = schema
                                .fields()
                                .iter()
                                .enumerate()
                                .find(|(_, field)| field.name() == tag)
                                && !projection.contains(&idx)
                            {
                                projection.push(idx);
                                changed = true;
                            }
                        }

                        if !changed {
                            return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
                        }

                        projection.sort_unstable();
                        projection.dedup();

                        // Rebuild the scan with the widened projection;
                        // filters and fetch limit are preserved.
                        let new_scan = TableScan::try_new(
                            scan.table_name.clone(),
                            scan.source.clone(),
                            Some(projection),
                            scan.filters,
                            scan.fetch,
                        )?;
                        Ok(Transformed::yes(LogicalPlan::TableScan(new_scan)))
                    }
                    LogicalPlan::Projection(proj) => {
                        let input_schema = proj.input.schema();

                        // Column names this projection already produces.
                        let existing = proj
                            .schema
                            .fields()
                            .iter()
                            .map(|f| f.name().as_str())
                            .collect::<HashSet<_>>();

                        // Pass through any required tag available from the
                        // input but not yet projected.
                        let mut expr = proj.expr.clone();
                        let mut has_changed = false;
                        for tag in &self.required_tags {
                            if existing.contains(tag.as_str()) {
                                continue;
                            }

                            if let Some(idx) = input_schema.index_of_column_by_name(None, tag) {
                                expr.push(DfExpr::Column(Column::from(
                                    input_schema.qualified_field(idx),
                                )));
                                has_changed = true;
                            }
                        }

                        if !has_changed {
                            return Ok(Transformed::no(LogicalPlan::Projection(proj)));
                        }

                        let new_proj = Projection::try_new(expr, proj.input)?;
                        Ok(Transformed::yes(LogicalPlan::Projection(new_proj)))
                    }
                    other => Ok(Transformed::no(other)),
                }
            }
        }

        let mut rewriter = Rewriter {
            required_tags: required_tags.clone(),
        };
        let rewritten = plan
            .rewrite(&mut rewriter)
            .context(DataFusionPlanningSnafu)?;
        Ok(rewritten.data)
    }
2026
2027    fn refresh_tag_columns_from_schema(&mut self, schema: &DFSchemaRef) {
2028        let time_index = self.ctx.time_index_column.as_deref();
2029        let field_columns = self.ctx.field_columns.iter().collect::<HashSet<_>>();
2030
2031        let mut tags = schema
2032            .fields()
2033            .iter()
2034            .map(|f| f.name())
2035            .filter(|name| Some(name.as_str()) != time_index)
2036            .filter(|name| !field_columns.contains(name))
2037            .filter(|name| !is_metric_engine_internal_column(name))
2038            .cloned()
2039            .collect::<Vec<_>>();
2040        tags.sort_unstable();
2041        tags.dedup();
2042        self.ctx.tag_columns = tags;
2043    }
2044
2045    /// Setup [PromPlannerContext]'s state fields.
2046    ///
2047    /// Returns a logical plan for an empty metric.
2048    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
2049        let table_ref = self.table_ref()?;
2050        let source = match self.table_provider.resolve_table(table_ref.clone()).await {
2051            Err(e) if e.status_code() == StatusCode::TableNotFound => {
2052                let plan = self.setup_context_for_empty_metric()?;
2053                return Ok(Some(plan));
2054            }
2055            res => res.context(CatalogSnafu)?,
2056        };
2057        let table = self.table_from_source(&source)?;
2058
2059        // set time index column name
2060        let time_index = table
2061            .schema()
2062            .timestamp_column()
2063            .with_context(|| TimeIndexNotFoundSnafu {
2064                table: table_ref.to_quoted_string(),
2065            })?
2066            .name
2067            .clone();
2068        self.ctx.time_index_column = Some(time_index);
2069
2070        // set values columns
2071        let values = table
2072            .table_info()
2073            .meta
2074            .field_column_names()
2075            .cloned()
2076            .collect();
2077        self.ctx.field_columns = values;
2078
2079        // set primary key (tag) columns
2080        let tags = table
2081            .table_info()
2082            .meta
2083            .row_key_column_names()
2084            .filter(|col| {
2085                // remove metric engine's internal columns
2086                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
2087            })
2088            .cloned()
2089            .collect();
2090        self.ctx.tag_columns = tags;
2091
2092        self.ctx.use_tsid = false;
2093
2094        Ok(None)
2095    }
2096
2097    /// Setup [PromPlannerContext]'s state fields for a non existent table
2098    /// without any rows.
2099    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
2100        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2101        self.ctx.reset_table_name_and_schema();
2102        self.ctx.tag_columns = vec![];
2103        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
2104        self.ctx.use_tsid = false;
2105
2106        // The table doesn't have any data, so we set start to 0 and end to -1.
2107        let plan = LogicalPlan::Extension(Extension {
2108            node: Arc::new(
2109                EmptyMetric::new(
2110                    0,
2111                    -1,
2112                    self.ctx.interval,
2113                    SPECIAL_TIME_FUNCTION.to_string(),
2114                    DEFAULT_FIELD_COLUMN.to_string(),
2115                    Some(lit(0.0f64)),
2116                )
2117                .context(DataFusionPlanningSnafu)?,
2118            ),
2119        });
2120        Ok(plan)
2121    }
2122
2123    // TODO(ruihang): insert column expr
2124    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
2125        let mut result = FunctionArgs::default();
2126
2127        for arg in args {
2128            // First try to parse as literal expression (including binary expressions like 100.0 + 3.0)
2129            if let Some(expr) = Self::try_build_literal_expr(arg) {
2130                result.literals.push(expr);
2131            } else {
2132                // If not a literal, treat as vector input
2133                match arg.as_ref() {
2134                    PromExpr::Subquery(_)
2135                    | PromExpr::VectorSelector(_)
2136                    | PromExpr::MatrixSelector(_)
2137                    | PromExpr::Extension(_)
2138                    | PromExpr::Aggregate(_)
2139                    | PromExpr::Paren(_)
2140                    | PromExpr::Call(_)
2141                    | PromExpr::Binary(_)
2142                    | PromExpr::Unary(_) => {
2143                        if result.input.replace(*arg.clone()).is_some() {
2144                            MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
2145                        }
2146                    }
2147
2148                    _ => {
2149                        let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
2150                        result.literals.push(expr);
2151                    }
2152                }
2153            }
2154        }
2155
2156        Ok(result)
2157    }
2158
2159    /// Creates function expressions for projection and returns the expressions and new tags.
2160    ///
2161    /// # Side Effects
2162    ///
2163    /// This method will update [PromPlannerContext]'s fields and tags if needed.
2164    fn create_function_expr(
2165        &mut self,
2166        func: &Function,
2167        other_input_exprs: Vec<DfExpr>,
2168        query_engine_state: &QueryEngineState,
2169    ) -> Result<(Vec<DfExpr>, Vec<String>)> {
2170        // TODO(ruihang): check function args list
2171        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
2172
2173        // TODO(ruihang): set this according to in-param list
2174        let field_column_pos = 0;
2175        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2176        // New labels after executing the function, e.g. `label_replace` etc.
2177        let mut new_tags = vec![];
2178        let scalar_func = match func.name {
2179            "increase" => ScalarFunc::ExtrapolateUdf(
2180                Arc::new(Increase::scalar_udf()),
2181                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2182            ),
2183            "rate" => ScalarFunc::ExtrapolateUdf(
2184                Arc::new(Rate::scalar_udf()),
2185                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2186            ),
2187            "delta" => ScalarFunc::ExtrapolateUdf(
2188                Arc::new(Delta::scalar_udf()),
2189                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2190            ),
2191            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
2192            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
2193            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
2194            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
2195            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
2196            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
2197            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
2198            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
2199            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
2200            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
2201            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
2202            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
2203            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
2204            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
2205            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
2206            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
2207            "predict_linear" => {
2208                other_input_exprs[0] = DfExpr::Cast(Cast {
2209                    expr: Box::new(other_input_exprs[0].clone()),
2210                    data_type: ArrowDataType::Int64,
2211                });
2212                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
2213            }
2214            "double_exponential_smoothing" | "holt_winters" => {
2215                ScalarFunc::Udf(Arc::new(DoubleExponentialSmoothing::scalar_udf()))
2216            }
2217            "time" => {
2218                exprs.push(build_special_time_expr(
2219                    self.ctx.time_index_column.as_ref().unwrap(),
2220                ));
2221                ScalarFunc::GeneratedExpr
2222            }
2223            "minute" => {
2224                // date_part('minute', time_index)
2225                let expr = self.date_part_on_time_index("minute")?;
2226                exprs.push(expr);
2227                ScalarFunc::GeneratedExpr
2228            }
2229            "hour" => {
2230                // date_part('hour', time_index)
2231                let expr = self.date_part_on_time_index("hour")?;
2232                exprs.push(expr);
2233                ScalarFunc::GeneratedExpr
2234            }
2235            "month" => {
2236                // date_part('month', time_index)
2237                let expr = self.date_part_on_time_index("month")?;
2238                exprs.push(expr);
2239                ScalarFunc::GeneratedExpr
2240            }
2241            "year" => {
2242                // date_part('year', time_index)
2243                let expr = self.date_part_on_time_index("year")?;
2244                exprs.push(expr);
2245                ScalarFunc::GeneratedExpr
2246            }
2247            "day_of_month" => {
2248                // date_part('day', time_index)
2249                let expr = self.date_part_on_time_index("day")?;
2250                exprs.push(expr);
2251                ScalarFunc::GeneratedExpr
2252            }
2253            "day_of_week" => {
2254                // date_part('dow', time_index)
2255                let expr = self.date_part_on_time_index("dow")?;
2256                exprs.push(expr);
2257                ScalarFunc::GeneratedExpr
2258            }
2259            "day_of_year" => {
2260                // date_part('doy', time_index)
2261                let expr = self.date_part_on_time_index("doy")?;
2262                exprs.push(expr);
2263                ScalarFunc::GeneratedExpr
2264            }
2265            "days_in_month" => {
2266                // date_part(
2267                //     'days',
2268                //     (date_trunc('month', <TIME INDEX>::date) + interval '1 month - 1 day')
2269                // );
2270                let day_lit_expr = "day".lit();
2271                let month_lit_expr = "month".lit();
2272                let interval_1month_lit_expr =
2273                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
2274                let interval_1day_lit_expr = DfExpr::Literal(
2275                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
2276                    None,
2277                );
2278                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
2279                    left: Box::new(interval_1month_lit_expr),
2280                    op: Operator::Minus,
2281                    right: Box::new(interval_1day_lit_expr),
2282                });
2283                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
2284                    func: datafusion_functions::datetime::date_trunc(),
2285                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
2286                });
2287                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
2288                    left: Box::new(date_trunc_expr),
2289                    op: Operator::Plus,
2290                    right: Box::new(the_1month_minus_1day_expr),
2291                });
2292                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
2293                    func: datafusion_functions::datetime::date_part(),
2294                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
2295                });
2296
2297                exprs.push(date_part_expr);
2298                ScalarFunc::GeneratedExpr
2299            }
2300
2301            "label_join" => {
2302                let (concat_expr, dst_label) = Self::build_concat_labels_expr(
2303                    &mut other_input_exprs,
2304                    &self.ctx,
2305                    query_engine_state,
2306                )?;
2307
2308                // Reserve the current field columns except the `dst_label`.
2309                for value in &self.ctx.field_columns {
2310                    if *value != dst_label {
2311                        let expr = DfExpr::Column(Column::from_name(value));
2312                        exprs.push(expr);
2313                    }
2314                }
2315
2316                // Remove it from tag columns if exists to avoid duplicated column names
2317                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
2318                new_tags.push(dst_label);
2319                // Add the new label expr to evaluate
2320                exprs.push(concat_expr);
2321
2322                ScalarFunc::GeneratedExpr
2323            }
2324            "label_replace" => {
2325                if let Some((replace_expr, dst_label)) = self
2326                    .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
2327                {
2328                    // Reserve the current field columns except the `dst_label`.
2329                    for value in &self.ctx.field_columns {
2330                        if *value != dst_label {
2331                            let expr = DfExpr::Column(Column::from_name(value));
2332                            exprs.push(expr);
2333                        }
2334                    }
2335
2336                    ensure!(
2337                        !self.ctx.tag_columns.contains(&dst_label),
2338                        SameLabelSetSnafu
2339                    );
2340                    new_tags.push(dst_label);
2341                    // Add the new label expr to evaluate
2342                    exprs.push(replace_expr);
2343                } else {
2344                    // Keep the current field columns
2345                    for value in &self.ctx.field_columns {
2346                        let expr = DfExpr::Column(Column::from_name(value));
2347                        exprs.push(expr);
2348                    }
2349                }
2350
2351                ScalarFunc::GeneratedExpr
2352            }
2353            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
2354                // These functions are not expression but a part of plan,
2355                // they are processed by `prom_call_expr_to_plan`.
2356                for value in &self.ctx.field_columns {
2357                    let expr = DfExpr::Column(Column::from_name(value));
2358                    exprs.push(expr);
2359                }
2360
2361                ScalarFunc::GeneratedExpr
2362            }
2363            "round" => {
2364                if other_input_exprs.is_empty() {
2365                    other_input_exprs.push_front(0.0f64.lit());
2366                }
2367                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
2368            }
2369            "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
2370            "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
2371            "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
2372            "pi" => {
2373                // pi functions doesn't accepts any arguments, needs special processing
2374                let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2375                    func: datafusion::functions::math::pi(),
2376                    args: vec![],
2377                });
2378                exprs.push(fn_expr);
2379
2380                ScalarFunc::GeneratedExpr
2381            }
2382            _ => {
2383                if let Some(f) = query_engine_state
2384                    .session_state()
2385                    .scalar_functions()
2386                    .get(func.name)
2387                {
2388                    ScalarFunc::DataFusionBuiltin(f.clone())
2389                } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
2390                    let func_state = query_engine_state.function_state();
2391                    let query_ctx = self.table_provider.query_ctx();
2392
2393                    ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
2394                        state: func_state,
2395                        query_ctx: query_ctx.clone(),
2396                    })))
2397                } else if let Some(f) = datafusion_functions::math::functions()
2398                    .iter()
2399                    .find(|f| f.name() == func.name)
2400                {
2401                    ScalarFunc::DataFusionUdf(f.clone())
2402                } else {
2403                    return UnsupportedExprSnafu {
2404                        name: func.name.to_string(),
2405                    }
2406                    .fail();
2407                }
2408            }
2409        };
2410
2411        for value in &self.ctx.field_columns {
2412            let col_expr = DfExpr::Column(Column::from_name(value));
2413
2414            match scalar_func.clone() {
2415                ScalarFunc::DataFusionBuiltin(func) => {
2416                    other_input_exprs.insert(field_column_pos, col_expr);
2417                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2418                        func,
2419                        args: other_input_exprs.clone().into(),
2420                    });
2421                    exprs.push(fn_expr);
2422                    let _ = other_input_exprs.remove(field_column_pos);
2423                }
2424                ScalarFunc::DataFusionUdf(func) => {
2425                    let args = itertools::chain!(
2426                        other_input_exprs.iter().take(field_column_pos).cloned(),
2427                        std::iter::once(col_expr),
2428                        other_input_exprs.iter().skip(field_column_pos).cloned()
2429                    )
2430                    .collect_vec();
2431                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
2432                }
2433                ScalarFunc::Udf(func) => {
2434                    let ts_range_expr = DfExpr::Column(Column::from_name(
2435                        RangeManipulate::build_timestamp_range_name(
2436                            self.ctx.time_index_column.as_ref().unwrap(),
2437                        ),
2438                    ));
2439                    other_input_exprs.insert(field_column_pos, ts_range_expr);
2440                    other_input_exprs.insert(field_column_pos + 1, col_expr);
2441                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2442                        func,
2443                        args: other_input_exprs.clone().into(),
2444                    });
2445                    exprs.push(fn_expr);
2446                    let _ = other_input_exprs.remove(field_column_pos + 1);
2447                    let _ = other_input_exprs.remove(field_column_pos);
2448                }
2449                ScalarFunc::ExtrapolateUdf(func, range_length) => {
2450                    let ts_range_expr = DfExpr::Column(Column::from_name(
2451                        RangeManipulate::build_timestamp_range_name(
2452                            self.ctx.time_index_column.as_ref().unwrap(),
2453                        ),
2454                    ));
2455                    other_input_exprs.insert(field_column_pos, ts_range_expr);
2456                    other_input_exprs.insert(field_column_pos + 1, col_expr);
2457                    other_input_exprs
2458                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
2459                    other_input_exprs.push_back(lit(range_length));
2460                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2461                        func,
2462                        args: other_input_exprs.clone().into(),
2463                    });
2464                    exprs.push(fn_expr);
2465                    let _ = other_input_exprs.pop_back();
2466                    let _ = other_input_exprs.remove(field_column_pos + 2);
2467                    let _ = other_input_exprs.remove(field_column_pos + 1);
2468                    let _ = other_input_exprs.remove(field_column_pos);
2469                }
2470                ScalarFunc::GeneratedExpr => {}
2471            }
2472        }
2473
2474        // Update value columns' name, and alias them to remove qualifiers
2475        // For label functions such as `label_join`, `label_replace`, etc.,
2476        // we keep the fields unchanged.
2477        if !matches!(func.name, "label_join" | "label_replace") {
2478            let mut new_field_columns = Vec::with_capacity(exprs.len());
2479
2480            exprs = exprs
2481                .into_iter()
2482                .map(|expr| {
2483                    let display_name = expr.schema_name().to_string();
2484                    new_field_columns.push(display_name.clone());
2485                    Ok(expr.alias(display_name))
2486                })
2487                .collect::<std::result::Result<Vec<_>, _>>()
2488                .context(DataFusionPlanningSnafu)?;
2489
2490            self.ctx.field_columns = new_field_columns;
2491        }
2492
2493        Ok((exprs, new_tags))
2494    }
2495
2496    /// Validate label name according to Prometheus specification.
2497    /// Label names must match the regex: [a-zA-Z_][a-zA-Z0-9_]*
2498    /// Additionally, label names starting with double underscores are reserved for internal use.
2499    fn validate_label_name(label_name: &str) -> Result<()> {
2500        // Check if label name starts with double underscores (reserved)
2501        if label_name.starts_with("__") {
2502            return InvalidDestinationLabelNameSnafu { label_name }.fail();
2503        }
2504        // Check if label name matches the required pattern
2505        if !LABEL_NAME_REGEX.is_match(label_name) {
2506            return InvalidDestinationLabelNameSnafu { label_name }.fail();
2507        }
2508
2509        Ok(())
2510    }
2511
2512    /// Build expr for `label_replace` function
2513    fn build_regexp_replace_label_expr(
2514        &self,
2515        other_input_exprs: &mut VecDeque<DfExpr>,
2516        query_engine_state: &QueryEngineState,
2517    ) -> Result<Option<(DfExpr, String)>> {
2518        // label_replace(vector, dst_label, replacement, src_label, regex)
2519        let dst_label = match other_input_exprs.pop_front() {
2520            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2521            other => UnexpectedPlanExprSnafu {
2522                desc: format!("expected dst_label string literal, but found {:?}", other),
2523            }
2524            .fail()?,
2525        };
2526
2527        // Validate the destination label name
2528        Self::validate_label_name(&dst_label)?;
2529        let replacement = match other_input_exprs.pop_front() {
2530            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
2531            other => UnexpectedPlanExprSnafu {
2532                desc: format!("expected replacement string literal, but found {:?}", other),
2533            }
2534            .fail()?,
2535        };
2536        let src_label = match other_input_exprs.pop_front() {
2537            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
2538            other => UnexpectedPlanExprSnafu {
2539                desc: format!("expected src_label string literal, but found {:?}", other),
2540            }
2541            .fail()?,
2542        };
2543
2544        let regex = match other_input_exprs.pop_front() {
2545            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2546            other => UnexpectedPlanExprSnafu {
2547                desc: format!("expected regex string literal, but found {:?}", other),
2548            }
2549            .fail()?,
2550        };
2551
2552        // Validate the regex before using it
2553        // doc: https://prometheus.io/docs/prometheus/latest/querying/functions/#label_replace
2554        regex::Regex::new(&regex).map_err(|_| {
2555            InvalidRegularExpressionSnafu {
2556                regex: regex.clone(),
2557            }
2558            .build()
2559        })?;
2560
2561        // If the src_label exists and regex is empty, keep everything unchanged.
2562        if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2563            return Ok(None);
2564        }
2565
2566        // If the src_label doesn't exists, and
2567        if !self.ctx.tag_columns.contains(&src_label) {
2568            if replacement.is_empty() {
2569                // the replacement is empty, keep everything unchanged.
2570                return Ok(None);
2571            } else {
2572                // the replacement is not empty, always adds dst_label with replacement value.
2573                return Ok(Some((
2574                    // alias literal `replacement` as dst_label
2575                    lit(replacement).alias(&dst_label),
2576                    dst_label,
2577                )));
2578            }
2579        }
2580
2581        // Preprocess the regex:
2582        // https://github.com/prometheus/prometheus/blob/d902abc50d6652ba8fe9a81ff8e5cce936114eba/promql/functions.go#L1575C32-L1575C37
2583        let regex = format!("^(?s:{regex})$");
2584
2585        let session_state = query_engine_state.session_state();
2586        let func = session_state
2587            .scalar_functions()
2588            .get("regexp_replace")
2589            .context(UnsupportedExprSnafu {
2590                name: "regexp_replace",
2591            })?;
2592
2593        // regexp_replace(src_label, regex, replacement)
2594        let args = vec![
2595            if src_label.is_empty() {
2596                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2597            } else {
2598                DfExpr::Column(Column::from_name(src_label))
2599            },
2600            DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2601            DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2602        ];
2603
2604        Ok(Some((
2605            DfExpr::ScalarFunction(ScalarFunction {
2606                func: func.clone(),
2607                args,
2608            })
2609            .alias(&dst_label),
2610            dst_label,
2611        )))
2612    }
2613
2614    /// Build expr for `label_join` function
2615    fn build_concat_labels_expr(
2616        other_input_exprs: &mut VecDeque<DfExpr>,
2617        ctx: &PromPlannerContext,
2618        query_engine_state: &QueryEngineState,
2619    ) -> Result<(DfExpr, String)> {
2620        // label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)
2621
2622        let dst_label = match other_input_exprs.pop_front() {
2623            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2624            other => UnexpectedPlanExprSnafu {
2625                desc: format!("expected dst_label string literal, but found {:?}", other),
2626            }
2627            .fail()?,
2628        };
2629        let separator = match other_input_exprs.pop_front() {
2630            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2631            other => UnexpectedPlanExprSnafu {
2632                desc: format!("expected separator string literal, but found {:?}", other),
2633            }
2634            .fail()?,
2635        };
2636
2637        // Create a set of available columns (tag columns + field columns + time index column)
2638        let available_columns: HashSet<&str> = ctx
2639            .tag_columns
2640            .iter()
2641            .chain(ctx.field_columns.iter())
2642            .chain(ctx.time_index_column.as_ref())
2643            .map(|s| s.as_str())
2644            .collect();
2645
2646        let src_labels = other_input_exprs
2647            .iter()
2648            .map(|expr| {
2649                // Cast source label into column or null literal
2650                match expr {
2651                    DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2652                        if label.is_empty() {
2653                            Ok(DfExpr::Literal(ScalarValue::Null, None))
2654                        } else if available_columns.contains(label.as_str()) {
2655                            // Label exists in the table schema
2656                            Ok(DfExpr::Column(Column::from_name(label)))
2657                        } else {
2658                            // Label doesn't exist, treat as empty string (null)
2659                            Ok(DfExpr::Literal(ScalarValue::Null, None))
2660                        }
2661                    }
2662                    other => UnexpectedPlanExprSnafu {
2663                        desc: format!(
2664                            "expected source label string literal, but found {:?}",
2665                            other
2666                        ),
2667                    }
2668                    .fail(),
2669                }
2670            })
2671            .collect::<Result<Vec<_>>>()?;
2672        ensure!(
2673            !src_labels.is_empty(),
2674            FunctionInvalidArgumentSnafu {
2675                fn_name: "label_join"
2676            }
2677        );
2678
2679        let session_state = query_engine_state.session_state();
2680        let func = session_state
2681            .scalar_functions()
2682            .get("concat_ws")
2683            .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2684
2685        // concat_ws(separator, src_label_1, src_label_2, ...) as dst_label
2686        let mut args = Vec::with_capacity(1 + src_labels.len());
2687        args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2688        args.extend(src_labels);
2689
2690        Ok((
2691            DfExpr::ScalarFunction(ScalarFunction {
2692                func: func.clone(),
2693                args,
2694            })
2695            .alias(&dst_label),
2696            dst_label,
2697        ))
2698    }
2699
2700    fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2701        Ok(DfExpr::Column(Column::from_name(
2702            self.ctx
2703                .time_index_column
2704                .clone()
2705                .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2706        )))
2707    }
2708
2709    fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2710        let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2711        for tag in &self.ctx.tag_columns {
2712            let expr = DfExpr::Column(Column::from_name(tag));
2713            result.push(expr);
2714        }
2715        Ok(result)
2716    }
2717
2718    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2719        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2720        for field in &self.ctx.field_columns {
2721            let expr = DfExpr::Column(Column::from_name(field));
2722            result.push(expr);
2723        }
2724        Ok(result)
2725    }
2726
2727    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2728        let mut result = self
2729            .ctx
2730            .tag_columns
2731            .iter()
2732            .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2733            .collect::<Vec<_>>();
2734        result.push(self.create_time_index_column_expr()?.sort(true, true));
2735        Ok(result)
2736    }
2737
2738    fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2739        self.ctx
2740            .field_columns
2741            .iter()
2742            .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2743            .collect::<Vec<_>>()
2744    }
2745
2746    fn create_sort_exprs_by_tags(
2747        func: &str,
2748        tags: Vec<DfExpr>,
2749        asc: bool,
2750    ) -> Result<Vec<SortExpr>> {
2751        ensure!(
2752            !tags.is_empty(),
2753            FunctionInvalidArgumentSnafu { fn_name: func }
2754        );
2755
2756        tags.iter()
2757            .map(|col| match col {
2758                DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2759                    Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2760                }
2761                other => UnexpectedPlanExprSnafu {
2762                    desc: format!("expected label string literal, but found {:?}", other),
2763                }
2764                .fail(),
2765            })
2766            .collect::<Result<Vec<_>>>()
2767    }
2768
2769    fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2770        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2771        for value in &self.ctx.field_columns {
2772            let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2773            exprs.push(expr);
2774        }
2775
2776        // This error context should be computed lazily: the planner may set `ctx.table_name` to
2777        // `None` for derived expressions (e.g. after projecting the LHS of a vector-vector
2778        // comparison filter). Eagerly calling `table_ref()?` here can turn a valid plan into
2779        // a `TableNameNotFound` error even when `conjunction(exprs)` succeeds.
2780        conjunction(exprs).with_context(|| ValueNotFoundSnafu {
2781            table: self
2782                .table_ref()
2783                .map(|t| t.to_quoted_string())
2784                .unwrap_or_else(|_| "unknown".to_string()),
2785        })
2786    }
2787
    /// Creates a set of DataFusion `DfExpr::AggregateFunction` expressions for each value column using the specified aggregate function.
    ///
    /// # Side Effects
    ///
    /// This method modifies the value columns in the context by replacing them with the new columns
    /// created by the aggregate function application.
    ///
    /// # Returns
    ///
    /// Returns a tuple of `(aggregate_expressions, previous_field_expressions)` where:
    /// - `aggregate_expressions`: Expressions that apply the aggregate function to the original fields
    /// - `previous_field_expressions`: Original field expressions before aggregation. This is non-empty
    ///   only when the operation is `count_values`, as this operation requires preserving the original
    ///   values for grouping.
    ///
    fn create_aggregate_exprs(
        &mut self,
        op: TokenType,
        param: &Option<Box<PromExpr>>,
        input_plan: &LogicalPlan,
    ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
        // Extra leading, non-column arguments for the aggregator (e.g. the
        // quantile value `q`); the field column itself is appended per
        // iteration further below.
        let mut non_col_args = Vec::new();
        let is_group_agg = op.id() == token::T_GROUP;
        if is_group_agg {
            // `group()` is emulated via `max(1.0)` below, which is only
            // supported on single-value input here.
            ensure!(
                self.ctx.field_columns.len() == 1,
                MultiFieldsNotSupportedSnafu {
                    operator: "group()"
                }
            );
        }
        // Map the PromQL aggregation token to a DataFusion UDAF.
        let aggr = match op.id() {
            token::T_SUM => sum_udaf(),
            token::T_QUANTILE => {
                // `quantile` takes an extra scalar parameter `q` that must be
                // a Float64 literal.
                let q =
                    Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
                non_col_args.push(q);
                quantile_udaf()
            }
            token::T_AVG => avg_udaf(),
            token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
            token::T_MIN => min_udaf(),
            token::T_MAX => max_udaf(),
            // PromQL's `group()` aggregator produces 1 for each group.
            // Use `max(1.0)` (per-group) to match semantics and output type (Float64).
            token::T_GROUP => max_udaf(),
            token::T_STDDEV => stddev_pop_udaf(),
            token::T_STDVAR => var_pop_udaf(),
            token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
                name: format!("{op:?}"),
            }
            .fail()?,
            _ => UnexpectedTokenSnafu { token: op }.fail()?,
        };

        // perform aggregate operation to each value column
        let exprs: Vec<DfExpr> = self
            .ctx
            .field_columns
            .iter()
            .map(|col| {
                if is_group_agg {
                    aggr.call(vec![lit(1_f64)])
                } else {
                    // Temporarily append the column to the shared argument
                    // prefix, build the call, then pop so the next column
                    // starts from the same non-column prefix.
                    non_col_args.push(DfExpr::Column(Column::from_name(col)));
                    let expr = aggr.call(non_col_args.clone());
                    non_col_args.pop();
                    expr
                }
            })
            .collect::<Vec<_>>();

        // if the aggregator is `count_values`, it must be grouped by current fields.
        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
            let prev_field_exprs: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::from_name(col)))
                .collect();

            ensure!(
                self.ctx.field_columns.len() == 1,
                UnsupportedExprSnafu {
                    name: "count_values on multi-value input"
                }
            );

            prev_field_exprs
        } else {
            vec![]
        };

        // update value column name according to the aggregators,
        // using the schema names of the normalized aggregate exprs.
        let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());

        let normalized_exprs =
            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
        for expr in normalized_exprs {
            new_field_columns.push(expr.schema_name().to_string());
        }
        self.ctx.field_columns = new_field_columns;

        Ok((exprs, prev_field_exprs))
    }
2893
2894    fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2895        let param = param
2896            .as_deref()
2897            .with_context(|| FunctionInvalidArgumentSnafu {
2898                fn_name: op.to_string(),
2899            })?;
2900        let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2901            return FunctionInvalidArgumentSnafu {
2902                fn_name: op.to_string(),
2903            }
2904            .fail();
2905        };
2906
2907        Ok(val)
2908    }
2909
2910    fn get_param_as_literal_expr(
2911        param: &Option<Box<PromExpr>>,
2912        op: Option<TokenType>,
2913        expected_type: Option<ArrowDataType>,
2914    ) -> Result<DfExpr> {
2915        let prom_param = param.as_deref().with_context(|| {
2916            if let Some(op) = op {
2917                FunctionInvalidArgumentSnafu {
2918                    fn_name: op.to_string(),
2919                }
2920            } else {
2921                FunctionInvalidArgumentSnafu {
2922                    fn_name: "unknown".to_string(),
2923                }
2924            }
2925        })?;
2926
2927        let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2928            if let Some(op) = op {
2929                FunctionInvalidArgumentSnafu {
2930                    fn_name: op.to_string(),
2931                }
2932            } else {
2933                FunctionInvalidArgumentSnafu {
2934                    fn_name: "unknown".to_string(),
2935                }
2936            }
2937        })?;
2938
2939        // check if the type is expected
2940        if let Some(expected_type) = expected_type {
2941            // literal should not have reference to column
2942            let expr_type = expr
2943                .get_type(&DFSchema::empty())
2944                .context(DataFusionPlanningSnafu)?;
2945            if expected_type != expr_type {
2946                return FunctionInvalidArgumentSnafu {
2947                    fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2948                }
2949                .fail();
2950            }
2951        }
2952
2953        Ok(expr)
2954    }
2955
2956    /// Create [DfExpr::WindowFunction] expr for each value column with given window function.
2957    ///
2958    fn create_window_exprs(
2959        &mut self,
2960        op: TokenType,
2961        group_exprs: Vec<DfExpr>,
2962        input_plan: &LogicalPlan,
2963    ) -> Result<Vec<DfExpr>> {
2964        ensure!(
2965            self.ctx.field_columns.len() == 1,
2966            UnsupportedExprSnafu {
2967                name: "topk or bottomk on multi-value input"
2968            }
2969        );
2970
2971        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2972
2973        let asc = matches!(op.id(), token::T_BOTTOMK);
2974
2975        let tag_sort_exprs = self
2976            .create_tag_column_exprs()?
2977            .into_iter()
2978            .map(|expr| expr.sort(asc, true));
2979
2980        // perform window operation to each value column
2981        let exprs: Vec<DfExpr> = self
2982            .ctx
2983            .field_columns
2984            .iter()
2985            .map(|col| {
2986                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2987                // Order by value in the specific order
2988                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
2989                // Then tags if the values are equal,
2990                // Try to ensure the relative stability of the output results.
2991                sort_exprs.extend(tag_sort_exprs.clone());
2992
2993                DfExpr::WindowFunction(Box::new(WindowFunction {
2994                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
2995                    params: WindowFunctionParams {
2996                        args: vec![],
2997                        partition_by: group_exprs.clone(),
2998                        order_by: sort_exprs,
2999                        window_frame: WindowFrame::new(Some(true)),
3000                        null_treatment: None,
3001                        distinct: false,
3002                        filter: None,
3003                    },
3004                }))
3005            })
3006            .collect();
3007
3008        let normalized_exprs =
3009            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
3010        Ok(normalized_exprs)
3011    }
3012
3013    /// Try to build a [f64] from [PromExpr].
3014    #[deprecated(
3015        note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
3016    )]
3017    fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
3018        match expr {
3019            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
3020            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
3021            PromExpr::Unary(UnaryExpr { expr, .. }) => {
3022                Self::try_build_float_literal(expr).map(|f| -f)
3023            }
3024            PromExpr::StringLiteral(_)
3025            | PromExpr::Binary(_)
3026            | PromExpr::VectorSelector(_)
3027            | PromExpr::MatrixSelector(_)
3028            | PromExpr::Call(_)
3029            | PromExpr::Extension(_)
3030            | PromExpr::Aggregate(_)
3031            | PromExpr::Subquery(_) => None,
3032        }
3033    }
3034
    /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
    ///
    /// Expects exactly two arguments: a float-literal quantile `phi` and the
    /// input vector expression. The input is planned recursively, then wrapped
    /// in a [HistogramFold] node keyed on the `le` column.
    ///
    /// Side effects on `self.ctx`: disables `__tsid` usage and removes `le`
    /// from the tag columns.
    async fn create_histogram_plan(
        &mut self,
        args: &PromFunctionArgs,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        if args.args.len() != 2 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
            .fail();
        }
        // The quantile must be a literal float expression, e.g. `0.99`.
        #[allow(deprecated)]
        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
            FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
        })?;

        let input = args.args[1].as_ref().clone();
        let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
        // `histogram_quantile` folds buckets across `le`, so `__tsid` (which includes `le`) is not
        // a stable series identifier anymore. Also, HistogramFold infers label columns from the
        // input schema and must not treat `__tsid` as a label column.
        let input_plan = self.strip_tsid_column(input_plan)?;
        self.ctx.use_tsid = false;

        if !self.ctx.has_le_tag() {
            // Return empty result instead of error when 'le' column is not found.
            // This handles the case when histogram metrics don't exist.
            return Ok(LogicalPlan::EmptyRelation(
                datafusion::logical_expr::EmptyRelation {
                    produce_one_row: false,
                    schema: Arc::new(DFSchema::empty()),
                },
            ));
        }
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: self.ctx.table_name.clone().unwrap_or_default(),
                })?;
        // FIXME(ruihang): support multi fields
        let field_column = self
            .ctx
            .field_columns
            .first()
            .with_context(|| FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            })?
            .clone();
        // remove le column from tag columns
        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(
                HistogramFold::new(
                    LE_COLUMN_NAME.to_string(),
                    field_column,
                    time_index_column,
                    phi,
                    input_plan,
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        }))
    }
3104
3105    /// Create a [SPECIAL_VECTOR_FUNCTION] plan
3106    async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
3107        if args.args.len() != 1 {
3108            return FunctionInvalidArgumentSnafu {
3109                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
3110            }
3111            .fail();
3112        }
3113        let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
3114
3115        // reuse `SPECIAL_TIME_FUNCTION` as name of time index column
3116        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
3117        self.ctx.reset_table_name_and_schema();
3118        self.ctx.tag_columns = vec![];
3119        self.ctx.field_columns = vec![greptime_value().to_string()];
3120        Ok(LogicalPlan::Extension(Extension {
3121            node: Arc::new(
3122                EmptyMetric::new(
3123                    self.ctx.start,
3124                    self.ctx.end,
3125                    self.ctx.interval,
3126                    SPECIAL_TIME_FUNCTION.to_string(),
3127                    greptime_value().to_string(),
3128                    Some(lit),
3129                )
3130                .context(DataFusionPlanningSnafu)?,
3131            ),
3132        }))
3133    }
3134
3135    /// Create a [SCALAR_FUNCTION] plan
3136    async fn create_scalar_plan(
3137        &mut self,
3138        args: &PromFunctionArgs,
3139        query_engine_state: &QueryEngineState,
3140    ) -> Result<LogicalPlan> {
3141        ensure!(
3142            args.len() == 1,
3143            FunctionInvalidArgumentSnafu {
3144                fn_name: SCALAR_FUNCTION
3145            }
3146        );
3147        let input = self
3148            .prom_expr_to_plan(&args.args[0], query_engine_state)
3149            .await?;
3150        ensure!(
3151            self.ctx.field_columns.len() == 1,
3152            MultiFieldsNotSupportedSnafu {
3153                operator: SCALAR_FUNCTION
3154            },
3155        );
3156        let scalar_plan = LogicalPlan::Extension(Extension {
3157            node: Arc::new(
3158                ScalarCalculate::new(
3159                    self.ctx.start,
3160                    self.ctx.end,
3161                    self.ctx.interval,
3162                    input,
3163                    self.ctx.time_index_column.as_ref().unwrap(),
3164                    &self.ctx.tag_columns,
3165                    &self.ctx.field_columns[0],
3166                    self.ctx.table_name.as_deref(),
3167                )
3168                .context(PromqlPlanNodeSnafu)?,
3169            ),
3170        });
3171        // scalar plan have no tag columns
3172        self.ctx.tag_columns.clear();
3173        self.ctx.field_columns.clear();
3174        self.ctx
3175            .field_columns
3176            .push(scalar_plan.schema().field(1).name().clone());
3177        Ok(scalar_plan)
3178    }
3179
3180    /// Create a [SPECIAL_ABSENT_FUNCTION] plan
3181    async fn create_absent_plan(
3182        &mut self,
3183        args: &PromFunctionArgs,
3184        query_engine_state: &QueryEngineState,
3185    ) -> Result<LogicalPlan> {
3186        if args.args.len() != 1 {
3187            return FunctionInvalidArgumentSnafu {
3188                fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
3189            }
3190            .fail();
3191        }
3192        let input = self
3193            .prom_expr_to_plan(&args.args[0], query_engine_state)
3194            .await?;
3195
3196        let time_index_expr = self.create_time_index_column_expr()?;
3197        let first_field_expr =
3198            self.create_field_column_exprs()?
3199                .pop()
3200                .with_context(|| ValueNotFoundSnafu {
3201                    table: self.ctx.table_name.clone().unwrap_or_default(),
3202                })?;
3203        let first_value_expr = first_value(first_field_expr, vec![]);
3204
3205        let ordered_aggregated_input = LogicalPlanBuilder::from(input)
3206            .aggregate(
3207                vec![time_index_expr.clone()],
3208                vec![first_value_expr.clone()],
3209            )
3210            .context(DataFusionPlanningSnafu)?
3211            .sort(vec![time_index_expr.sort(true, false)])
3212            .context(DataFusionPlanningSnafu)?
3213            .build()
3214            .context(DataFusionPlanningSnafu)?;
3215
3216        let fake_labels = self
3217            .ctx
3218            .selector_matcher
3219            .iter()
3220            .filter_map(|matcher| match matcher.op {
3221                MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
3222                _ => None,
3223            })
3224            .collect::<Vec<_>>();
3225
3226        // Create the absent plan
3227        let absent_plan = LogicalPlan::Extension(Extension {
3228            node: Arc::new(
3229                Absent::try_new(
3230                    self.ctx.start,
3231                    self.ctx.end,
3232                    self.ctx.interval,
3233                    self.ctx.time_index_column.as_ref().unwrap().clone(),
3234                    self.ctx.field_columns[0].clone(),
3235                    fake_labels,
3236                    ordered_aggregated_input,
3237                )
3238                .context(DataFusionPlanningSnafu)?,
3239            ),
3240        });
3241
3242        Ok(absent_plan)
3243    }
3244
3245    /// Try to build a DataFusion Literal Expression from PromQL Expr, return
3246    /// `None` if the input is not a literal expression.
3247    fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
3248        match expr {
3249            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
3250            PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
3251            PromExpr::VectorSelector(_)
3252            | PromExpr::MatrixSelector(_)
3253            | PromExpr::Extension(_)
3254            | PromExpr::Aggregate(_)
3255            | PromExpr::Subquery(_) => None,
3256            PromExpr::Call(Call { func, .. }) => {
3257                if func.name == SPECIAL_TIME_FUNCTION {
3258                    // For time() function, don't treat it as a literal
3259                    // Let it be handled as a regular function call
3260                    None
3261                } else {
3262                    None
3263                }
3264            }
3265            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
3266            // TODO(ruihang): support Unary operator
3267            PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
3268            PromExpr::Binary(PromBinaryExpr {
3269                lhs,
3270                rhs,
3271                op,
3272                modifier,
3273            }) => {
3274                let lhs = Self::try_build_literal_expr(lhs)?;
3275                let rhs = Self::try_build_literal_expr(rhs)?;
3276                let is_comparison_op = Self::is_token_a_comparison_op(*op);
3277                let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
3278                let expr = expr_builder(lhs, rhs).ok()?;
3279
3280                let should_return_bool = if let Some(m) = modifier {
3281                    m.return_bool
3282                } else {
3283                    false
3284                };
3285                if is_comparison_op && should_return_bool {
3286                    Some(DfExpr::Cast(Cast {
3287                        expr: Box::new(expr),
3288                        data_type: ArrowDataType::Float64,
3289                    }))
3290                } else {
3291                    Some(expr)
3292                }
3293            }
3294        }
3295    }
3296
3297    fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
3298        match expr {
3299            PromExpr::Call(Call { func, .. }) => {
3300                if func.name == SPECIAL_TIME_FUNCTION
3301                    && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
3302                {
3303                    Some(build_special_time_expr(time_index_col))
3304                } else {
3305                    None
3306                }
3307            }
3308            _ => None,
3309        }
3310    }
3311
3312    /// Return a lambda to build binary expression from token.
3313    /// Because some binary operator are function in DataFusion like `atan2` or `^`.
3314    #[allow(clippy::type_complexity)]
3315    fn prom_token_to_binary_expr_builder(
3316        token: TokenType,
3317    ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
3318        let cast_float = |expr| {
3319            if matches!(
3320                &expr,
3321                DfExpr::Cast(Cast {
3322                    data_type: ArrowDataType::Float64,
3323                    ..
3324                })
3325            ) || matches!(&expr, DfExpr::Literal(ScalarValue::Float64(_), _))
3326            {
3327                expr
3328            } else {
3329                DfExpr::Cast(Cast {
3330                    expr: Box::new(expr),
3331                    data_type: ArrowDataType::Float64,
3332                })
3333            }
3334        };
3335        match token.id() {
3336            token::T_ADD => Ok(Box::new(move |lhs, rhs| {
3337                Ok(cast_float(lhs) + cast_float(rhs))
3338            })),
3339            token::T_SUB => Ok(Box::new(move |lhs, rhs| {
3340                Ok(cast_float(lhs) - cast_float(rhs))
3341            })),
3342            token::T_MUL => Ok(Box::new(move |lhs, rhs| {
3343                Ok(cast_float(lhs) * cast_float(rhs))
3344            })),
3345            token::T_DIV => Ok(Box::new(move |lhs, rhs| {
3346                Ok(cast_float(lhs) / cast_float(rhs))
3347            })),
3348            token::T_MOD => Ok(Box::new(move |lhs: DfExpr, rhs| {
3349                Ok(cast_float(lhs) % cast_float(rhs))
3350            })),
3351            token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
3352            token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
3353            token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
3354            token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
3355            token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
3356            token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
3357            token::T_POW => Ok(Box::new(move |lhs, rhs| {
3358                Ok(DfExpr::ScalarFunction(ScalarFunction {
3359                    func: datafusion_functions::math::power(),
3360                    args: vec![cast_float(lhs), cast_float(rhs)],
3361                }))
3362            })),
3363            token::T_ATAN2 => Ok(Box::new(move |lhs, rhs| {
3364                Ok(DfExpr::ScalarFunction(ScalarFunction {
3365                    func: datafusion_functions::math::atan2(),
3366                    args: vec![cast_float(lhs), cast_float(rhs)],
3367                }))
3368            })),
3369            _ => UnexpectedTokenSnafu { token }.fail(),
3370        }
3371    }
3372
3373    /// Check if the given op is a [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators).
3374    fn is_token_a_comparison_op(token: TokenType) -> bool {
3375        matches!(
3376            token.id(),
3377            token::T_EQLC
3378                | token::T_NEQ
3379                | token::T_GTR
3380                | token::T_LSS
3381                | token::T_GTE
3382                | token::T_LTE
3383        )
3384    }
3385
3386    /// Check if the given op is a set operator (UNION, INTERSECT and EXCEPT in SQL).
3387    fn is_token_a_set_op(token: TokenType) -> bool {
3388        matches!(
3389            token.id(),
3390            token::T_LAND // INTERSECT
3391                | token::T_LOR // UNION
3392                | token::T_LUNLESS // EXCEPT
3393        )
3394    }
3395
    /// Build a inner join on time index column and tag columns to concat two logical plans.
    /// When `only_join_time_index == true` we only join on the time index, because these two plan may not have the same tag columns
    ///
    /// Join keys start from the context's tag columns, are narrowed by the
    /// vector-matching `modifier` (`on(...)` keeps only the listed labels,
    /// `ignoring(...)` removes them), and are extended with the two time
    /// index columns when both are present.
    #[allow(clippy::too_many_arguments)]
    fn join_on_non_field_columns(
        &self,
        left: LogicalPlan,
        right: LogicalPlan,
        left_table_ref: TableReference,
        right_table_ref: TableReference,
        left_time_index_column: Option<String>,
        right_time_index_column: Option<String>,
        only_join_time_index: bool,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        let mut left_tag_columns = if only_join_time_index {
            BTreeSet::new()
        } else {
            self.ctx
                .tag_columns
                .iter()
                .cloned()
                .collect::<BTreeSet<_>>()
        };
        // Both sides start from the same tag set; the modifier below adjusts
        // them in lockstep.
        let mut right_tag_columns = left_tag_columns.clone();

        // apply modifier
        if let Some(modifier) = modifier {
            // apply label modifier
            if let Some(matching) = &modifier.matching {
                match matching {
                    // keeps columns mentioned in `on`
                    LabelModifier::Include(on) => {
                        let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
                        left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
                        right_tag_columns =
                            right_tag_columns.intersection(&mask).cloned().collect();
                    }
                    // removes columns mentioned in `ignoring`
                    LabelModifier::Exclude(ignoring) => {
                        // doesn't check existence of label
                        for label in &ignoring.labels {
                            let _ = left_tag_columns.remove(label);
                            let _ = right_tag_columns.remove(label);
                        }
                    }
                }
            }
        }

        // push time index column if it exists (on both sides)
        if let (Some(left_time_index_column), Some(right_time_index_column)) =
            (left_time_index_column, right_time_index_column)
        {
            left_tag_columns.insert(left_time_index_column);
            right_tag_columns.insert(right_time_index_column);
        }

        // Alias the right side so the two inputs carry distinct qualifiers.
        let right = LogicalPlanBuilder::from(right)
            .alias(right_table_ref)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // Inner Join on time index column to concat two operator
        LogicalPlanBuilder::from(left)
            .alias(left_table_ref)
            .context(DataFusionPlanningSnafu)?
            .join_detailed(
                right,
                JoinType::Inner,
                (
                    left_tag_columns
                        .into_iter()
                        .map(Column::from_name)
                        .collect::<Vec<_>>(),
                    right_tag_columns
                        .into_iter()
                        .map(Column::from_name)
                        .collect::<Vec<_>>(),
                ),
                None,
                // Treat NULL join keys as equal so rows with absent tag
                // values still match each other.
                NullEquality::NullEqualsNull,
            )
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }
3483
    /// Build a set operator (AND/OR/UNLESS).
    ///
    /// `OR` is delegated to [`Self::or_operator`]. `AND`/`UNLESS` are planned
    /// as a `distinct` left input joined with the right via left-semi /
    /// left-anti joins on the (modifier-adjusted) tag columns plus the time
    /// index.
    ///
    /// Side effects on `self.ctx`: the time index, `use_tsid` and the field
    /// column are taken from the left input.
    fn set_op_on_non_field_columns(
        &mut self,
        left: LogicalPlan,
        mut right: LogicalPlan,
        left_context: PromPlannerContext,
        right_context: PromPlannerContext,
        op: TokenType,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        let mut left_tag_col_set = left_context
            .tag_columns
            .iter()
            .cloned()
            .collect::<HashSet<_>>();
        let mut right_tag_col_set = right_context
            .tag_columns
            .iter()
            .cloned()
            .collect::<HashSet<_>>();

        // OR cannot be expressed as a join; it has a dedicated implementation.
        if matches!(op.id(), token::T_LOR) {
            return self.or_operator(
                left,
                right,
                left_tag_col_set,
                right_tag_col_set,
                left_context,
                right_context,
                modifier,
            );
        }

        // apply modifier
        if let Some(modifier) = modifier {
            // one-to-many and many-to-one are not supported
            ensure!(
                matches!(
                    modifier.card,
                    VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
                ),
                UnsupportedVectorMatchSnafu {
                    name: modifier.card.clone(),
                },
            );
            // apply label modifier
            if let Some(matching) = &modifier.matching {
                match matching {
                    // keeps columns mentioned in `on`
                    LabelModifier::Include(on) => {
                        let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
                        left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
                        right_tag_col_set =
                            right_tag_col_set.intersection(&mask).cloned().collect();
                    }
                    // removes columns mentioned in `ignoring`
                    LabelModifier::Exclude(ignoring) => {
                        // doesn't check existence of label
                        for label in &ignoring.labels {
                            let _ = left_tag_col_set.remove(label);
                            let _ = right_tag_col_set.remove(label);
                        }
                    }
                }
            }
        }
        // ensure two sides have the same tag columns
        // (the condition is always true here since T_LOR returned early above;
        // kept as a guard.)
        if !matches!(op.id(), token::T_LOR) {
            ensure!(
                left_tag_col_set == right_tag_col_set,
                CombineTableColumnMismatchSnafu {
                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
                }
            )
        };
        // NOTE(review): these unwraps assume both inputs carry a time index
        // column at this point — confirm upstream planning guarantees this.
        let left_time_index = left_context.time_index_column.clone().unwrap();
        let right_time_index = right_context.time_index_column.clone().unwrap();
        // Join keys: shared tag columns plus the (left) time index.
        let join_keys = left_tag_col_set
            .iter()
            .cloned()
            .chain([left_time_index.clone()])
            .collect::<Vec<_>>();
        self.ctx.time_index_column = Some(left_time_index.clone());
        self.ctx.use_tsid = left_context.use_tsid;

        // alias right time index column if necessary
        if left_context.time_index_column != right_context.time_index_column {
            let right_project_exprs = right
                .schema()
                .fields()
                .iter()
                .map(|field| {
                    if field.name() == &right_time_index {
                        DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
                    } else {
                        DfExpr::Column(Column::from_name(field.name()))
                    }
                })
                .collect::<Vec<_>>();

            right = LogicalPlanBuilder::from(right)
                .project(right_project_exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        ensure!(
            left_context.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: "AND operator"
            }
        );
        // Update the field column in context.
        // The AND/UNLESS operator only keep the field column in left input.
        let left_field_col = left_context.field_columns.first().unwrap();
        self.ctx.field_columns = vec![left_field_col.clone()];

        // Generate join plan.
        // All set operations in PromQL are "distinct"
        match op.id() {
            token::T_LAND => LogicalPlanBuilder::from(left)
                .distinct()
                .context(DataFusionPlanningSnafu)?
                .join_detailed(
                    right,
                    JoinType::LeftSemi,
                    (join_keys.clone(), join_keys),
                    None,
                    NullEquality::NullEqualsNull,
                )
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu),
            token::T_LUNLESS => LogicalPlanBuilder::from(left)
                .distinct()
                .context(DataFusionPlanningSnafu)?
                .join_detailed(
                    right,
                    JoinType::LeftAnti,
                    (join_keys.clone(), join_keys),
                    None,
                    NullEquality::NullEqualsNull,
                )
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu),
            token::T_LOR => {
                // OR is handled at the beginning of this function, as it cannot
                // be expressed using JOIN like AND and UNLESS.
                unreachable!()
            }
            _ => UnexpectedTokenSnafu { token: op }.fail(),
        }
    }
3640
    // TODO(ruihang): change function name
    /// Plans the PromQL `or` set operator between `left` and `right`.
    ///
    /// Both sides are first projected onto a shared output schema (tag columns
    /// missing on one side are filled with null utf8 literals), then combined
    /// with a `UnionDistinctOn` extension node that de-duplicates rows on the
    /// match columns derived from `modifier` (`on`/`ignoring`), or all tags by
    /// default.
    ///
    /// # Side effect
    ///
    /// Updates `self.ctx` (time index column, tag columns, field columns and
    /// `use_tsid`) to describe the combined output.
    #[allow(clippy::too_many_arguments)]
    fn or_operator(
        &mut self,
        left: LogicalPlan,
        right: LogicalPlan,
        left_tag_cols_set: HashSet<String>,
        right_tag_cols_set: HashSet<String>,
        left_context: PromPlannerContext,
        right_context: PromPlannerContext,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        // checks: both sides must carry the same number of field columns, and
        // only the single-field case is supported
        ensure!(
            left_context.field_columns.len() == right_context.field_columns.len(),
            CombineTableColumnMismatchSnafu {
                left: left_context.field_columns.clone(),
                right: right_context.field_columns.clone()
            }
        );
        ensure!(
            left_context.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: "OR operator"
            }
        );

        // prepare hash sets
        let all_tags = left_tag_cols_set
            .union(&right_tag_cols_set)
            .cloned()
            .collect::<HashSet<_>>();
        // tags that need null-filling on each side to align both schemas
        let tags_not_in_left = all_tags
            .difference(&left_tag_cols_set)
            .cloned()
            .collect::<Vec<_>>();
        let tags_not_in_right = all_tags
            .difference(&right_tag_cols_set)
            .cloned()
            .collect::<Vec<_>>();
        // table qualifiers (taken from the first schema field) double as the
        // subplan alias names below
        let left_qualifier = left.schema().qualified_field(0).0.cloned();
        let right_qualifier = right.schema().qualified_field(0).0.cloned();
        let left_qualifier_string = left_qualifier
            .as_ref()
            .map(|l| l.to_string())
            .unwrap_or_default();
        let right_qualifier_string = right_qualifier
            .as_ref()
            .map(|r| r.to_string())
            .unwrap_or_default();
        let left_time_index_column =
            left_context
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: left_qualifier_string.clone(),
                })?;
        let right_time_index_column =
            right_context
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: right_qualifier_string.clone(),
                })?;
        // Take the name of first field column. The length is checked above.
        let left_field_col = left_context.field_columns.first().unwrap();
        let right_field_col = right_context.field_columns.first().unwrap();
        let left_has_tsid = left
            .schema()
            .fields()
            .iter()
            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
        let right_has_tsid = right
            .schema()
            .fields()
            .iter()
            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);

        // step 0: fill all columns in output schema
        let mut all_columns_set = left
            .schema()
            .fields()
            .iter()
            .chain(right.schema().fields().iter())
            .map(|field| field.name().clone())
            .collect::<HashSet<_>>();
        // Keep `__tsid` only when both sides contain it, otherwise it may break schema alignment
        // (e.g. `unknown_metric or some_metric`).
        if !(left_has_tsid && right_has_tsid) {
            all_columns_set.remove(DATA_SCHEMA_TSID_COLUMN_NAME);
        }
        // remove time index column; it is re-inserted at position 0 below
        all_columns_set.remove(&left_time_index_column);
        all_columns_set.remove(&right_time_index_column);
        // remove field column in the right
        if left_field_col != right_field_col {
            all_columns_set.remove(right_field_col);
        }
        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
        // sort to ensure the generated schema is not volatile
        all_columns.sort_unstable();
        // use left time index column name as the result time index column name
        all_columns.insert(0, left_time_index_column.clone());

        // step 1: align schema using project, fill non-exist columns with null
        let left_proj_exprs = all_columns.iter().map(|col| {
            if tags_not_in_left.contains(col) {
                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
            } else {
                DfExpr::Column(Column::new(None::<String>, col))
            }
        });
        let right_time_index_expr = DfExpr::Column(Column::new(
            right_qualifier.clone(),
            right_time_index_column,
        ))
        .alias(left_time_index_column.clone());
        // The field column in right side may not have qualifier (it may be removed by join operation),
        // so we need to find it from the schema.
        let right_qualifier_for_field = right
            .schema()
            .iter()
            .find(|(_, f)| f.name() == right_field_col)
            .map(|(q, _)| q)
            .with_context(|| ColumnNotFoundSnafu {
                col: right_field_col.clone(),
            })?
            .cloned();

        // `skip(1)` to skip the time index column
        let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
            // expr
            if col == left_field_col && left_field_col != right_field_col {
                // qualify field in right side if necessary to handle different field name
                DfExpr::Column(Column::new(
                    right_qualifier_for_field.clone(),
                    right_field_col,
                ))
            } else if tags_not_in_right.contains(col) {
                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
            } else {
                DfExpr::Column(Column::new(None::<String>, col))
            }
        });
        let right_proj_exprs = [right_time_index_expr]
            .into_iter()
            .chain(right_proj_exprs_without_time_index);

        let left_projected = LogicalPlanBuilder::from(left)
            .project(left_proj_exprs)
            .context(DataFusionPlanningSnafu)?
            .alias(left_qualifier_string.clone())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;
        let right_projected = LogicalPlanBuilder::from(right)
            .project(right_proj_exprs)
            .context(DataFusionPlanningSnafu)?
            .alias(right_qualifier_string.clone())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // step 2: compute match columns
        let mut match_columns = if let Some(modifier) = modifier
            && let Some(matching) = &modifier.matching
        {
            match matching {
                // keeps columns mentioned in `on`
                LabelModifier::Include(on) => on.labels.clone(),
                // removes columns mentioned in `ignoring`
                LabelModifier::Exclude(ignoring) => {
                    let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
                    all_tags.difference(&ignoring).cloned().collect()
                }
            }
        } else {
            all_tags.iter().cloned().collect()
        };
        // sort to ensure the generated plan is not volatile
        match_columns.sort_unstable();
        // step 3: build `UnionDistinctOn` plan
        let schema = left_projected.schema().clone();
        let union_distinct_on = UnionDistinctOn::new(
            left_projected,
            right_projected,
            match_columns,
            left_time_index_column.clone(),
            schema,
        );
        let result = LogicalPlan::Extension(Extension {
            node: Arc::new(union_distinct_on),
        });

        // step 4: update context
        self.ctx.time_index_column = Some(left_time_index_column);
        self.ctx.tag_columns = all_tags.into_iter().collect();
        self.ctx.field_columns = vec![left_field_col.clone()];
        self.ctx.use_tsid = left_has_tsid && right_has_tsid;

        Ok(result)
    }
3843
3844    /// Build a projection that project and perform operation expr for every value columns.
3845    /// Non-value columns (tag and timestamp) will be preserved in the projection.
3846    ///
3847    /// # Side effect
3848    ///
3849    /// This function will update the value columns in the context. Those new column names
3850    /// don't contains qualifier.
3851    fn projection_for_each_field_column<F>(
3852        &mut self,
3853        input: LogicalPlan,
3854        name_to_expr: F,
3855    ) -> Result<LogicalPlan>
3856    where
3857        F: FnMut(&String) -> Result<DfExpr>,
3858    {
3859        let non_field_columns_iter = self
3860            .ctx
3861            .tag_columns
3862            .iter()
3863            .chain(self.ctx.time_index_column.iter())
3864            .map(|col| {
3865                Ok(DfExpr::Column(Column::new(
3866                    self.ctx.table_name.clone().map(TableReference::bare),
3867                    col,
3868                )))
3869            });
3870
3871        // build computation exprs
3872        let result_field_columns = self
3873            .ctx
3874            .field_columns
3875            .iter()
3876            .map(name_to_expr)
3877            .collect::<Result<Vec<_>>>()?;
3878
3879        // alias the computation exprs to remove qualifier
3880        self.ctx.field_columns = result_field_columns
3881            .iter()
3882            .map(|expr| expr.schema_name().to_string())
3883            .collect();
3884        let field_columns_iter = result_field_columns
3885            .into_iter()
3886            .zip(self.ctx.field_columns.iter())
3887            .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3888
3889        // chain non-field columns (unchanged) and field columns (applied computation then alias)
3890        let project_fields = non_field_columns_iter
3891            .chain(field_columns_iter)
3892            .collect::<Result<Vec<_>>>()?;
3893
3894        LogicalPlanBuilder::from(input)
3895            .project(project_fields)
3896            .context(DataFusionPlanningSnafu)?
3897            .build()
3898            .context(DataFusionPlanningSnafu)
3899    }
3900
3901    /// Build a filter plan that filter on value column. Notice that only one value column
3902    /// is expected.
3903    fn filter_on_field_column<F>(
3904        &self,
3905        input: LogicalPlan,
3906        mut name_to_expr: F,
3907    ) -> Result<LogicalPlan>
3908    where
3909        F: FnMut(&String) -> Result<DfExpr>,
3910    {
3911        ensure!(
3912            self.ctx.field_columns.len() == 1,
3913            UnsupportedExprSnafu {
3914                name: "filter on multi-value input"
3915            }
3916        );
3917
3918        let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3919
3920        LogicalPlanBuilder::from(input)
3921            .filter(field_column_filter)
3922            .context(DataFusionPlanningSnafu)?
3923            .build()
3924            .context(DataFusionPlanningSnafu)
3925    }
3926
3927    /// Generate an expr like `date_part("hour", <TIME_INDEX>)`. Caller should ensure the
3928    /// time index column in context is set
3929    fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3930        let input_expr = datafusion::logical_expr::col(
3931            self.ctx
3932                .time_index_column
3933                .as_ref()
3934                // table name doesn't matters here
3935                .with_context(|| TimeIndexNotFoundSnafu {
3936                    table: "<doesn't matter>",
3937                })?,
3938        );
3939        let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3940            func: datafusion_functions::datetime::date_part(),
3941            args: vec![date_part.lit(), input_expr],
3942        });
3943        Ok(fn_expr)
3944    }
3945
3946    fn strip_tsid_column(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
3947        let schema = plan.schema();
3948        if !schema
3949            .fields()
3950            .iter()
3951            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
3952        {
3953            return Ok(plan);
3954        }
3955
3956        let project_exprs = schema
3957            .fields()
3958            .iter()
3959            .filter(|field| field.name() != DATA_SCHEMA_TSID_COLUMN_NAME)
3960            .map(|field| Ok(DfExpr::Column(Column::from_name(field.name().clone()))))
3961            .collect::<Result<Vec<_>>>()?;
3962
3963        LogicalPlanBuilder::from(plan)
3964            .project(project_exprs)
3965            .context(DataFusionPlanningSnafu)?
3966            .build()
3967            .context(DataFusionPlanningSnafu)
3968    }
3969
3970    /// Apply an alias to the query result by adding a projection with the alias name
3971    fn apply_alias(&mut self, plan: LogicalPlan, alias_name: String) -> Result<LogicalPlan> {
3972        let fields_expr = self.create_field_column_exprs()?;
3973
3974        // TODO(dennis): how to support multi-value aliasing?
3975        ensure!(
3976            fields_expr.len() == 1,
3977            UnsupportedExprSnafu {
3978                name: "alias on multi-value result"
3979            }
3980        );
3981
3982        let project_fields = fields_expr
3983            .into_iter()
3984            .map(|expr| expr.alias(&alias_name))
3985            .chain(self.create_tag_column_exprs()?)
3986            .chain(Some(self.create_time_index_column_expr()?));
3987
3988        LogicalPlanBuilder::from(plan)
3989            .project(project_fields)
3990            .context(DataFusionPlanningSnafu)?
3991            .build()
3992            .context(DataFusionPlanningSnafu)
3993    }
3994}
3995
/// Parsed arguments of a PromQL function call: the optional vector/matrix
/// input expression plus any literal arguments already converted to
/// DataFusion expressions.
#[derive(Default, Debug)]
struct FunctionArgs {
    // the expression the function operates on, if any
    input: Option<PromExpr>,
    // scalar/literal arguments, in call order
    literals: Vec<DfExpr>,
}
4001
/// Represents different types of scalar functions supported in PromQL expressions.
/// Each variant defines how the function should be processed and what arguments it expects.
#[derive(Debug, Clone)]
enum ScalarFunc {
    /// DataFusion's registered(including built-in) scalar functions (e.g., abs, sqrt, round, clamp).
    /// These are passed through directly to DataFusion's execution engine.
    /// Processing: Simple argument insertion at the specified position.
    DataFusionBuiltin(Arc<ScalarUdfDef>),
    /// User-defined functions registered in DataFusion's function registry.
    /// Similar to DataFusionBuiltin but for custom functions not built into DataFusion.
    /// Processing: Direct pass-through with argument positioning.
    DataFusionUdf(Arc<ScalarUdfDef>),
    /// PromQL-specific functions that operate on time series data with temporal context.
    /// These functions require both timestamp ranges and values to perform calculations.
    /// Processing: Automatically injects timestamp_range and value columns as first arguments.
    /// Examples: idelta, irate, resets, changes, deriv, *_over_time functions
    Udf(Arc<ScalarUdfDef>),
    /// PromQL functions requiring extrapolation calculations with explicit range information.
    /// These functions need to know the time range length to perform rate calculations.
    /// The second field contains the range length in milliseconds.
    /// Processing: Injects timestamp_range, value, time_index columns and appends range_length.
    /// Examples: increase, rate, delta
    // TODO(ruihang): maybe merge with Udf later
    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
    /// Functions that generate expressions directly without external UDF calls.
    /// The expression is constructed during function matching and requires no additional processing.
    /// Examples: time(), minute(), hour(), month(), year() and other date/time extractors
    GeneratedExpr,
}
4031
4032#[cfg(test)]
4033mod test {
4034    use std::time::{Duration, UNIX_EPOCH};
4035
4036    use catalog::RegisterTableRequest;
4037    use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
4038    use common_base::Plugins;
4039    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
4040    use common_query::prelude::greptime_timestamp;
4041    use common_query::test_util::DummyDecoder;
4042    use datatypes::prelude::ConcreteDataType;
4043    use datatypes::schema::{ColumnSchema, Schema};
4044    use promql_parser::label::Labels;
4045    use promql_parser::parser;
4046    use session::context::QueryContext;
4047    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
4048    use table::test_util::EmptyTable;
4049
4050    use super::*;
4051    use crate::QueryEngineContext;
4052    use crate::options::QueryOptions;
4053    use crate::parser::QueryLanguageParser;
4054
    /// Builds a default `QueryEngineState` over an empty in-memory catalog,
    /// with all optional components unset.
    fn build_query_engine_state() -> QueryEngineState {
        QueryEngineState::new(
            new_memory_catalog_manager().unwrap(),
            None,
            None,
            None,
            None,
            None,
            false,
            Plugins::default(),
            QueryOptions::default(),
        )
    }
4068
    /// Plans `eval_stmt` with `PromPlanner` and runs the engine's extension
    /// optimize rules over the raw logical plan.
    async fn build_optimized_promql_plan(
        table_provider: DfTableSourceProvider,
        eval_stmt: &EvalStmt,
    ) -> LogicalPlan {
        let state = build_query_engine_state();
        let raw_plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &state)
            .await
            .unwrap();
        let context = QueryEngineContext::new(state.session_state(), QueryContext::arc());
        state
            .optimize_by_extension_rules(raw_plan, &context)
            .unwrap()
    }
4082
    /// Plans `query` against a tsid-backed `some_metric` test table and returns
    /// the optimized plan rendered via `display_indent_schema`.
    async fn build_optimized_tsid_plan(
        query: &str,
        num_tag: usize,
        num_field: usize,
        end_secs: u64,
        lookback_secs: u64,
    ) -> String {
        // Evaluate from the epoch to `end_secs` with a fixed 5s step.
        let eval_stmt = EvalStmt {
            expr: parser::parse(query).unwrap(),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(end_secs))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(lookback_secs),
        };
        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            num_tag,
            num_field,
        )
        .await;

        build_optimized_promql_plan(table_provider, &eval_stmt)
            .await
            .display_indent_schema()
            .to_string()
    }
4111
    /// Asserts that the nested-count rewrite fired for `query`: the plan divides
    /// series on `__tsid`, contains a `Distinct` with the expected outer
    /// aggregation, and no longer divides series on the raw tag column.
    async fn assert_nested_count_rewrite_applies(query: &str, expected_outer_agg: &str) {
        let plan_str = build_optimized_tsid_plan(query, 2, 1, 100_000, 1).await;

        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
        assert!(plan_str.contains("Projection: some_metric.timestamp, some_metric.tag_0"));
        assert!(plan_str.contains("Distinct:"));
        assert!(plan_str.contains(expected_outer_agg), "{plan_str}");
        assert!(!plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]"));
    }
4121
    /// Asserts that the nested-count rewrite did NOT fire for `query`
    /// (no `Distinct` node in the optimized plan).
    async fn assert_nested_count_rewrite_missing(query: &str, num_tag: usize, lookback_secs: u64) {
        let plan_str = build_optimized_tsid_plan(query, num_tag, 1, 100_000, lookback_secs).await;
        assert!(!plan_str.contains("Distinct:"), "{plan_str}");
    }
4126
4127    async fn build_test_table_provider(
4128        table_name_tuples: &[(String, String)],
4129        num_tag: usize,
4130        num_field: usize,
4131    ) -> DfTableSourceProvider {
4132        let catalog_list = MemoryCatalogManager::with_default_setup();
4133        for (schema_name, table_name) in table_name_tuples {
4134            let mut columns = vec![];
4135            for i in 0..num_tag {
4136                columns.push(ColumnSchema::new(
4137                    format!("tag_{i}"),
4138                    ConcreteDataType::string_datatype(),
4139                    false,
4140                ));
4141            }
4142            columns.push(
4143                ColumnSchema::new(
4144                    "timestamp".to_string(),
4145                    ConcreteDataType::timestamp_millisecond_datatype(),
4146                    false,
4147                )
4148                .with_time_index(true),
4149            );
4150            for i in 0..num_field {
4151                columns.push(ColumnSchema::new(
4152                    format!("field_{i}"),
4153                    ConcreteDataType::float64_datatype(),
4154                    true,
4155                ));
4156            }
4157            let schema = Arc::new(Schema::new(columns));
4158            let table_meta = TableMetaBuilder::empty()
4159                .schema(schema)
4160                .primary_key_indices((0..num_tag).collect())
4161                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
4162                .next_column_id(1024)
4163                .build()
4164                .unwrap();
4165            let table_info = TableInfoBuilder::default()
4166                .name(table_name.clone())
4167                .meta(table_meta)
4168                .build()
4169                .unwrap();
4170            let table = EmptyTable::from_table_info(&table_info);
4171
4172            assert!(
4173                catalog_list
4174                    .register_table_sync(RegisterTableRequest {
4175                        catalog: DEFAULT_CATALOG_NAME.to_string(),
4176                        schema: schema_name.clone(),
4177                        table_name: table_name.clone(),
4178                        table_id: 1024,
4179                        table,
4180                    })
4181                    .is_ok()
4182            );
4183        }
4184
4185        DfTableSourceProvider::new(
4186            catalog_list,
4187            false,
4188            QueryContext::arc(),
4189            DummyDecoder::arc(),
4190            false,
4191        )
4192    }
4193
    /// Like `build_test_table_provider`, but models the metric engine layout:
    /// one physical table (`phy`, id 999) carrying the internal `__table_id`
    /// and `__tsid` columns, plus logical tables (without `__tsid`) that
    /// reference it through `LOGICAL_TABLE_METADATA_KEY`.
    async fn build_test_table_provider_with_tsid(
        table_name_tuples: &[(String, String)],
        num_tag: usize,
        num_field: usize,
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();

        let physical_table_name = "phy";
        let physical_table_id = 999u32;

        // Register a metric engine physical table with internal columns.
        {
            // Internal columns come first: __table_id (u32), __tsid (u64).
            let mut columns = vec![
                ColumnSchema::new(
                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                ColumnSchema::new(
                    DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
            ];
            for i in 0..num_tag {
                columns.push(ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            for i in 0..num_field {
                columns.push(ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::float64_datatype(),
                    true,
                ));
            }

            let schema = Arc::new(Schema::new(columns));
            // Primary key spans the 2 internal columns plus every tag.
            let primary_key_indices = (0..(2 + num_tag)).collect::<Vec<_>>();
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices(primary_key_indices)
                .value_indices((2 + num_tag..2 + num_tag + 1 + num_field).collect())
                .engine(METRIC_ENGINE_NAME.to_string())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .table_id(physical_table_id)
                .name(physical_table_name)
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: DEFAULT_SCHEMA_NAME.to_string(),
                        table_name: physical_table_name.to_string(),
                        table_id: physical_table_id,
                        table,
                    })
                    .is_ok()
            );
        }

        // Register metric engine logical tables without `__tsid`, referencing the physical table.
        for (idx, (schema_name, table_name)) in table_name_tuples.iter().enumerate() {
            let mut columns = vec![];
            for i in 0..num_tag {
                columns.push(ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            for i in 0..num_field {
                columns.push(ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::float64_datatype(),
                    true,
                ));
            }

            let schema = Arc::new(Schema::new(columns));
            // Link the logical table to its physical backing table.
            let mut options = table::requests::TableOptions::default();
            options.extra_options.insert(
                LOGICAL_TABLE_METADATA_KEY.to_string(),
                physical_table_name.to_string(),
            );
            // Logical tables get sequential ids starting at 1024.
            let table_id = 1024u32 + idx as u32;
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
                .engine(METRIC_ENGINE_NAME.to_string())
                .options(options)
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .table_id(table_id)
                .name(table_name.clone())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: schema_name.clone(),
                        table_name: table_name.clone(),
                        table_id,
                        table,
                    })
                    .is_ok()
            );
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }
4343
    /// Builds a provider whose tables use the given tag names plus the standard
    /// greptime timestamp/value column pair (a single nullable f64 field).
    async fn build_test_table_provider_with_fields(
        table_name_tuples: &[(String, String)],
        tags: &[&str],
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (schema_name, table_name) in table_name_tuples {
            let mut columns = vec![];
            let num_tag = tags.len();
            for tag in tags {
                columns.push(ColumnSchema::new(
                    tag.to_string(),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    greptime_timestamp().to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            columns.push(ColumnSchema::new(
                greptime_value().to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ));
            let schema = Arc::new(Schema::new(columns));
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .name(table_name.clone())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: schema_name.clone(),
                        table_name: table_name.clone(),
                        table_id: 1024,
                        table,
                    })
                    .is_ok()
            );
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }
4407
4408    // {
4409    //     input: `abs(some_metric{foo!="bar"})`,
4410    //     expected: &Call{
4411    //         Func: MustGetFunction("abs"),
4412    //         Args: Expressions{
4413    //             &VectorSelector{
4414    //                 Name: "some_metric",
4415    //                 LabelMatchers: []*labels.Matcher{
4416    //                     MustLabelMatcher(labels.MatchNotEqual, "foo", "bar"),
4417    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
4418    //                 },
4419    //             },
4420    //         },
4421    //     },
4422    // },
    /// Plans `{fn_name}(some_metric{tag_0!="bar"})` against a one-tag /
    /// one-field test table and asserts the full plan tree; `TEMPLATE` in the
    /// expected string is replaced by `plan_name`.
    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
        let prom_expr =
            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = String::from(
            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(ms), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(ms), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"
        ).replace("TEMPLATE", plan_name);

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
4459
    // The tests below exercise `do_single_instant_function_call` for each
    // PromQL instant function. The second argument is the expected plan-node
    // name (`TEMPLATE` substitution). Tests marked `#[should_panic]` cover
    // functions for which planning currently fails — the helper unwraps the
    // planner result, so an error becomes a panic.
    #[tokio::test]
    async fn single_abs() {
        do_single_instant_function_call("abs", "abs").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_absent() {
        do_single_instant_function_call("absent", "").await;
    }

    #[tokio::test]
    async fn single_ceil() {
        do_single_instant_function_call("ceil", "ceil").await;
    }

    #[tokio::test]
    async fn single_exp() {
        do_single_instant_function_call("exp", "exp").await;
    }

    #[tokio::test]
    async fn single_ln() {
        do_single_instant_function_call("ln", "ln").await;
    }

    #[tokio::test]
    async fn single_log2() {
        do_single_instant_function_call("log2", "log2").await;
    }

    #[tokio::test]
    async fn single_log10() {
        do_single_instant_function_call("log10", "log10").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_scalar() {
        do_single_instant_function_call("scalar", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sgn() {
        do_single_instant_function_call("sgn", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort() {
        do_single_instant_function_call("sort", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort_desc() {
        do_single_instant_function_call("sort_desc", "").await;
    }

    #[tokio::test]
    async fn single_sqrt() {
        do_single_instant_function_call("sqrt", "sqrt").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_timestamp() {
        do_single_instant_function_call("timestamp", "").await;
    }

    #[tokio::test]
    async fn single_acos() {
        do_single_instant_function_call("acos", "acos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_acosh() {
        do_single_instant_function_call("acosh", "").await;
    }

    #[tokio::test]
    async fn single_asin() {
        do_single_instant_function_call("asin", "asin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_asinh() {
        do_single_instant_function_call("asinh", "").await;
    }

    #[tokio::test]
    async fn single_atan() {
        do_single_instant_function_call("atan", "atan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_atanh() {
        do_single_instant_function_call("atanh", "").await;
    }

    #[tokio::test]
    async fn single_cos() {
        do_single_instant_function_call("cos", "cos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_cosh() {
        do_single_instant_function_call("cosh", "").await;
    }

    #[tokio::test]
    async fn single_sin() {
        do_single_instant_function_call("sin", "sin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sinh() {
        do_single_instant_function_call("sinh", "").await;
    }

    #[tokio::test]
    async fn single_tan() {
        do_single_instant_function_call("tan", "tan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_tanh() {
        do_single_instant_function_call("tanh", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_deg() {
        do_single_instant_function_call("deg", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_rad() {
        do_single_instant_function_call("rad", "").await;
    }
4608
4609    // {
4610    //     input: "avg by (foo)(some_metric)",
4611    //     expected: &AggregateExpr{
4612    //         Op: AVG,
4613    //         Expr: &VectorSelector{
4614    //             Name: "some_metric",
4615    //             LabelMatchers: []*labels.Matcher{
4616    //                 MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
4617    //             },
4618    //             PosRange: PositionRange{
4619    //                 Start: 13,
4620    //                 End:   24,
4621    //             },
4622    //         },
4623    //         Grouping: []string{"foo"},
4624    //         PosRange: PositionRange{
4625    //             Start: 0,
4626    //             End:   25,
4627    //         },
4628    //     },
4629    // },
    /// Plans `{fn_name} by (tag_1)(some_metric{tag_0!="bar"})` — and then the
    /// `without (tag_1)` variant of the same query — over a 2-tag/2-field
    /// test table, asserting each rendered plan with `TEMPLATE` replaced by
    /// `plan_name`.
    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
        let prom_expr = parser::parse(&format!(
            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
        ))
        .unwrap();
        // Mutable: the `without` case below swaps the grouping modifier
        // in place instead of re-parsing a second query string.
        let mut eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // test group by
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            2,
            2,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        // `by (tag_1)`: the aggregate groups on tag_1 + timestamp.
        let expected_no_without = String::from(
            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]"
        ).replace("TEMPLATE", plan_name);
        assert_eq!(
            plan.display_indent_schema().to_string(),
            expected_no_without
        );

        // test group without
        // Rewrite the parsed aggregate's modifier from `by (tag_1)` to
        // `without (tag_1)`.
        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
            *modifier = Some(LabelModifier::Exclude(Labels {
                labels: vec![String::from("tag_1")].into_iter().collect(),
            }));
        }
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            2,
            2,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        // `without (tag_1)`: the remaining tag (tag_0) + timestamp are the keys.
        let expected_without = String::from(
            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]"
        ).replace("TEMPLATE", plan_name);
        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
    }
4697
    #[tokio::test]
    async fn aggregate_sum() {
        // `sum by (...)` should plan as a DataFusion `sum` aggregate node.
        do_aggregate_expr_plan("sum", "sum").await;
    }
4702
4703    #[tokio::test]
4704    async fn tsid_is_used_for_series_divide_when_available() {
4705        let prom_expr = parser::parse("some_metric").unwrap();
4706        let eval_stmt = EvalStmt {
4707            expr: prom_expr,
4708            start: UNIX_EPOCH,
4709            end: UNIX_EPOCH
4710                .checked_add(Duration::from_secs(100_000))
4711                .unwrap(),
4712            interval: Duration::from_secs(5),
4713            lookback_delta: Duration::from_secs(1),
4714        };
4715
4716        let table_provider = build_test_table_provider_with_tsid(
4717            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4718            1,
4719            1,
4720        )
4721        .await;
4722        let plan =
4723            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4724                .await
4725                .unwrap();
4726
4727        let plan_str = plan.display_indent_schema().to_string();
4728        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4729        assert!(plan_str.contains("__tsid ASC NULLS FIRST"));
4730        assert!(
4731            !plan
4732                .schema()
4733                .fields()
4734                .iter()
4735                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
4736        );
4737    }
4738
4739    #[tokio::test]
4740    async fn scalar_count_count_range_keeps_full_window() {
4741        let plan_str = build_optimized_tsid_plan(
4742            "scalar(count(count(some_metric) by (tag_0)))",
4743            1,
4744            1,
4745            100_000,
4746            1,
4747        )
4748        .await;
4749        assert!(plan_str.contains("ScalarCalculate: tags=[]"));
4750        assert!(plan_str.contains("PromInstantManipulate: range=[0..100000000]"));
4751        assert!(!plan_str.contains("PromInstantManipulate: range=[99999000..99999000]"));
4752    }
4753
4754    #[tokio::test]
4755    async fn scalar_count_count_rewrite_applies_inside_binary_expr_for_tsid_input() {
4756        let plan_str = build_optimized_tsid_plan(
4757            "sum(irate(some_metric[1h])) / scalar(count(count(some_metric) by (tag_0)))",
4758            2,
4759            1,
4760            10,
4761            300,
4762        )
4763        .await;
4764        assert!(plan_str.contains("Distinct:"), "{plan_str}");
4765    }
4766
    #[tokio::test]
    async fn nested_count_rewrite_keeps_full_series_key_with_tsid_input() {
        // `count(count(...) by (tag_0))` must keep the full series key:
        // the rewritten outer count groups only by timestamp.
        assert_nested_count_rewrite_applies(
            "count(count(some_metric) by (tag_0))",
            "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(count(some_metric.field_0))]]"
        )
        .await;
    }

    #[tokio::test]
    async fn nested_sum_count_rewrite_keeps_full_series_key_with_tsid_input() {
        // Same as above, but with `sum` as the inner aggregation.
        assert_nested_count_rewrite_applies(
            "count(sum(some_metric) by (tag_0))",
            "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(sum(some_metric.field_0))]]"
        )
        .await;
    }
4784
    #[tokio::test]
    async fn nested_supported_inner_aggs_rewrite_apply_for_tsid_input() {
        // Each supported inner aggregation should still allow the outer count
        // to be rewritten into `count(Int64(1))` grouped only by timestamp.
        // Note PromQL `stddev`/`stdvar` map to DataFusion's population
        // variants (`stddev_pop`/`var_pop`) in the expected plan text.
        for (query, expected_outer_agg) in [
            (
                "count(avg(some_metric) by (tag_0))",
                "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(avg(some_metric.field_0))]]",
            ),
            (
                "count(min(some_metric) by (tag_0))",
                "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(min(some_metric.field_0))]]",
            ),
            (
                "count(max(some_metric) by (tag_0))",
                "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(max(some_metric.field_0))]]",
            ),
            (
                "count(stddev(some_metric) by (tag_0))",
                "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(stddev_pop(some_metric.field_0))]]",
            ),
            (
                "count(stdvar(some_metric) by (tag_0))",
                "Aggregate: groupBy=[[some_metric.timestamp]], aggr=[[count(Int64(1)) AS count(var_pop(some_metric.field_0))]]",
            ),
        ] {
            assert_nested_count_rewrite_applies(query, expected_outer_agg).await;
        }
    }
4812
    #[tokio::test]
    async fn nested_non_count_inner_aggs_rewrite_filter_null_values_for_tsid_input() {
        // The count-of-count rewrite must not add a NULL filter on the field...
        let count_plan =
            build_optimized_tsid_plan("count(count(some_metric) by (tag_0))", 2, 1, 100_000, 1)
                .await;
        assert!(
            !count_plan.contains("some_metric.field_0 IS NOT NULL"),
            "{count_plan}"
        );

        // ...while every other supported inner aggregation must filter NULL
        // field values before aggregating.
        for query in [
            "count(sum(some_metric) by (tag_0))",
            "count(avg(some_metric) by (tag_0))",
            "count(min(some_metric) by (tag_0))",
            "count(max(some_metric) by (tag_0))",
            "count(stddev(some_metric) by (tag_0))",
            "count(stdvar(some_metric) by (tag_0))",
        ] {
            let plan_str = build_optimized_tsid_plan(query, 2, 1, 100_000, 1).await;
            assert!(
                plan_str.contains("Filter: some_metric.field_0 IS NOT NULL"),
                "{query}: {plan_str}"
            );
        }
    }
4838
4839    #[tokio::test]
4840    async fn nested_unsupported_or_non_direct_inner_aggs_do_not_rewrite() {
4841        assert_nested_count_rewrite_missing("count(group(some_metric) by (tag_0))", 2, 1).await;
4842        assert_nested_count_rewrite_missing(
4843            "count(sum(irate(some_metric[1h])) by (tag_0))",
4844            2,
4845            300,
4846        )
4847        .await;
4848    }
4849
4850    #[tokio::test]
4851    async fn physical_table_name_is_not_leaked_in_plan() {
4852        let prom_expr = parser::parse("some_metric").unwrap();
4853        let eval_stmt = EvalStmt {
4854            expr: prom_expr,
4855            start: UNIX_EPOCH,
4856            end: UNIX_EPOCH
4857                .checked_add(Duration::from_secs(100_000))
4858                .unwrap(),
4859            interval: Duration::from_secs(5),
4860            lookback_delta: Duration::from_secs(1),
4861        };
4862
4863        let table_provider = build_test_table_provider_with_tsid(
4864            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4865            1,
4866            1,
4867        )
4868        .await;
4869        let plan =
4870            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4871                .await
4872                .unwrap();
4873
4874        let plan_str = plan.display_indent_schema().to_string();
4875        assert!(plan_str.contains("TableScan: phy"), "{plan}");
4876        assert!(plan_str.contains("SubqueryAlias: some_metric"));
4877        assert!(plan_str.contains("Filter: phy.__table_id = UInt32(1024)"));
4878        assert!(!plan_str.contains("TableScan: some_metric"));
4879    }
4880
4881    #[tokio::test]
4882    async fn sum_without_does_not_group_by_tsid() {
4883        let prom_expr = parser::parse("sum without (tag_0) (some_metric)").unwrap();
4884        let eval_stmt = EvalStmt {
4885            expr: prom_expr,
4886            start: UNIX_EPOCH,
4887            end: UNIX_EPOCH
4888                .checked_add(Duration::from_secs(100_000))
4889                .unwrap(),
4890            interval: Duration::from_secs(5),
4891            lookback_delta: Duration::from_secs(1),
4892        };
4893
4894        let table_provider = build_test_table_provider_with_tsid(
4895            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4896            1,
4897            1,
4898        )
4899        .await;
4900        let plan =
4901            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4902                .await
4903                .unwrap();
4904
4905        let plan_str = plan.display_indent_schema().to_string();
4906        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4907
4908        let aggr_line = plan_str
4909            .lines()
4910            .find(|line| line.contains("Aggregate: groupBy="))
4911            .unwrap();
4912        assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
4913    }
4914
4915    #[tokio::test]
4916    async fn topk_without_does_not_partition_by_tsid() {
4917        let prom_expr = parser::parse("topk without (tag_0) (1, some_metric)").unwrap();
4918        let eval_stmt = EvalStmt {
4919            expr: prom_expr,
4920            start: UNIX_EPOCH,
4921            end: UNIX_EPOCH
4922                .checked_add(Duration::from_secs(100_000))
4923                .unwrap(),
4924            interval: Duration::from_secs(5),
4925            lookback_delta: Duration::from_secs(1),
4926        };
4927
4928        let table_provider = build_test_table_provider_with_tsid(
4929            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4930            1,
4931            1,
4932        )
4933        .await;
4934        let plan =
4935            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4936                .await
4937                .unwrap();
4938
4939        let plan_str = plan.display_indent_schema().to_string();
4940        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4941
4942        let window_line = plan_str
4943            .lines()
4944            .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
4945            .unwrap();
4946        let partition_by = window_line
4947            .split("PARTITION BY [")
4948            .nth(1)
4949            .and_then(|s| s.split("] ORDER BY").next())
4950            .unwrap();
4951        assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
4952    }
4953
4954    #[tokio::test]
4955    async fn sum_by_does_not_group_by_tsid() {
4956        let prom_expr = parser::parse("sum by (__tsid) (some_metric)").unwrap();
4957        let eval_stmt = EvalStmt {
4958            expr: prom_expr,
4959            start: UNIX_EPOCH,
4960            end: UNIX_EPOCH
4961                .checked_add(Duration::from_secs(100_000))
4962                .unwrap(),
4963            interval: Duration::from_secs(5),
4964            lookback_delta: Duration::from_secs(1),
4965        };
4966
4967        let table_provider = build_test_table_provider_with_tsid(
4968            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4969            1,
4970            1,
4971        )
4972        .await;
4973        let plan =
4974            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4975                .await
4976                .unwrap();
4977
4978        let plan_str = plan.display_indent_schema().to_string();
4979        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4980
4981        let aggr_line = plan_str
4982            .lines()
4983            .find(|line| line.contains("Aggregate: groupBy="))
4984            .unwrap();
4985        assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
4986    }
4987
4988    #[tokio::test]
4989    async fn topk_by_does_not_partition_by_tsid() {
4990        let prom_expr = parser::parse("topk by (__tsid) (1, some_metric)").unwrap();
4991        let eval_stmt = EvalStmt {
4992            expr: prom_expr,
4993            start: UNIX_EPOCH,
4994            end: UNIX_EPOCH
4995                .checked_add(Duration::from_secs(100_000))
4996                .unwrap(),
4997            interval: Duration::from_secs(5),
4998            lookback_delta: Duration::from_secs(1),
4999        };
5000
5001        let table_provider = build_test_table_provider_with_tsid(
5002            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5003            1,
5004            1,
5005        )
5006        .await;
5007        let plan =
5008            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5009                .await
5010                .unwrap();
5011
5012        let plan_str = plan.display_indent_schema().to_string();
5013        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
5014
5015        let window_line = plan_str
5016            .lines()
5017            .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
5018            .unwrap();
5019        let partition_by = window_line
5020            .split("PARTITION BY [")
5021            .nth(1)
5022            .and_then(|s| s.split("] ORDER BY").next())
5023            .unwrap();
5024        assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
5025    }
5026
5027    #[tokio::test]
5028    async fn selector_matcher_on_tsid_does_not_use_internal_column() {
5029        let prom_expr = parser::parse(r#"some_metric{__tsid="123"}"#).unwrap();
5030        let eval_stmt = EvalStmt {
5031            expr: prom_expr,
5032            start: UNIX_EPOCH,
5033            end: UNIX_EPOCH
5034                .checked_add(Duration::from_secs(100_000))
5035                .unwrap(),
5036            interval: Duration::from_secs(5),
5037            lookback_delta: Duration::from_secs(1),
5038        };
5039
5040        let table_provider = build_test_table_provider_with_tsid(
5041            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5042            1,
5043            1,
5044        )
5045        .await;
5046        let plan =
5047            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5048                .await
5049                .unwrap();
5050
5051        fn collect_filter_cols(plan: &LogicalPlan, out: &mut HashSet<Column>) {
5052            if let LogicalPlan::Filter(filter) = plan {
5053                datafusion_expr::utils::expr_to_columns(&filter.predicate, out).unwrap();
5054            }
5055            for input in plan.inputs() {
5056                collect_filter_cols(input, out);
5057            }
5058        }
5059
5060        let mut filter_cols = HashSet::new();
5061        collect_filter_cols(&plan, &mut filter_cols);
5062        assert!(
5063            !filter_cols
5064                .iter()
5065                .any(|c| c.name == DATA_SCHEMA_TSID_COLUMN_NAME)
5066        );
5067    }
5068
5069    #[tokio::test]
5070    async fn tsid_is_not_used_when_physical_table_is_missing() {
5071        let prom_expr = parser::parse("some_metric").unwrap();
5072        let eval_stmt = EvalStmt {
5073            expr: prom_expr,
5074            start: UNIX_EPOCH,
5075            end: UNIX_EPOCH
5076                .checked_add(Duration::from_secs(100_000))
5077                .unwrap(),
5078            interval: Duration::from_secs(5),
5079            lookback_delta: Duration::from_secs(1),
5080        };
5081
5082        let catalog_list = MemoryCatalogManager::with_default_setup();
5083
5084        // Register a metric engine logical table referencing a missing physical table.
5085        let mut columns = vec![ColumnSchema::new(
5086            "tag_0".to_string(),
5087            ConcreteDataType::string_datatype(),
5088            false,
5089        )];
5090        columns.push(
5091            ColumnSchema::new(
5092                "timestamp".to_string(),
5093                ConcreteDataType::timestamp_millisecond_datatype(),
5094                false,
5095            )
5096            .with_time_index(true),
5097        );
5098        columns.push(ColumnSchema::new(
5099            "field_0".to_string(),
5100            ConcreteDataType::float64_datatype(),
5101            true,
5102        ));
5103        let schema = Arc::new(Schema::new(columns));
5104        let mut options = table::requests::TableOptions::default();
5105        options
5106            .extra_options
5107            .insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "phy".to_string());
5108        let table_meta = TableMetaBuilder::empty()
5109            .schema(schema)
5110            .primary_key_indices(vec![0])
5111            .value_indices(vec![2])
5112            .engine(METRIC_ENGINE_NAME.to_string())
5113            .options(options)
5114            .next_column_id(1024)
5115            .build()
5116            .unwrap();
5117        let table_info = TableInfoBuilder::default()
5118            .table_id(1024)
5119            .name("some_metric")
5120            .meta(table_meta)
5121            .build()
5122            .unwrap();
5123        let table = EmptyTable::from_table_info(&table_info);
5124        catalog_list
5125            .register_table_sync(RegisterTableRequest {
5126                catalog: DEFAULT_CATALOG_NAME.to_string(),
5127                schema: DEFAULT_SCHEMA_NAME.to_string(),
5128                table_name: "some_metric".to_string(),
5129                table_id: 1024,
5130                table,
5131            })
5132            .unwrap();
5133
5134        let table_provider = DfTableSourceProvider::new(
5135            catalog_list,
5136            false,
5137            QueryContext::arc(),
5138            DummyDecoder::arc(),
5139            false,
5140        );
5141
5142        let plan =
5143            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5144                .await
5145                .unwrap();
5146
5147        let plan_str = plan.display_indent_schema().to_string();
5148        assert!(plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]"));
5149        assert!(!plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
5150    }
5151
5152    #[tokio::test]
5153    async fn tsid_is_carried_only_when_aggregate_preserves_label_set() {
5154        let prom_expr = parser::parse("sum by (tag_0) (some_metric)").unwrap();
5155        let eval_stmt = EvalStmt {
5156            expr: prom_expr,
5157            start: UNIX_EPOCH,
5158            end: UNIX_EPOCH
5159                .checked_add(Duration::from_secs(100_000))
5160                .unwrap(),
5161            interval: Duration::from_secs(5),
5162            lookback_delta: Duration::from_secs(1),
5163        };
5164
5165        let table_provider = build_test_table_provider_with_tsid(
5166            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5167            1,
5168            1,
5169        )
5170        .await;
5171        let plan =
5172            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5173                .await
5174                .unwrap();
5175
5176        let plan_str = plan.display_indent_schema().to_string();
5177        assert!(plan_str.contains("first_value") && plan_str.contains("__tsid"));
5178        assert!(
5179            !plan
5180                .schema()
5181                .fields()
5182                .iter()
5183                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
5184        );
5185
5186        // Merging aggregate: label set is reduced, tsid should not be carried.
5187        let prom_expr = parser::parse("sum(some_metric)").unwrap();
5188        let eval_stmt = EvalStmt {
5189            expr: prom_expr,
5190            start: UNIX_EPOCH,
5191            end: UNIX_EPOCH
5192                .checked_add(Duration::from_secs(100_000))
5193                .unwrap(),
5194            interval: Duration::from_secs(5),
5195            lookback_delta: Duration::from_secs(1),
5196        };
5197        let table_provider = build_test_table_provider_with_tsid(
5198            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5199            1,
5200            1,
5201        )
5202        .await;
5203        let plan =
5204            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5205                .await
5206                .unwrap();
5207        let plan_str = plan.display_indent_schema().to_string();
5208        assert!(!plan_str.contains("first_value"));
5209    }
5210
5211    #[tokio::test]
5212    async fn or_operator_with_unknown_metric_does_not_require_tsid() {
5213        let prom_expr = parser::parse("unknown_metric or some_metric").unwrap();
5214        let eval_stmt = EvalStmt {
5215            expr: prom_expr,
5216            start: UNIX_EPOCH,
5217            end: UNIX_EPOCH
5218                .checked_add(Duration::from_secs(100_000))
5219                .unwrap(),
5220            interval: Duration::from_secs(5),
5221            lookback_delta: Duration::from_secs(1),
5222        };
5223
5224        let table_provider = build_test_table_provider_with_tsid(
5225            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5226            1,
5227            1,
5228        )
5229        .await;
5230
5231        let plan =
5232            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5233                .await
5234                .unwrap();
5235
5236        assert!(
5237            !plan
5238                .schema()
5239                .fields()
5240                .iter()
5241                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
5242        );
5243    }
5244
    #[tokio::test]
    async fn aggregate_avg() {
        // PromQL `avg` aggregation should plan to the `avg` aggregate expr.
        do_aggregate_expr_plan("avg", "avg").await;
    }
5249
    #[tokio::test]
    #[should_panic] // output type doesn't match
    async fn aggregate_count() {
        // PromQL `count` plans to the `count` aggregate expr, but its output
        // type differs from what the shared helper asserts — hence should_panic.
        do_aggregate_expr_plan("count", "count").await;
    }
5255
    #[tokio::test]
    async fn aggregate_min() {
        // PromQL `min` aggregation should plan to the `min` aggregate expr.
        do_aggregate_expr_plan("min", "min").await;
    }
5260
    #[tokio::test]
    async fn aggregate_max() {
        // PromQL `max` aggregation should plan to the `max` aggregate expr.
        do_aggregate_expr_plan("max", "max").await;
    }
5265
5266    #[tokio::test]
5267    async fn aggregate_group() {
5268        // Regression test for `group()` aggregator.
5269        // PromQL: sum(group by (cluster)(kubernetes_build_info{service="kubernetes",job="apiserver"}))
5270        // should be plannable, and `group()` should produce constant 1 for each group.
5271        let prom_expr = parser::parse(
5272            "sum(group by (cluster)(kubernetes_build_info{service=\"kubernetes\",job=\"apiserver\"}))",
5273        )
5274        .unwrap();
5275        let eval_stmt = EvalStmt {
5276            expr: prom_expr,
5277            start: UNIX_EPOCH,
5278            end: UNIX_EPOCH
5279                .checked_add(Duration::from_secs(100_000))
5280                .unwrap(),
5281            interval: Duration::from_secs(5),
5282            lookback_delta: Duration::from_secs(1),
5283        };
5284
5285        let table_provider = build_test_table_provider_with_fields(
5286            &[(
5287                DEFAULT_SCHEMA_NAME.to_string(),
5288                "kubernetes_build_info".to_string(),
5289            )],
5290            &["cluster", "service", "job"],
5291        )
5292        .await;
5293        let plan =
5294            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5295                .await
5296                .unwrap();
5297
5298        let plan_str = plan.display_indent_schema().to_string();
5299        assert!(plan_str.contains("max(Float64(1"));
5300    }
5301
    #[tokio::test]
    async fn aggregate_stddev() {
        // PromQL `stddev` is population stddev, so it plans to `stddev_pop`.
        do_aggregate_expr_plan("stddev", "stddev_pop").await;
    }
5306
    #[tokio::test]
    async fn aggregate_stdvar() {
        // PromQL `stdvar` is population variance, so it plans to `var_pop`.
        do_aggregate_expr_plan("stdvar", "var_pop").await;
    }
5311
5312    // TODO(ruihang): add range fn tests once exprs are ready.
5313
5314    // {
5315    //     input: "some_metric{tag_0="foo"} + some_metric{tag_0="bar"}",
5316    //     expected: &BinaryExpr{
5317    //         Op: ADD,
5318    //         LHS: &VectorSelector{
5319    //             Name: "a",
5320    //             LabelMatchers: []*labels.Matcher{
5321    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "foo"),
5322    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
5323    //             },
5324    //         },
5325    //         RHS: &VectorSelector{
5326    //             Name: "sum",
5327    //             LabelMatchers: []*labels.Matcher{
    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "bar"),
5329    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
5330    //             },
5331    //         },
5332    //         VectorMatching: &VectorMatching{},
5333    //     },
5334    // },
    #[tokio::test]
    async fn binary_op_column_column() {
        // Vector-vector arithmetic: both operands select `some_metric` with
        // different tag filters, so the planner builds two selector subplans,
        // joins them on (tag_0, timestamp), and adds the fields in a projection.
        let prom_expr =
            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        // Expected plan: projection over an inner join of the lhs/rhs selector
        // subplans, each aliased and instant-manipulated independently.
        let expected = String::from(
            "Projection: rhs.tag_0, rhs.timestamp, CAST(lhs.field_0 AS Float64) + CAST(rhs.field_0 AS Float64) AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(ms), lhs.field_0 + rhs.field_0:Float64;N]\
            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
5379
5380    async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
5381        let prom_expr = parser::parse(query).unwrap();
5382        let eval_stmt = EvalStmt {
5383            expr: prom_expr,
5384            start: UNIX_EPOCH,
5385            end: UNIX_EPOCH
5386                .checked_add(Duration::from_secs(100_000))
5387                .unwrap(),
5388            interval: Duration::from_secs(5),
5389            lookback_delta: Duration::from_secs(1),
5390        };
5391
5392        let table_provider = build_test_table_provider(
5393            &[
5394                (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5395                (
5396                    "greptime_private".to_string(),
5397                    "some_alt_metric".to_string(),
5398                ),
5399            ],
5400            1,
5401            1,
5402        )
5403        .await;
5404        let plan =
5405            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5406                .await
5407                .unwrap();
5408
5409        assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
5410    }
5411
    #[tokio::test]
    async fn binary_op_literal_column() {
        // Literal-vector arithmetic: the literal side needs no table scan, so
        // the plan is a single projection on top of the vector-selector subplan.
        let query = r#"1 + some_metric{tag_0="bar"}"#;
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + CAST(some_metric.field_0 AS Float64) AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(ms), Float64(1) + field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5426
5427    #[tokio::test]
5428    async fn binary_op_literal_literal() {
5429        let query = r#"1 + 1"#;
5430        let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(ms), value:Float64;N]
5431  TableScan: dummy [time:Timestamp(ms), value:Float64;N]"#;
5432        indie_query_plan_compare(query, expected).await;
5433    }
5434
    #[tokio::test]
    async fn simple_bool_grammar() {
        // The `bool` modifier turns the comparison into a 0/1-valued result:
        // the boolean is cast to Float64 in the projection instead of filtering.
        let query = "some_metric != bool 1.2345";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(ms), field_0 != Float64(1.2345):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5449
    #[tokio::test]
    async fn bool_with_additional_arithmetic() {
        // A `bool` comparison between literals folds into the projection as a
        // Float64 cast of the boolean expression, added to the metric field.
        let query = "some_metric + (1 == bool 2)";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 AS Float64) + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(ms), field_0 + Float64(1) = Float64(2):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5464
    #[tokio::test]
    async fn simple_unary() {
        // Unary negation is planned as a negation expression in the projection.
        let query = "-some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(ms), (- field_0):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5479
    #[tokio::test]
    async fn increase_aggr() {
        // A range function over a 5m window: the selector scan starts 5m before
        // the query range (timestamp >= -299999), and the prom_increase result
        // is filtered to drop NULL rows.
        let query = "increase(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(ms), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(ms), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(ms))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-299999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5496
    #[tokio::test]
    async fn less_filter_on_value() {
        // A comparison without `bool` acts as a filter: rows not satisfying the
        // predicate are dropped rather than converted to 0/1 values.
        let query = "some_metric < 1.2345";
        let expected = String::from(
            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5511
    #[tokio::test]
    async fn count_over_time() {
        // Range function over a 5m window: scan starts 5m before the query
        // range, and NULL prom_count_over_time rows are filtered out.
        let query = "count_over_time(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(ms), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(ms), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(ms))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-299999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5528
    #[tokio::test]
    async fn test_hash_join() {
        // Binary op with `ignoring(...)`: the ignored labels must be excluded
        // from the join keys — here the join is only on (greptime_timestamp, uri).
        let mut eval_stmt = EvalStmt {
            // Placeholder expr; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_sum".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
        )
        .await;
        // Should be ok
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        // Note: join keys omit the two ignored labels.
        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, CAST(http_server_requests_seconds_sum.greptime_value AS Float64) / CAST(http_server_requests_seconds_count.greptime_value AS Float64) AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
            \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
            \n    SubqueryAlias: http_server_requests_seconds_sum\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
            \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
            \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-999, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100000000, None)\
            \n              TableScan: http_server_requests_seconds_sum\
            \n    SubqueryAlias: http_server_requests_seconds_count\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
            \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
            \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-999, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100000000, None)\
            \n              TableScan: http_server_requests_seconds_count";
        assert_eq!(plan.to_string(), expected);
    }
5580
5581    #[tokio::test]
5582    async fn test_nested_histogram_quantile() {
5583        let mut eval_stmt = EvalStmt {
5584            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5585            start: UNIX_EPOCH,
5586            end: UNIX_EPOCH
5587                .checked_add(Duration::from_secs(100_000))
5588                .unwrap(),
5589            interval: Duration::from_secs(5),
5590            lookback_delta: Duration::from_secs(1),
5591        };
5592
5593        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;
5594
5595        let prom_expr = parser::parse(case).unwrap();
5596        eval_stmt.expr = prom_expr;
5597        let table_provider = build_test_table_provider_with_fields(
5598            &[(
5599                DEFAULT_SCHEMA_NAME.to_string(),
5600                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
5601            )],
5602            &["pod", "le", "path", "code", "container"],
5603        )
5604        .await;
5605        // Should be ok
5606        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5607            .await
5608            .unwrap();
5609    }
5610
5611    #[tokio::test]
5612    async fn test_parse_and_operator() {
5613        let mut eval_stmt = EvalStmt {
5614            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5615            start: UNIX_EPOCH,
5616            end: UNIX_EPOCH
5617                .checked_add(Duration::from_secs(100_000))
5618                .unwrap(),
5619            interval: Duration::from_secs(5),
5620            lookback_delta: Duration::from_secs(1),
5621        };
5622
5623        let cases = [
5624            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
5625            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
5626        ];
5627
5628        for case in cases {
5629            let prom_expr = parser::parse(case).unwrap();
5630            eval_stmt.expr = prom_expr;
5631            let table_provider = build_test_table_provider_with_fields(
5632                &[
5633                    (
5634                        DEFAULT_SCHEMA_NAME.to_string(),
5635                        "kubelet_volume_stats_used_bytes".to_string(),
5636                    ),
5637                    (
5638                        DEFAULT_SCHEMA_NAME.to_string(),
5639                        "kubelet_volume_stats_capacity_bytes".to_string(),
5640                    ),
5641                ],
5642                &["namespace", "persistentvolumeclaim"],
5643            )
5644            .await;
5645            // Should be ok
5646            let _ =
5647                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5648                    .await
5649                    .unwrap();
5650        }
5651    }
5652
    #[tokio::test]
    async fn test_nested_binary_op() {
        // A binary op whose RHS is itself a binary op with an `or vector(0)`
        // fallback must be plannable.
        let mut eval_stmt = EvalStmt {
            // Placeholder expr; replaced with the parsed query below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        // Should be ok
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }
5687
    #[tokio::test]
    async fn test_parse_or_operator() {
        // A battery of real-world `or` queries (different label sets, nested
        // arithmetic, multiple chained `or`s) that must all plan successfully.
        let mut eval_stmt = EvalStmt {
            // Placeholder expr; replaced per case below.
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // Case 1: `or` joining two grouped ratio expressions over one metric.
        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
            or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        // Case 2: three `or`-chained comparison branches over the same metric.
        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
            or
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
            or
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        // Case 3: `or` across two different metrics sharing the same label set.
        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
            sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
            (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
            (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        // Case 4: `or` across three metrics with arithmetic on the rhs branch.
        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }
5781
5782    #[tokio::test]
5783    async fn value_matcher() {
5784        // template
5785        let mut eval_stmt = EvalStmt {
5786            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5787            start: UNIX_EPOCH,
5788            end: UNIX_EPOCH
5789                .checked_add(Duration::from_secs(100_000))
5790                .unwrap(),
5791            interval: Duration::from_secs(5),
5792            lookback_delta: Duration::from_secs(1),
5793        };
5794
5795        let cases = [
5796            // single equal matcher
5797            (
5798                r#"some_metric{__field__="field_1"}"#,
5799                vec![
5800                    "some_metric.field_1",
5801                    "some_metric.tag_0",
5802                    "some_metric.tag_1",
5803                    "some_metric.tag_2",
5804                    "some_metric.timestamp",
5805                ],
5806            ),
5807            // two equal matchers
5808            (
5809                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
5810                vec![
5811                    "some_metric.field_0",
5812                    "some_metric.field_1",
5813                    "some_metric.tag_0",
5814                    "some_metric.tag_1",
5815                    "some_metric.tag_2",
5816                    "some_metric.timestamp",
5817                ],
5818            ),
5819            // single not_eq matcher
5820            (
5821                r#"some_metric{__field__!="field_1"}"#,
5822                vec![
5823                    "some_metric.field_0",
5824                    "some_metric.field_2",
5825                    "some_metric.tag_0",
5826                    "some_metric.tag_1",
5827                    "some_metric.tag_2",
5828                    "some_metric.timestamp",
5829                ],
5830            ),
5831            // two not_eq matchers
5832            (
5833                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
5834                vec![
5835                    "some_metric.field_0",
5836                    "some_metric.tag_0",
5837                    "some_metric.tag_1",
5838                    "some_metric.tag_2",
5839                    "some_metric.timestamp",
5840                ],
5841            ),
5842            // equal and not_eq matchers (no conflict)
5843            (
5844                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
5845                vec![
5846                    "some_metric.field_1",
5847                    "some_metric.tag_0",
5848                    "some_metric.tag_1",
5849                    "some_metric.tag_2",
5850                    "some_metric.timestamp",
5851                ],
5852            ),
5853            // equal and not_eq matchers (conflict)
5854            (
5855                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
5856                vec![
5857                    "some_metric.tag_0",
5858                    "some_metric.tag_1",
5859                    "some_metric.tag_2",
5860                    "some_metric.timestamp",
5861                ],
5862            ),
5863            // single regex eq matcher
5864            (
5865                r#"some_metric{__field__=~"field_1|field_2"}"#,
5866                vec![
5867                    "some_metric.field_1",
5868                    "some_metric.field_2",
5869                    "some_metric.tag_0",
5870                    "some_metric.tag_1",
5871                    "some_metric.tag_2",
5872                    "some_metric.timestamp",
5873                ],
5874            ),
5875            // single regex not_eq matcher
5876            (
5877                r#"some_metric{__field__!~"field_1|field_2"}"#,
5878                vec![
5879                    "some_metric.field_0",
5880                    "some_metric.tag_0",
5881                    "some_metric.tag_1",
5882                    "some_metric.tag_2",
5883                    "some_metric.timestamp",
5884                ],
5885            ),
5886        ];
5887
5888        for case in cases {
5889            let prom_expr = parser::parse(case.0).unwrap();
5890            eval_stmt.expr = prom_expr;
5891            let table_provider = build_test_table_provider(
5892                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5893                3,
5894                3,
5895            )
5896            .await;
5897            let plan =
5898                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5899                    .await
5900                    .unwrap();
5901            let mut fields = plan.schema().field_names();
5902            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
5903            fields.sort();
5904            expected.sort();
5905            assert_eq!(fields, expected, "case: {:?}", case.0);
5906        }
5907
5908        let bad_cases = [
5909            r#"some_metric{__field__="nonexistent"}"#,
5910            r#"some_metric{__field__!="nonexistent"}"#,
5911        ];
5912
5913        for case in bad_cases {
5914            let prom_expr = parser::parse(case).unwrap();
5915            eval_stmt.expr = prom_expr;
5916            let table_provider = build_test_table_provider(
5917                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5918                3,
5919                3,
5920            )
5921            .await;
5922            let plan =
5923                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5924                    .await;
5925            assert!(plan.is_err(), "case: {:?}", case);
5926        }
5927    }
5928
    #[tokio::test]
    async fn custom_schema() {
        // `__schema__` matcher routes the table lookup to the given schema
        // (here `greptime_private`); every node in the plan is qualified
        // with that schema name.
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        // `__database__` is accepted as an alias of `__schema__` and must
        // produce exactly the same plan.
        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        // Binary operation mixing a schema-qualified metric with one from
        // the default schema: each join side keeps its own qualified table
        // name, and they are joined on the shared tag and time index.
        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(greptime_private.some_alt_metric.field_0 AS Float64) / CAST(some_metric.field_0 AS Float64) AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(ms), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
            \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }
5973
5974    #[tokio::test]
5975    async fn only_equals_is_supported_for_special_matcher() {
5976        let queries = &[
5977            "some_alt_metric{__schema__!=\"greptime_private\"}",
5978            "some_alt_metric{__schema__=~\"lalala\"}",
5979            "some_alt_metric{__database__!=\"greptime_private\"}",
5980            "some_alt_metric{__database__=~\"lalala\"}",
5981        ];
5982
5983        for query in queries {
5984            let prom_expr = parser::parse(query).unwrap();
5985            let eval_stmt = EvalStmt {
5986                expr: prom_expr,
5987                start: UNIX_EPOCH,
5988                end: UNIX_EPOCH
5989                    .checked_add(Duration::from_secs(100_000))
5990                    .unwrap(),
5991                interval: Duration::from_secs(5),
5992                lookback_delta: Duration::from_secs(1),
5993            };
5994
5995            let table_provider = build_test_table_provider(
5996                &[
5997                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5998                    (
5999                        "greptime_private".to_string(),
6000                        "some_alt_metric".to_string(),
6001                    ),
6002                ],
6003                1,
6004                1,
6005            )
6006            .await;
6007
6008            let plan =
6009                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6010                    .await;
6011            assert!(plan.is_err(), "query: {:?}", query);
6012        }
6013    }
6014
    #[tokio::test]
    async fn test_non_ms_precision() {
        // Register a table whose time index uses *nanosecond* precision; the
        // planner is expected to insert a cast to millisecond timestamps.
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            // Nanosecond time index — the interesting part of this test.
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(
            catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "metrics".to_string(),
                    table_id: 1024,
                    table,
                })
                .is_ok()
        );

        // Simple selector: a Projection casting `timestamp` to Timestamp(ms)
        // must sit directly above the TableScan.
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]"
        );
        // Range function over the same table: the millisecond cast must still
        // be present underneath the range-manipulation nodes.
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(ms), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(ms), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(ms), timestamp_range:Dictionary(Int64, Timestamp(ms))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-4999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
            \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]"
        );
    }
6126
6127    #[tokio::test]
6128    async fn test_nonexistent_label() {
6129        // template
6130        let mut eval_stmt = EvalStmt {
6131            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6132            start: UNIX_EPOCH,
6133            end: UNIX_EPOCH
6134                .checked_add(Duration::from_secs(100_000))
6135                .unwrap(),
6136            interval: Duration::from_secs(5),
6137            lookback_delta: Duration::from_secs(1),
6138        };
6139
6140        let case = r#"some_metric{nonexistent="hi"}"#;
6141        let prom_expr = parser::parse(case).unwrap();
6142        eval_stmt.expr = prom_expr;
6143        let table_provider = build_test_table_provider(
6144            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
6145            3,
6146            3,
6147        )
6148        .await;
6149        // Should be ok
6150        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6151            .await
6152            .unwrap();
6153    }
6154
    #[tokio::test]
    async fn test_label_join() {
        // label_join() concatenates tag_1..tag_3 with "," into a new label
        // `foo`; the source labels are kept in the output schema.
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        // The new label is computed with concat_ws, and rows with a NULL
        // value column are filtered out at the top.
        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-999, None) AND up.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }
6191
6192    #[tokio::test]
6193    async fn test_label_replace() {
6194        let prom_expr = parser::parse(
6195            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
6196        )
6197        .unwrap();
6198        let eval_stmt = EvalStmt {
6199            expr: prom_expr,
6200            start: UNIX_EPOCH,
6201            end: UNIX_EPOCH
6202                .checked_add(Duration::from_secs(100_000))
6203                .unwrap(),
6204            interval: Duration::from_secs(5),
6205            lookback_delta: Duration::from_secs(1),
6206        };
6207
6208        let table_provider =
6209            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
6210                .await;
6211        let plan =
6212            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6213                .await
6214                .unwrap();
6215
6216        let expected = r#"
6217Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
6218  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
6219    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6220      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6221        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6222          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-999, None) AND up.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6223            TableScan: up [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"#;
6224
6225        let ret = plan.display_indent_schema().to_string();
6226        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
6227    }
6228
    #[tokio::test]
    async fn test_matchers_to_expr() {
        // A regex label matcher must be translated into an anchored `~`
        // filter expression (`^(?:...)$`); sum() without `by` groups only by
        // the time index and aggregates every field column.
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
6267
6268    #[tokio::test]
6269    async fn test_topk_expr() {
6270        let mut eval_stmt = EvalStmt {
6271            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6272            start: UNIX_EPOCH,
6273            end: UNIX_EPOCH
6274                .checked_add(Duration::from_secs(100_000))
6275                .unwrap(),
6276            interval: Duration::from_secs(5),
6277            lookback_delta: Duration::from_secs(1),
6278        };
6279        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;
6280
6281        let prom_expr = parser::parse(case).unwrap();
6282        eval_stmt.expr = prom_expr;
6283        let table_provider = build_test_table_provider_with_fields(
6284            &[
6285                (
6286                    DEFAULT_SCHEMA_NAME.to_string(),
6287                    "prometheus_tsdb_head_series".to_string(),
6288                ),
6289                (
6290                    DEFAULT_SCHEMA_NAME.to_string(),
6291                    "http_server_requests_seconds_count".to_string(),
6292                ),
6293            ],
6294            &["ip"],
6295        )
6296        .await;
6297
6298        let plan =
6299            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6300                .await
6301                .unwrap();
6302        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(ms)]\
6303        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
6304        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
6305        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
6306        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6307        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6308        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6309        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6310        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6311        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6312        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";
6313
6314        assert_eq!(plan.display_indent_schema().to_string(), expected);
6315    }
6316
6317    #[tokio::test]
6318    async fn test_count_values_expr() {
6319        let mut eval_stmt = EvalStmt {
6320            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6321            start: UNIX_EPOCH,
6322            end: UNIX_EPOCH
6323                .checked_add(Duration::from_secs(100_000))
6324                .unwrap(),
6325            interval: Duration::from_secs(5),
6326            lookback_delta: Duration::from_secs(1),
6327        };
6328        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;
6329
6330        let prom_expr = parser::parse(case).unwrap();
6331        eval_stmt.expr = prom_expr;
6332        let table_provider = build_test_table_provider_with_fields(
6333            &[
6334                (
6335                    DEFAULT_SCHEMA_NAME.to_string(),
6336                    "prometheus_tsdb_head_series".to_string(),
6337                ),
6338                (
6339                    DEFAULT_SCHEMA_NAME.to_string(),
6340                    "http_server_requests_seconds_count".to_string(),
6341                ),
6342            ],
6343            &["ip"],
6344        )
6345        .await;
6346
6347        let plan =
6348            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6349                .await
6350                .unwrap();
6351        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N]\
6352        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]\
6353        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]\
6354        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
6355        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6356        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6357        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6358        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6359        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";
6360
6361        assert_eq!(plan.display_indent_schema().to_string(), expected);
6362    }
6363
6364    #[tokio::test]
6365    async fn test_value_alias() {
6366        let mut eval_stmt = EvalStmt {
6367            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6368            start: UNIX_EPOCH,
6369            end: UNIX_EPOCH
6370                .checked_add(Duration::from_secs(100_000))
6371                .unwrap(),
6372            interval: Duration::from_secs(5),
6373            lookback_delta: Duration::from_secs(1),
6374        };
6375        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;
6376
6377        let prom_expr = parser::parse(case).unwrap();
6378        eval_stmt.expr = prom_expr;
6379        eval_stmt = QueryLanguageParser::apply_alias_extension(eval_stmt, "my_series");
6380        let table_provider = build_test_table_provider_with_fields(
6381            &[
6382                (
6383                    DEFAULT_SCHEMA_NAME.to_string(),
6384                    "prometheus_tsdb_head_series".to_string(),
6385                ),
6386                (
6387                    DEFAULT_SCHEMA_NAME.to_string(),
6388                    "http_server_requests_seconds_count".to_string(),
6389                ),
6390            ],
6391            &["ip"],
6392        )
6393        .await;
6394
6395        let plan =
6396            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6397                .await
6398                .unwrap();
6399        let expected = r#"
6400Projection: count(prometheus_tsdb_head_series.greptime_value) AS my_series, prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [my_series:Int64, ip:Utf8, greptime_timestamp:Timestamp(ms)]
6401  Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N]
6402    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]
6403      Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]
6404        Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
6405          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6406            PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6407              Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6408                Filter: prometheus_tsdb_head_series.ip ~ Utf8("^(?:(10.0.160.237:8080|10.0.160.237:9090))$") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6409                  TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]"#;
6410        assert_eq!(format!("\n{}", plan.display_indent_schema()), expected);
6411    }
6412
6413    #[tokio::test]
6414    async fn test_quantile_expr() {
6415        let mut eval_stmt = EvalStmt {
6416            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6417            start: UNIX_EPOCH,
6418            end: UNIX_EPOCH
6419                .checked_add(Duration::from_secs(100_000))
6420                .unwrap(),
6421            interval: Duration::from_secs(5),
6422            lookback_delta: Duration::from_secs(1),
6423        };
6424        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;
6425
6426        let prom_expr = parser::parse(case).unwrap();
6427        eval_stmt.expr = prom_expr;
6428        let table_provider = build_test_table_provider_with_fields(
6429            &[
6430                (
6431                    DEFAULT_SCHEMA_NAME.to_string(),
6432                    "prometheus_tsdb_head_series".to_string(),
6433                ),
6434                (
6435                    DEFAULT_SCHEMA_NAME.to_string(),
6436                    "http_server_requests_seconds_count".to_string(),
6437                ),
6438            ],
6439            &["ip"],
6440        )
6441        .await;
6442
6443        let plan =
6444            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6445                .await
6446                .unwrap();
6447        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(ms), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
6448        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(ms), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
6449        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6450        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6451        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6452        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6453        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6454        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6455        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";
6456
6457        assert_eq!(plan.display_indent_schema().to_string(), expected);
6458    }
6459
6460    #[tokio::test]
6461    async fn test_or_not_exists_table_label() {
6462        let mut eval_stmt = EvalStmt {
6463            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6464            start: UNIX_EPOCH,
6465            end: UNIX_EPOCH
6466                .checked_add(Duration::from_secs(100_000))
6467                .unwrap(),
6468            interval: Duration::from_secs(5),
6469            lookback_delta: Duration::from_secs(1),
6470        };
6471        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;
6472
6473        let prom_expr = parser::parse(case).unwrap();
6474        eval_stmt.expr = prom_expr;
6475        let table_provider = build_test_table_provider_with_fields(
6476            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
6477            &["job"],
6478        )
6479        .await;
6480
6481        let plan =
6482            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6483                .await
6484                .unwrap();
6485        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
6486  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
6487    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
6488      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(ms), sum(metric_exists.greptime_value):Float64;N]
6489        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(ms), sum(metric_exists.greptime_value):Float64;N]
6490          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6491            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6492              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6493                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-999, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100000000, None) [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6494                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6495  SubqueryAlias:  [greptime_timestamp:Timestamp(ms), job:Utf8;N, sum(.value):Float64;N]
6496    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(ms), job:Utf8;N, sum(.value):Float64;N]
6497      Sort: .time ASC NULLS LAST [time:Timestamp(ms), sum(.value):Float64;N]
6498        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(ms), sum(.value):Float64;N]
6499          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(ms), value:Float64;N]
6500            TableScan: dummy [time:Timestamp(ms), value:Float64;N]"#;
6501
6502        assert_eq!(plan.display_indent_schema().to_string(), expected);
6503    }
6504
6505    #[tokio::test]
6506    async fn test_histogram_quantile_missing_le_column() {
6507        let mut eval_stmt = EvalStmt {
6508            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6509            start: UNIX_EPOCH,
6510            end: UNIX_EPOCH
6511                .checked_add(Duration::from_secs(100_000))
6512                .unwrap(),
6513            interval: Duration::from_secs(5),
6514            lookback_delta: Duration::from_secs(1),
6515        };
6516
6517        // Test case: histogram_quantile with a table that doesn't have 'le' column
6518        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;
6519
6520        let prom_expr = parser::parse(case).unwrap();
6521        eval_stmt.expr = prom_expr;
6522
6523        // Create a table provider with a table that doesn't have 'le' column
6524        let table_provider = build_test_table_provider_with_fields(
6525            &[(
6526                DEFAULT_SCHEMA_NAME.to_string(),
6527                "non_existent_histogram_bucket".to_string(),
6528            )],
6529            &["pod", "instance"], // Note: no 'le' column
6530        )
6531        .await;
6532
6533        // Should return empty result instead of error
6534        let result =
6535            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6536                .await;
6537
6538        // This should succeed now (returning empty result) instead of failing with "Cannot find column le"
6539        assert!(
6540            result.is_ok(),
6541            "Expected successful plan creation with empty result, but got error: {:?}",
6542            result.err()
6543        );
6544
6545        // Verify that the result is an EmptyRelation
6546        let plan = result.unwrap();
6547        match plan {
6548            LogicalPlan::EmptyRelation(_) => {
6549                // This is what we expect
6550            }
6551            _ => panic!("Expected EmptyRelation, but got: {:?}", plan),
6552        }
6553    }
6554}