query/promql/
planner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashSet, VecDeque};
16use std::sync::Arc;
17use std::time::UNIX_EPOCH;
18
19use arrow::datatypes::IntervalDayTime;
20use async_recursion::async_recursion;
21use catalog::table_source::DfTableSourceProvider;
22use common_error::ext::ErrorExt;
23use common_error::status_code::StatusCode;
24use common_function::function::FunctionContext;
25use common_query::prelude::greptime_value;
26use datafusion::common::DFSchemaRef;
27use datafusion::datasource::DefaultTableSource;
28use datafusion::functions_aggregate::average::avg_udaf;
29use datafusion::functions_aggregate::count::count_udaf;
30use datafusion::functions_aggregate::expr_fn::first_value;
31use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
32use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
33use datafusion::functions_aggregate::sum::sum_udaf;
34use datafusion::functions_aggregate::variance::var_pop_udaf;
35use datafusion::functions_window::row_number::RowNumber;
36use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
37use datafusion::logical_expr::expr_rewriter::normalize_cols;
38use datafusion::logical_expr::{
39    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
40    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
41};
42use datafusion::prelude as df_prelude;
43use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
44use datafusion::scalar::ScalarValue;
45use datafusion::sql::TableReference;
46use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
47use datafusion_common::{DFSchema, NullEquality};
48use datafusion_expr::expr::WindowFunctionParams;
49use datafusion_expr::utils::conjunction;
50use datafusion_expr::{
51    ExprSchemable, Literal, Projection, SortExpr, TableScan, TableSource, col, lit,
52};
53use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
54use datatypes::data_type::ConcreteDataType;
55use itertools::Itertools;
56use once_cell::sync::Lazy;
57use promql::extension_plan::{
58    Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
59    ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
60};
61use promql::functions::{
62    AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
63    Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
64    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
65    quantile_udaf,
66};
67use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
68use promql_parser::parser::token::TokenType;
69use promql_parser::parser::value::ValueType;
70use promql_parser::parser::{
71    AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
72    Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
73    Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
74    VectorSelector, token,
75};
76use regex::{self, Regex};
77use snafu::{OptionExt, ResultExt, ensure};
78use store_api::metric_engine_consts::{
79    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
80    METRIC_ENGINE_NAME, is_metric_engine_internal_column,
81};
82use table::table::adapter::DfTableProviderAdapter;
83
84use crate::parser::{
85    ALIAS_NODE_NAME, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME, AliasExpr, EXPLAIN_NODE_NAME,
86    EXPLAIN_VERBOSE_NODE_NAME,
87};
88use crate::promql::error::{
89    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
90    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
91    InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
92    MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
93    Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
94    UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
95    UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
96    ZeroRangeSelectorSnafu,
97};
98use crate::query_engine::QueryEngineState;
99
100/// `time()` function in PromQL.
101const SPECIAL_TIME_FUNCTION: &str = "time";
102/// `scalar()` function in PromQL.
103const SCALAR_FUNCTION: &str = "scalar";
104/// `absent()` function in PromQL
105const SPECIAL_ABSENT_FUNCTION: &str = "absent";
106/// `histogram_quantile` function in PromQL
107const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
108/// `vector` function in PromQL
109const SPECIAL_VECTOR_FUNCTION: &str = "vector";
110/// `le` column for conventional histogram.
111const LE_COLUMN_NAME: &str = "le";
112
113/// Static regex for validating label names according to Prometheus specification.
114/// Label names must match the regex: [a-zA-Z_][a-zA-Z0-9_]*
115static LABEL_NAME_REGEX: Lazy<Regex> =
116    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());
117
118const DEFAULT_TIME_INDEX_COLUMN: &str = "time";
119
120/// default value column name for empty metric
121const DEFAULT_FIELD_COLUMN: &str = "value";
122
123/// Special modifier to project field columns under multi-field mode
124const FIELD_COLUMN_MATCHER: &str = "__field__";
125
126/// Special modifier for cross schema query
127const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
128const DB_COLUMN_MATCHER: &str = "__database__";
129
130/// Threshold for scatter scan mode
131const MAX_SCATTER_POINTS: i64 = 400;
132
133/// Interval 1 hour in millisecond
134const INTERVAL_1H: i64 = 60 * 60 * 1000;
135
136#[derive(Default, Debug, Clone)]
137struct PromPlannerContext {
138    // query parameters
139    start: Millisecond,
140    end: Millisecond,
141    interval: Millisecond,
142    lookback_delta: Millisecond,
143
144    // planner states
145    table_name: Option<String>,
146    time_index_column: Option<String>,
147    field_columns: Vec<String>,
148    tag_columns: Vec<String>,
149    /// Use metric engine internal series identifier column (`__tsid`) as series key.
150    ///
151    /// This is enabled only when the underlying scan can provide `__tsid` (`UInt64`). The planner
152    /// uses it internally (e.g. as the series key for [`SeriesDivide`]) and strips it from the
153    /// final output.
154    use_tsid: bool,
155    /// The matcher for field columns `__field__`.
156    field_column_matcher: Option<Vec<Matcher>>,
157    /// The matcher for selectors (normal matchers).
158    selector_matcher: Vec<Matcher>,
159    schema_name: Option<String>,
160    /// The range in millisecond of range selector. None if there is no range selector.
161    range: Option<Millisecond>,
162}
163
164impl PromPlannerContext {
165    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
166        Self {
167            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
168            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
169            interval: stmt.interval.as_millis() as _,
170            lookback_delta: stmt.lookback_delta.as_millis() as _,
171            ..Default::default()
172        }
173    }
174
175    /// Reset all planner states
176    fn reset(&mut self) {
177        self.table_name = None;
178        self.time_index_column = None;
179        self.field_columns = vec![];
180        self.tag_columns = vec![];
181        self.use_tsid = false;
182        self.field_column_matcher = None;
183        self.selector_matcher.clear();
184        self.schema_name = None;
185        self.range = None;
186    }
187
188    /// Reset table name and schema to empty
189    fn reset_table_name_and_schema(&mut self) {
190        self.table_name = Some(String::new());
191        self.schema_name = None;
192        self.use_tsid = false;
193    }
194
195    /// Check if `le` is present in tag columns
196    fn has_le_tag(&self) -> bool {
197        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
198    }
199}
200
201pub struct PromPlanner {
202    table_provider: DfTableSourceProvider,
203    ctx: PromPlannerContext,
204}
205
206impl PromPlanner {
207    pub async fn stmt_to_plan(
208        table_provider: DfTableSourceProvider,
209        stmt: &EvalStmt,
210        query_engine_state: &QueryEngineState,
211    ) -> Result<LogicalPlan> {
212        let mut planner = Self {
213            table_provider,
214            ctx: PromPlannerContext::from_eval_stmt(stmt),
215        };
216
217        let plan = planner
218            .prom_expr_to_plan(&stmt.expr, query_engine_state)
219            .await?;
220
221        // Never leak internal series identifier to output.
222        planner.strip_tsid_column(plan)
223    }
224
225    pub async fn prom_expr_to_plan(
226        &mut self,
227        prom_expr: &PromExpr,
228        query_engine_state: &QueryEngineState,
229    ) -> Result<LogicalPlan> {
230        self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
231            .await
232    }
233
234    /**
235    Converts a PromQL expression to a logical plan.
236
237    NOTE:
238        The `timestamp_fn` indicates whether the PromQL `timestamp()` function is being evaluated in the current context.
239        If `true`, the planner generates a logical plan that projects the timestamp (time index) column
240        as the value column for each input row, implementing the PromQL `timestamp()` function semantics.
241        If `false`, the planner generates the standard logical plan for the given PromQL expression.
242    */
243    #[async_recursion]
244    async fn prom_expr_to_plan_inner(
245        &mut self,
246        prom_expr: &PromExpr,
247        timestamp_fn: bool,
248        query_engine_state: &QueryEngineState,
249    ) -> Result<LogicalPlan> {
250        let res = match prom_expr {
251            PromExpr::Aggregate(expr) => {
252                self.prom_aggr_expr_to_plan(query_engine_state, expr)
253                    .await?
254            }
255            PromExpr::Unary(expr) => {
256                self.prom_unary_expr_to_plan(query_engine_state, expr)
257                    .await?
258            }
259            PromExpr::Binary(expr) => {
260                self.prom_binary_expr_to_plan(query_engine_state, expr)
261                    .await?
262            }
263            PromExpr::Paren(ParenExpr { expr }) => {
264                self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
265                    .await?
266            }
267            PromExpr::Subquery(expr) => {
268                self.prom_subquery_expr_to_plan(query_engine_state, expr)
269                    .await?
270            }
271            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
272            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
273            PromExpr::VectorSelector(selector) => {
274                self.prom_vector_selector_to_plan(selector, timestamp_fn)
275                    .await?
276            }
277            PromExpr::MatrixSelector(selector) => {
278                self.prom_matrix_selector_to_plan(selector).await?
279            }
280            PromExpr::Call(expr) => {
281                self.prom_call_expr_to_plan(query_engine_state, expr)
282                    .await?
283            }
284            PromExpr::Extension(expr) => {
285                self.prom_ext_expr_to_plan(query_engine_state, expr).await?
286            }
287        };
288
289        Ok(res)
290    }
291
292    async fn prom_subquery_expr_to_plan(
293        &mut self,
294        query_engine_state: &QueryEngineState,
295        subquery_expr: &SubqueryExpr,
296    ) -> Result<LogicalPlan> {
297        let SubqueryExpr {
298            expr, range, step, ..
299        } = subquery_expr;
300
301        let current_interval = self.ctx.interval;
302        if let Some(step) = step {
303            self.ctx.interval = step.as_millis() as _;
304        }
305        let current_start = self.ctx.start;
306        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
307        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
308        self.ctx.interval = current_interval;
309        self.ctx.start = current_start;
310
311        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
312        let range_ms = range.as_millis() as _;
313        self.ctx.range = Some(range_ms);
314
315        let manipulate = RangeManipulate::new(
316            self.ctx.start,
317            self.ctx.end,
318            self.ctx.interval,
319            range_ms,
320            self.ctx
321                .time_index_column
322                .clone()
323                .expect("time index should be set in `setup_context`"),
324            self.ctx.field_columns.clone(),
325            input,
326        )
327        .context(DataFusionPlanningSnafu)?;
328
329        Ok(LogicalPlan::Extension(Extension {
330            node: Arc::new(manipulate),
331        }))
332    }
333
334    async fn prom_aggr_expr_to_plan(
335        &mut self,
336        query_engine_state: &QueryEngineState,
337        aggr_expr: &AggregateExpr,
338    ) -> Result<LogicalPlan> {
339        let AggregateExpr {
340            op,
341            expr,
342            modifier,
343            param,
344        } = aggr_expr;
345
346        let mut input = self.prom_expr_to_plan(expr, query_engine_state).await?;
347        let input_has_tsid = input.schema().fields().iter().any(|field| {
348            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
349                && field.data_type() == &ArrowDataType::UInt64
350        });
351
352        // `__tsid` based scan projection may prune tag columns. Ensure tags referenced in
353        // aggregation modifiers (`by`/`without`) are available before planning group keys.
354        let required_group_tags = match modifier {
355            None => BTreeSet::new(),
356            Some(LabelModifier::Include(labels)) => labels
357                .labels
358                .iter()
359                .filter(|label| !is_metric_engine_internal_column(label.as_str()))
360                .cloned()
361                .collect(),
362            Some(LabelModifier::Exclude(labels)) => {
363                let mut all_tags = self.collect_row_key_tag_columns_from_plan(&input)?;
364                for label in &labels.labels {
365                    let _ = all_tags.remove(label);
366                }
367                all_tags
368            }
369        };
370
371        if !required_group_tags.is_empty()
372            && required_group_tags
373                .iter()
374                .any(|tag| Self::find_case_sensitive_column(input.schema(), tag.as_str()).is_none())
375        {
376            input = self.ensure_tag_columns_available(input, &required_group_tags)?;
377            self.refresh_tag_columns_from_schema(input.schema());
378        }
379
380        match (*op).id() {
381            token::T_TOPK | token::T_BOTTOMK => {
382                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
383            }
384            _ => {
385                // When `__tsid` is available, tag columns may have been pruned from the input plan.
386                // For `keep_tsid` decision we should compare against the full row-key label set,
387                // otherwise we may incorrectly treat label-reducing aggregates as preserving labels.
388                let input_tag_columns = if input_has_tsid {
389                    self.collect_row_key_tag_columns_from_plan(&input)?
390                        .into_iter()
391                        .collect::<Vec<_>>()
392                } else {
393                    self.ctx.tag_columns.clone()
394                };
395                // calculate columns to group by
396                // Need to append time index column into group by columns
397                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
398                // convert op and value columns to aggregate exprs
399                let (mut aggr_exprs, prev_field_exprs) =
400                    self.create_aggregate_exprs(*op, param, &input)?;
401
402                let keep_tsid = op.id() != token::T_COUNT_VALUES
403                    && input_has_tsid
404                    && input_tag_columns.iter().collect::<HashSet<_>>()
405                        == self.ctx.tag_columns.iter().collect::<HashSet<_>>();
406
407                if keep_tsid {
408                    aggr_exprs.push(
409                        first_value(
410                            DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)),
411                            vec![],
412                        )
413                        .alias(DATA_SCHEMA_TSID_COLUMN_NAME),
414                    );
415                }
416                self.ctx.use_tsid = keep_tsid;
417
418                // create plan
419                let builder = LogicalPlanBuilder::from(input);
420                let builder = if op.id() == token::T_COUNT_VALUES {
421                    let label = Self::get_param_value_as_str(*op, param)?;
422                    // `count_values` must be grouped by fields,
423                    // and project the fields to the new label.
424                    group_exprs.extend(prev_field_exprs.clone());
425                    let project_fields = self
426                        .create_field_column_exprs()?
427                        .into_iter()
428                        .chain(self.create_tag_column_exprs()?)
429                        .chain(Some(self.create_time_index_column_expr()?))
430                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));
431
432                    builder
433                        .aggregate(group_exprs.clone(), aggr_exprs)
434                        .context(DataFusionPlanningSnafu)?
435                        .project(project_fields)
436                        .context(DataFusionPlanningSnafu)?
437                } else {
438                    builder
439                        .aggregate(group_exprs.clone(), aggr_exprs)
440                        .context(DataFusionPlanningSnafu)?
441                };
442
443                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));
444
445                builder
446                    .sort(sort_expr)
447                    .context(DataFusionPlanningSnafu)?
448                    .build()
449                    .context(DataFusionPlanningSnafu)
450            }
451        }
452    }
453
454    /// Create logical plan for PromQL topk and bottomk expr.
455    async fn prom_topk_bottomk_to_plan(
456        &mut self,
457        aggr_expr: &AggregateExpr,
458        input: LogicalPlan,
459    ) -> Result<LogicalPlan> {
460        let AggregateExpr {
461            op,
462            param,
463            modifier,
464            ..
465        } = aggr_expr;
466
467        let input_has_tsid = input.schema().fields().iter().any(|field| {
468            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
469                && field.data_type() == &ArrowDataType::UInt64
470        });
471        self.ctx.use_tsid = input_has_tsid;
472
473        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;
474
475        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;
476
477        // convert op and value columns to window exprs.
478        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;
479
480        let rank_columns: Vec<_> = window_exprs
481            .iter()
482            .map(|expr| expr.schema_name().to_string())
483            .collect();
484
485        // Create ranks filter with `Operator::Or`.
486        // Safety: at least one rank column
487        let filter: DfExpr = rank_columns
488            .iter()
489            .fold(None, |expr, rank| {
490                let predicate = DfExpr::BinaryExpr(BinaryExpr {
491                    left: Box::new(col(rank)),
492                    op: Operator::LtEq,
493                    right: Box::new(val.clone()),
494                });
495
496                match expr {
497                    None => Some(predicate),
498                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
499                        left: Box::new(expr),
500                        op: Operator::Or,
501                        right: Box::new(predicate),
502                    })),
503                }
504            })
505            .unwrap();
506
507        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();
508
509        let mut new_group_exprs = group_exprs.clone();
510        // Order by ranks
511        new_group_exprs.extend(rank_columns);
512
513        let group_sort_expr = new_group_exprs
514            .into_iter()
515            .map(|expr| expr.sort(true, false));
516
517        let project_fields = self
518            .create_field_column_exprs()?
519            .into_iter()
520            .chain(self.create_tag_column_exprs()?)
521            .chain(
522                self.ctx
523                    .use_tsid
524                    .then_some(DfExpr::Column(Column::from_name(
525                        DATA_SCHEMA_TSID_COLUMN_NAME,
526                    )))
527                    .into_iter(),
528            )
529            .chain(Some(self.create_time_index_column_expr()?));
530
531        LogicalPlanBuilder::from(input)
532            .window(window_exprs)
533            .context(DataFusionPlanningSnafu)?
534            .filter(filter)
535            .context(DataFusionPlanningSnafu)?
536            .sort(group_sort_expr)
537            .context(DataFusionPlanningSnafu)?
538            .project(project_fields)
539            .context(DataFusionPlanningSnafu)?
540            .build()
541            .context(DataFusionPlanningSnafu)
542    }
543
544    async fn prom_unary_expr_to_plan(
545        &mut self,
546        query_engine_state: &QueryEngineState,
547        unary_expr: &UnaryExpr,
548    ) -> Result<LogicalPlan> {
549        let UnaryExpr { expr } = unary_expr;
550        // Unary Expr in PromQL implys the `-` operator
551        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
552        self.projection_for_each_field_column(input, |col| {
553            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
554        })
555    }
556
557    async fn prom_binary_expr_to_plan(
558        &mut self,
559        query_engine_state: &QueryEngineState,
560        binary_expr: &PromBinaryExpr,
561    ) -> Result<LogicalPlan> {
562        let PromBinaryExpr {
563            lhs,
564            rhs,
565            op,
566            modifier,
567        } = binary_expr;
568
569        // if set to true, comparison operator will return 0/1 (for true/false) instead of
570        // filter on the result column
571        let should_return_bool = if let Some(m) = modifier {
572            m.return_bool
573        } else {
574            false
575        };
576        let is_comparison_op = Self::is_token_a_comparison_op(*op);
577
578        // we should build a filter plan here if the op is comparison op and need not
579        // to return 0/1. Otherwise, we should build a projection plan
580        match (
581            Self::try_build_literal_expr(lhs),
582            Self::try_build_literal_expr(rhs),
583        ) {
584            (Some(lhs), Some(rhs)) => {
585                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
586                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
587                self.ctx.reset_table_name_and_schema();
588                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
589                let mut field_expr = field_expr_builder(lhs, rhs)?;
590
591                if is_comparison_op && should_return_bool {
592                    field_expr = DfExpr::Cast(Cast {
593                        expr: Box::new(field_expr),
594                        data_type: ArrowDataType::Float64,
595                    });
596                }
597
598                Ok(LogicalPlan::Extension(Extension {
599                    node: Arc::new(
600                        EmptyMetric::new(
601                            self.ctx.start,
602                            self.ctx.end,
603                            self.ctx.interval,
604                            SPECIAL_TIME_FUNCTION.to_string(),
605                            DEFAULT_FIELD_COLUMN.to_string(),
606                            Some(field_expr),
607                        )
608                        .context(DataFusionPlanningSnafu)?,
609                    ),
610                }))
611            }
612            // lhs is a literal, rhs is a column
613            (Some(mut expr), None) => {
614                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
615                // check if the literal is a special time expr
616                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
617                    expr = time_expr
618                }
619                let bin_expr_builder = |col: &String| {
620                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
621                    let mut binary_expr =
622                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;
623
624                    if is_comparison_op && should_return_bool {
625                        binary_expr = DfExpr::Cast(Cast {
626                            expr: Box::new(binary_expr),
627                            data_type: ArrowDataType::Float64,
628                        });
629                    }
630                    Ok(binary_expr)
631                };
632                if is_comparison_op && !should_return_bool {
633                    self.filter_on_field_column(input, bin_expr_builder)
634                } else {
635                    self.projection_for_each_field_column(input, bin_expr_builder)
636                }
637            }
638            // lhs is a column, rhs is a literal
639            (None, Some(mut expr)) => {
640                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
641                // check if the literal is a special time expr
642                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
643                    expr = time_expr
644                }
645                let bin_expr_builder = |col: &String| {
646                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
647                    let mut binary_expr =
648                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;
649
650                    if is_comparison_op && should_return_bool {
651                        binary_expr = DfExpr::Cast(Cast {
652                            expr: Box::new(binary_expr),
653                            data_type: ArrowDataType::Float64,
654                        });
655                    }
656                    Ok(binary_expr)
657                };
658                if is_comparison_op && !should_return_bool {
659                    self.filter_on_field_column(input, bin_expr_builder)
660                } else {
661                    self.projection_for_each_field_column(input, bin_expr_builder)
662                }
663            }
664            // both are columns. join them on time index
665            (None, None) => {
666                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
667                let left_field_columns = self.ctx.field_columns.clone();
668                let left_time_index_column = self.ctx.time_index_column.clone();
669                let mut left_table_ref = self
670                    .table_ref()
671                    .unwrap_or_else(|_| TableReference::bare(""));
672                let left_context = self.ctx.clone();
673
674                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
675                let right_field_columns = self.ctx.field_columns.clone();
676                let right_time_index_column = self.ctx.time_index_column.clone();
677                let mut right_table_ref = self
678                    .table_ref()
679                    .unwrap_or_else(|_| TableReference::bare(""));
680                let right_context = self.ctx.clone();
681
682                // TODO(ruihang): avoid join if left and right are the same table
683
684                // set op has "special" join semantics
685                if Self::is_token_a_set_op(*op) {
686                    return self.set_op_on_non_field_columns(
687                        left_input,
688                        right_input,
689                        left_context,
690                        right_context,
691                        *op,
692                        modifier,
693                    );
694                }
695
696                // normal join
697                if left_table_ref == right_table_ref {
698                    // rename table references to avoid ambiguity
699                    left_table_ref = TableReference::bare("lhs");
700                    right_table_ref = TableReference::bare("rhs");
701                    // `self.ctx` have ctx in right plan, if right plan have no tag,
702                    // we use left plan ctx as the ctx for subsequent calculations,
703                    // to avoid case like `host + scalar(...)`
704                    // we need preserve tag column on `host` table in subsequent projection,
705                    // which only show in left plan ctx.
706                    if self.ctx.tag_columns.is_empty() {
707                        self.ctx = left_context.clone();
708                        self.ctx.table_name = Some("lhs".to_string());
709                    } else {
710                        self.ctx.table_name = Some("rhs".to_string());
711                    }
712                }
713                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());
714
715                let join_plan = self.join_on_non_field_columns(
716                    left_input,
717                    right_input,
718                    left_table_ref.clone(),
719                    right_table_ref.clone(),
720                    left_time_index_column,
721                    right_time_index_column,
722                    // if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)`
723                    // under this case we only join on time index
724                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
725                    modifier,
726                )?;
727                let join_plan_schema = join_plan.schema().clone();
728
729                let bin_expr_builder = |_: &String| {
730                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
731                    let left_col = join_plan_schema
732                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
733                        .context(DataFusionPlanningSnafu)?
734                        .into();
735                    let right_col = join_plan_schema
736                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
737                        .context(DataFusionPlanningSnafu)?
738                        .into();
739
740                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
741                    let mut binary_expr =
742                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
743                    if is_comparison_op && should_return_bool {
744                        binary_expr = DfExpr::Cast(Cast {
745                            expr: Box::new(binary_expr),
746                            data_type: ArrowDataType::Float64,
747                        });
748                    }
749                    Ok(binary_expr)
750                };
751                if is_comparison_op && !should_return_bool {
752                    // PromQL comparison operators without `bool` are filters:
753                    //   - keep the instant-vector side sample values
754                    //   - drop samples where the comparison is false
755                    //
756                    // So we filter on the join result and then project only the side that should
757                    // be preserved according to PromQL semantics.
758                    let filtered = self.filter_on_field_column(join_plan, bin_expr_builder)?;
759                    let (project_table_ref, project_context) =
760                        match (lhs.value_type(), rhs.value_type()) {
761                            (ValueType::Scalar, ValueType::Vector) => {
762                                (&right_table_ref, &right_context)
763                            }
764                            _ => (&left_table_ref, &left_context),
765                        };
766                    self.project_binary_join_side(filtered, project_table_ref, project_context)
767                } else {
768                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
769                }
770            }
771        }
772    }
773
774    fn project_binary_join_side(
775        &mut self,
776        input: LogicalPlan,
777        table_ref: &TableReference,
778        context: &PromPlannerContext,
779    ) -> Result<LogicalPlan> {
780        let schema = input.schema();
781
782        let mut project_exprs =
783            Vec::with_capacity(context.tag_columns.len() + context.field_columns.len() + 2);
784
785        // Project time index from the chosen side.
786        if let Some(time_index_column) = &context.time_index_column {
787            let time_index_col = schema
788                .qualified_field_with_name(Some(table_ref), time_index_column)
789                .context(DataFusionPlanningSnafu)?
790                .into();
791            project_exprs.push(DfExpr::Column(time_index_col));
792        }
793
794        // Project field columns from the chosen side.
795        for field_column in &context.field_columns {
796            let field_col = schema
797                .qualified_field_with_name(Some(table_ref), field_column)
798                .context(DataFusionPlanningSnafu)?
799                .into();
800            project_exprs.push(DfExpr::Column(field_col));
801        }
802
803        // Project tag columns from the chosen side.
804        for tag_column in &context.tag_columns {
805            let tag_col = schema
806                .qualified_field_with_name(Some(table_ref), tag_column)
807                .context(DataFusionPlanningSnafu)?
808                .into();
809            project_exprs.push(DfExpr::Column(tag_col));
810        }
811
812        // Preserve `__tsid` if present, so it can still be used internally downstream. It's
813        // stripped from the final output anyway.
814        if context.use_tsid
815            && let Ok(tsid_col) =
816                schema.qualified_field_with_name(Some(table_ref), DATA_SCHEMA_TSID_COLUMN_NAME)
817        {
818            project_exprs.push(DfExpr::Column(tsid_col.into()));
819        }
820
821        let plan = LogicalPlanBuilder::from(input)
822            .project(project_exprs)
823            .context(DataFusionPlanningSnafu)?
824            .build()
825            .context(DataFusionPlanningSnafu)?;
826
827        // Update context to reflect the projected schema. Don't keep a table qualifier since
828        // the result is a derived expression.
829        self.ctx = context.clone();
830        self.ctx.table_name = None;
831        self.ctx.schema_name = None;
832
833        Ok(plan)
834    }
835
836    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
837        let NumberLiteral { val } = number_literal;
838        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
839        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
840        self.ctx.reset_table_name_and_schema();
841        let literal_expr = df_prelude::lit(*val);
842
843        let plan = LogicalPlan::Extension(Extension {
844            node: Arc::new(
845                EmptyMetric::new(
846                    self.ctx.start,
847                    self.ctx.end,
848                    self.ctx.interval,
849                    SPECIAL_TIME_FUNCTION.to_string(),
850                    DEFAULT_FIELD_COLUMN.to_string(),
851                    Some(literal_expr),
852                )
853                .context(DataFusionPlanningSnafu)?,
854            ),
855        });
856        Ok(plan)
857    }
858
859    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
860        let StringLiteral { val } = string_literal;
861        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
862        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
863        self.ctx.reset_table_name_and_schema();
864        let literal_expr = df_prelude::lit(val.clone());
865
866        let plan = LogicalPlan::Extension(Extension {
867            node: Arc::new(
868                EmptyMetric::new(
869                    self.ctx.start,
870                    self.ctx.end,
871                    self.ctx.interval,
872                    SPECIAL_TIME_FUNCTION.to_string(),
873                    DEFAULT_FIELD_COLUMN.to_string(),
874                    Some(literal_expr),
875                )
876                .context(DataFusionPlanningSnafu)?,
877            ),
878        });
879        Ok(plan)
880    }
881
882    async fn prom_vector_selector_to_plan(
883        &mut self,
884        vector_selector: &VectorSelector,
885        timestamp_fn: bool,
886    ) -> Result<LogicalPlan> {
887        let VectorSelector {
888            name,
889            offset,
890            matchers,
891            at: _,
892        } = vector_selector;
893        let matchers = self.preprocess_label_matchers(matchers, name)?;
894        if let Some(empty_plan) = self.setup_context().await? {
895            return Ok(empty_plan);
896        }
897        let normalize = self
898            .selector_to_series_normalize_plan(offset, matchers, false)
899            .await?;
900
901        let normalize = if timestamp_fn {
902            // If evaluating the PromQL `timestamp()` function, project the time index column as the value column
903            // before wrapping with [`InstantManipulate`], so the output matches PromQL's `timestamp()` semantics.
904            self.create_timestamp_func_plan(normalize)?
905        } else {
906            normalize
907        };
908
909        let manipulate = InstantManipulate::new(
910            self.ctx.start,
911            self.ctx.end,
912            self.ctx.lookback_delta,
913            self.ctx.interval,
914            self.ctx
915                .time_index_column
916                .clone()
917                .expect("time index should be set in `setup_context`"),
918            self.ctx.field_columns.first().cloned(),
919            normalize,
920        );
921        Ok(LogicalPlan::Extension(Extension {
922            node: Arc::new(manipulate),
923        }))
924    }
925
926    /// Builds a projection plan for the PromQL `timestamp()` function.
927    /// Projects the time index column as the value column for each row.
928    ///
929    /// # Arguments
930    /// * `normalize` - Input [`LogicalPlan`] for the normalized series.
931    ///
932    /// # Returns
933    /// Returns a [`Result<LogicalPlan>`] where the resulting logical plan projects the timestamp
934    /// column as the value column, along with the original tag and time index columns.
935    ///
936    /// # Timestamp vs. Time Function
937    ///
938    /// - **Timestamp Function (`timestamp()`)**: In PromQL, the `timestamp()` function returns the
939    ///   timestamp (time index) of each sample as the value column.
940    ///
941    /// - **Time Function (`time()`)**: The `time()` function returns the evaluation time of the query
942    ///   as a scalar value.
943    ///
944    /// # Side Effects
945    /// Updates the planner context's field columns to the timestamp column name.
946    ///
947    fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
948        let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
949            .alias(DEFAULT_FIELD_COLUMN);
950        self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
951        let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
952        project_exprs.push(self.create_time_index_column_expr()?);
953        project_exprs.push(time_expr);
954        project_exprs.extend(self.create_tag_column_exprs()?);
955
956        LogicalPlanBuilder::from(normalize)
957            .project(project_exprs)
958            .context(DataFusionPlanningSnafu)?
959            .build()
960            .context(DataFusionPlanningSnafu)
961    }
962
963    async fn prom_matrix_selector_to_plan(
964        &mut self,
965        matrix_selector: &MatrixSelector,
966    ) -> Result<LogicalPlan> {
967        let MatrixSelector { vs, range } = matrix_selector;
968        let VectorSelector {
969            name,
970            offset,
971            matchers,
972            ..
973        } = vs;
974        let matchers = self.preprocess_label_matchers(matchers, name)?;
975        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
976        let range_ms = range.as_millis() as _;
977        self.ctx.range = Some(range_ms);
978
979        // Some functions like rate may require special fields in the RangeManipulate plan
980        // so we can't skip RangeManipulate.
981        let normalize = match self.setup_context().await? {
982            Some(empty_plan) => empty_plan,
983            None => {
984                self.selector_to_series_normalize_plan(offset, matchers, true)
985                    .await?
986            }
987        };
988        let manipulate = RangeManipulate::new(
989            self.ctx.start,
990            self.ctx.end,
991            self.ctx.interval,
992            // TODO(ruihang): convert via Timestamp datatypes to support different time units
993            range_ms,
994            self.ctx
995                .time_index_column
996                .clone()
997                .expect("time index should be set in `setup_context`"),
998            self.ctx.field_columns.clone(),
999            normalize,
1000        )
1001        .context(DataFusionPlanningSnafu)?;
1002
1003        Ok(LogicalPlan::Extension(Extension {
1004            node: Arc::new(manipulate),
1005        }))
1006    }
1007
1008    async fn prom_call_expr_to_plan(
1009        &mut self,
1010        query_engine_state: &QueryEngineState,
1011        call_expr: &Call,
1012    ) -> Result<LogicalPlan> {
1013        let Call { func, args } = call_expr;
1014        // some special functions that are not expression but a plan
1015        match func.name {
1016            SPECIAL_HISTOGRAM_QUANTILE => {
1017                return self.create_histogram_plan(args, query_engine_state).await;
1018            }
1019            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
1020            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
1021            SPECIAL_ABSENT_FUNCTION => {
1022                return self.create_absent_plan(args, query_engine_state).await;
1023            }
1024            _ => {}
1025        }
1026
1027        // transform function arguments
1028        let args = self.create_function_args(&args.args)?;
1029        let input = if let Some(prom_expr) = &args.input {
1030            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
1031                .await?
1032        } else {
1033            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
1034            self.ctx.reset_table_name_and_schema();
1035            self.ctx.tag_columns = vec![];
1036            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
1037            LogicalPlan::Extension(Extension {
1038                node: Arc::new(
1039                    EmptyMetric::new(
1040                        self.ctx.start,
1041                        self.ctx.end,
1042                        self.ctx.interval,
1043                        SPECIAL_TIME_FUNCTION.to_string(),
1044                        DEFAULT_FIELD_COLUMN.to_string(),
1045                        None,
1046                    )
1047                    .context(DataFusionPlanningSnafu)?,
1048                ),
1049            })
1050        };
1051        let (mut func_exprs, new_tags) =
1052            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
1053        func_exprs.insert(0, self.create_time_index_column_expr()?);
1054        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);
1055
1056        let builder = LogicalPlanBuilder::from(input)
1057            .project(func_exprs)
1058            .context(DataFusionPlanningSnafu)?
1059            .filter(self.create_empty_values_filter_expr()?)
1060            .context(DataFusionPlanningSnafu)?;
1061
1062        let builder = match func.name {
1063            "sort" => builder
1064                .sort(self.create_field_columns_sort_exprs(true))
1065                .context(DataFusionPlanningSnafu)?,
1066            "sort_desc" => builder
1067                .sort(self.create_field_columns_sort_exprs(false))
1068                .context(DataFusionPlanningSnafu)?,
1069            "sort_by_label" => builder
1070                .sort(Self::create_sort_exprs_by_tags(
1071                    func.name,
1072                    args.literals,
1073                    true,
1074                )?)
1075                .context(DataFusionPlanningSnafu)?,
1076            "sort_by_label_desc" => builder
1077                .sort(Self::create_sort_exprs_by_tags(
1078                    func.name,
1079                    args.literals,
1080                    false,
1081                )?)
1082                .context(DataFusionPlanningSnafu)?,
1083
1084            _ => builder,
1085        };
1086
1087        // Update context tags after building plan
1088        // We can't push them before planning, because they won't exist until projection.
1089        for tag in new_tags {
1090            self.ctx.tag_columns.push(tag);
1091        }
1092
1093        let plan = builder.build().context(DataFusionPlanningSnafu)?;
1094        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");
1095
1096        Ok(plan)
1097    }
1098
1099    async fn prom_ext_expr_to_plan(
1100        &mut self,
1101        query_engine_state: &QueryEngineState,
1102        ext_expr: &promql_parser::parser::ast::Extension,
1103    ) -> Result<LogicalPlan> {
1104        // let promql_parser::parser::ast::Extension { expr } = ext_expr;
1105        let expr = &ext_expr.expr;
1106        let children = expr.children();
1107        let plan = self
1108            .prom_expr_to_plan(&children[0], query_engine_state)
1109            .await?;
1110        // Wrapper for the explanation/analyze of the existing plan
1111        // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
1112        // if `analyze` is true, runs the actual plan and produces
1113        // information about metrics during run.
1114        // if `verbose` is true, prints out additional details when VERBOSE keyword is specified
1115        match expr.name() {
1116            ANALYZE_NODE_NAME => LogicalPlanBuilder::from(plan)
1117                .explain(false, true)
1118                .unwrap()
1119                .build()
1120                .context(DataFusionPlanningSnafu),
1121            ANALYZE_VERBOSE_NODE_NAME => LogicalPlanBuilder::from(plan)
1122                .explain(true, true)
1123                .unwrap()
1124                .build()
1125                .context(DataFusionPlanningSnafu),
1126            EXPLAIN_NODE_NAME => LogicalPlanBuilder::from(plan)
1127                .explain(false, false)
1128                .unwrap()
1129                .build()
1130                .context(DataFusionPlanningSnafu),
1131            EXPLAIN_VERBOSE_NODE_NAME => LogicalPlanBuilder::from(plan)
1132                .explain(true, false)
1133                .unwrap()
1134                .build()
1135                .context(DataFusionPlanningSnafu),
1136            ALIAS_NODE_NAME => {
1137                let alias = expr
1138                    .as_any()
1139                    .downcast_ref::<AliasExpr>()
1140                    .context(UnexpectedPlanExprSnafu {
1141                        desc: "Expected AliasExpr",
1142                    })?
1143                    .alias
1144                    .clone();
1145                self.apply_alias(plan, alias)
1146            }
1147            _ => LogicalPlanBuilder::empty(true)
1148                .build()
1149                .context(DataFusionPlanningSnafu),
1150        }
1151    }
1152
1153    /// Extract metric name from `__name__` matcher and set it into [PromPlannerContext].
1154    /// Returns a new [Matchers] that doesn't contain metric name matcher.
1155    ///
1156    /// Each call to this function means new selector is started. Thus, the context will be reset
1157    /// at first.
1158    ///
1159    /// Name rule:
1160    /// - if `name` is some, then the matchers MUST NOT contain `__name__` matcher.
1161    /// - if `name` is none, then the matchers MAY contain NONE OR MULTIPLE `__name__` matchers.
1162    #[allow(clippy::mutable_key_type)]
1163    fn preprocess_label_matchers(
1164        &mut self,
1165        label_matchers: &Matchers,
1166        name: &Option<String>,
1167    ) -> Result<Matchers> {
1168        self.ctx.reset();
1169
1170        let metric_name;
1171        if let Some(name) = name.clone() {
1172            metric_name = Some(name);
1173            ensure!(
1174                label_matchers.find_matchers(METRIC_NAME).is_empty(),
1175                MultipleMetricMatchersSnafu
1176            );
1177        } else {
1178            let mut matches = label_matchers.find_matchers(METRIC_NAME);
1179            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
1180            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
1181            ensure!(
1182                matches[0].op == MatchOp::Equal,
1183                UnsupportedMatcherOpSnafu {
1184                    matcher_op: matches[0].op.to_string(),
1185                    matcher: METRIC_NAME
1186                }
1187            );
1188            metric_name = matches.pop().map(|m| m.value);
1189        }
1190
1191        self.ctx.table_name = metric_name;
1192
1193        let mut matchers = HashSet::new();
1194        for matcher in &label_matchers.matchers {
1195            // TODO(ruihang): support other metric match ops
1196            if matcher.name == FIELD_COLUMN_MATCHER {
1197                self.ctx
1198                    .field_column_matcher
1199                    .get_or_insert_default()
1200                    .push(matcher.clone());
1201            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
1202                ensure!(
1203                    matcher.op == MatchOp::Equal,
1204                    UnsupportedMatcherOpSnafu {
1205                        matcher: matcher.name.clone(),
1206                        matcher_op: matcher.op.to_string(),
1207                    }
1208                );
1209                self.ctx.schema_name = Some(matcher.value.clone());
1210            } else if matcher.name != METRIC_NAME {
1211                self.ctx.selector_matcher.push(matcher.clone());
1212                let _ = matchers.insert(matcher.clone());
1213            }
1214        }
1215
1216        Ok(Matchers::new(matchers.into_iter().collect()))
1217    }
1218
1219    async fn selector_to_series_normalize_plan(
1220        &mut self,
1221        offset: &Option<Offset>,
1222        label_matchers: Matchers,
1223        is_range_selector: bool,
1224    ) -> Result<LogicalPlan> {
1225        // make table scan plan
1226        let table_ref = self.table_ref()?;
1227        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
1228        let table_schema = table_scan.schema();
1229
1230        // make filter exprs
1231        let offset_duration = match offset {
1232            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
1233            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
1234            None => 0,
1235        };
1236        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
1237        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
1238            scan_filters.push(time_index_filter);
1239        }
1240        table_scan = LogicalPlanBuilder::from(table_scan)
1241            .filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty.
1242            .context(DataFusionPlanningSnafu)?
1243            .build()
1244            .context(DataFusionPlanningSnafu)?;
1245
1246        // make a projection plan if there is any `__field__` matcher
1247        if let Some(field_matchers) = &self.ctx.field_column_matcher {
1248            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
1249            // opt-in set
1250            let mut result_set = HashSet::new();
1251            // opt-out set
1252            let mut reverse_set = HashSet::new();
1253            for matcher in field_matchers {
1254                match &matcher.op {
1255                    MatchOp::Equal => {
1256                        if col_set.contains(&matcher.value) {
1257                            let _ = result_set.insert(matcher.value.clone());
1258                        } else {
1259                            return Err(ColumnNotFoundSnafu {
1260                                col: matcher.value.clone(),
1261                            }
1262                            .build());
1263                        }
1264                    }
1265                    MatchOp::NotEqual => {
1266                        if col_set.contains(&matcher.value) {
1267                            let _ = reverse_set.insert(matcher.value.clone());
1268                        } else {
1269                            return Err(ColumnNotFoundSnafu {
1270                                col: matcher.value.clone(),
1271                            }
1272                            .build());
1273                        }
1274                    }
1275                    MatchOp::Re(regex) => {
1276                        for col in &self.ctx.field_columns {
1277                            if regex.is_match(col) {
1278                                let _ = result_set.insert(col.clone());
1279                            }
1280                        }
1281                    }
1282                    MatchOp::NotRe(regex) => {
1283                        for col in &self.ctx.field_columns {
1284                            if regex.is_match(col) {
1285                                let _ = reverse_set.insert(col.clone());
1286                            }
1287                        }
1288                    }
1289                }
1290            }
1291            // merge two set
1292            if result_set.is_empty() {
1293                result_set = col_set.into_iter().cloned().collect();
1294            }
1295            for col in reverse_set {
1296                let _ = result_set.remove(&col);
1297            }
1298
1299            // mask the field columns in context using computed result set
1300            self.ctx.field_columns = self
1301                .ctx
1302                .field_columns
1303                .drain(..)
1304                .filter(|col| result_set.contains(col))
1305                .collect();
1306
1307            let exprs = result_set
1308                .into_iter()
1309                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
1310                .chain(self.create_tag_column_exprs()?)
1311                .chain(
1312                    self.ctx
1313                        .use_tsid
1314                        .then_some(DfExpr::Column(Column::new_unqualified(
1315                            DATA_SCHEMA_TSID_COLUMN_NAME,
1316                        )))
1317                        .into_iter(),
1318                )
1319                .chain(Some(self.create_time_index_column_expr()?))
1320                .collect::<Vec<_>>();
1321
1322            // reuse this variable for simplicity
1323            table_scan = LogicalPlanBuilder::from(table_scan)
1324                .project(exprs)
1325                .context(DataFusionPlanningSnafu)?
1326                .build()
1327                .context(DataFusionPlanningSnafu)?;
1328        }
1329
1330        // make sort plan
1331        let series_key_columns = if self.ctx.use_tsid {
1332            vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
1333        } else {
1334            self.ctx.tag_columns.clone()
1335        };
1336
1337        let sort_exprs = if self.ctx.use_tsid {
1338            vec![
1339                DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)).sort(true, true),
1340                self.create_time_index_column_expr()?.sort(true, true),
1341            ]
1342        } else {
1343            self.create_tag_and_time_index_column_sort_exprs()?
1344        };
1345
1346        let sort_plan = LogicalPlanBuilder::from(table_scan)
1347            .sort(sort_exprs)
1348            .context(DataFusionPlanningSnafu)?
1349            .build()
1350            .context(DataFusionPlanningSnafu)?;
1351
1352        // make divide plan
1353        let time_index_column =
1354            self.ctx
1355                .time_index_column
1356                .clone()
1357                .with_context(|| TimeIndexNotFoundSnafu {
1358                    table: table_ref.to_string(),
1359                })?;
1360        let divide_plan = LogicalPlan::Extension(Extension {
1361            node: Arc::new(SeriesDivide::new(
1362                series_key_columns.clone(),
1363                time_index_column,
1364                sort_plan,
1365            )),
1366        });
1367
1368        // make series_normalize plan
1369        if !is_range_selector && offset_duration == 0 {
1370            return Ok(divide_plan);
1371        }
1372        let series_normalize = SeriesNormalize::new(
1373            offset_duration,
1374            self.ctx
1375                .time_index_column
1376                .clone()
1377                .with_context(|| TimeIndexNotFoundSnafu {
1378                    table: table_ref.to_quoted_string(),
1379                })?,
1380            is_range_selector,
1381            series_key_columns,
1382            divide_plan,
1383        );
1384        let logical_plan = LogicalPlan::Extension(Extension {
1385            node: Arc::new(series_normalize),
1386        });
1387
1388        Ok(logical_plan)
1389    }
1390
1391    /// Convert [LabelModifier] to [Column] exprs for aggregation.
1392    /// Timestamp column and tag columns will be included.
1393    ///
1394    /// # Side effect
1395    ///
1396    /// This method will also change the tag columns in ctx if `update_ctx` is true.
1397    fn agg_modifier_to_col(
1398        &mut self,
1399        input_schema: &DFSchemaRef,
1400        modifier: &Option<LabelModifier>,
1401        update_ctx: bool,
1402    ) -> Result<Vec<DfExpr>> {
1403        match modifier {
1404            None => {
1405                if update_ctx {
1406                    self.ctx.tag_columns.clear();
1407                }
1408                Ok(vec![self.create_time_index_column_expr()?])
1409            }
1410            Some(LabelModifier::Include(labels)) => {
1411                if update_ctx {
1412                    self.ctx.tag_columns.clear();
1413                }
1414                let mut exprs = Vec::with_capacity(labels.labels.len());
1415                for label in &labels.labels {
1416                    if is_metric_engine_internal_column(label) {
1417                        continue;
1418                    }
1419                    // nonexistence label will be ignored
1420                    if let Some(column_name) = Self::find_case_sensitive_column(input_schema, label)
1421                    {
1422                        exprs.push(DfExpr::Column(Column::from_name(column_name.clone())));
1423
1424                        if update_ctx {
1425                            // update the tag columns in context
1426                            self.ctx.tag_columns.push(column_name);
1427                        }
1428                    }
1429                }
1430                // add timestamp column
1431                exprs.push(self.create_time_index_column_expr()?);
1432
1433                Ok(exprs)
1434            }
1435            Some(LabelModifier::Exclude(labels)) => {
1436                let mut all_fields = input_schema
1437                    .fields()
1438                    .iter()
1439                    .map(|f| f.name())
1440                    .collect::<BTreeSet<_>>();
1441
1442                // Exclude metric engine internal columns (not PromQL labels) from the implicit
1443                // "without" label set.
1444                all_fields.retain(|col| !is_metric_engine_internal_column(col.as_str()));
1445
1446                // remove "without"-ed fields
1447                // nonexistence label will be ignored
1448                for label in &labels.labels {
1449                    let _ = all_fields.remove(label);
1450                }
1451
1452                // remove time index and value fields
1453                if let Some(time_index) = &self.ctx.time_index_column {
1454                    let _ = all_fields.remove(time_index);
1455                }
1456                for value in &self.ctx.field_columns {
1457                    let _ = all_fields.remove(value);
1458                }
1459
1460                if update_ctx {
1461                    // change the tag columns in context
1462                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
1463                }
1464
1465                // collect remaining fields and convert to col expr
1466                let mut exprs = all_fields
1467                    .into_iter()
1468                    .map(|c| DfExpr::Column(Column::from(c)))
1469                    .collect::<Vec<_>>();
1470
1471                // add timestamp column
1472                exprs.push(self.create_time_index_column_expr()?);
1473
1474                Ok(exprs)
1475            }
1476        }
1477    }
1478
1479    // TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
1480    pub fn matchers_to_expr(
1481        label_matchers: Matchers,
1482        table_schema: &DFSchemaRef,
1483    ) -> Result<Vec<DfExpr>> {
1484        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
1485        for matcher in label_matchers.matchers {
1486            if matcher.name == SCHEMA_COLUMN_MATCHER
1487                || matcher.name == DB_COLUMN_MATCHER
1488                || matcher.name == FIELD_COLUMN_MATCHER
1489            {
1490                continue;
1491            }
1492
1493            let column_name = Self::find_case_sensitive_column(table_schema, matcher.name.as_str());
1494            let col = if let Some(column_name) = column_name {
1495                DfExpr::Column(Column::from_name(column_name))
1496            } else {
1497                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
1498                    .alias(matcher.name.clone())
1499            };
1500            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
1501            let expr = match matcher.op {
1502                MatchOp::Equal => col.eq(lit),
1503                MatchOp::NotEqual => col.not_eq(lit),
1504                MatchOp::Re(re) => {
1505                    // TODO(ruihang): a more programmatic way to handle this in datafusion
1506
1507                    // This is a hack to handle `.+` and `.*`, and is not strictly correct
1508                    // `.` doesn't match newline (`\n`). Given this is in PromQL context,
1509                    // most of the time it's fine.
1510                    if re.as_str() == "^(?:.*)$" {
1511                        continue;
1512                    }
1513                    if re.as_str() == "^(?:.+)$" {
1514                        col.not_eq(DfExpr::Literal(
1515                            ScalarValue::Utf8(Some(String::new())),
1516                            None,
1517                        ))
1518                    } else {
1519                        DfExpr::BinaryExpr(BinaryExpr {
1520                            left: Box::new(col),
1521                            op: Operator::RegexMatch,
1522                            right: Box::new(DfExpr::Literal(
1523                                ScalarValue::Utf8(Some(re.as_str().to_string())),
1524                                None,
1525                            )),
1526                        })
1527                    }
1528                }
1529                MatchOp::NotRe(re) => {
1530                    if re.as_str() == "^(?:.*)$" {
1531                        DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
1532                    } else if re.as_str() == "^(?:.+)$" {
1533                        col.eq(DfExpr::Literal(
1534                            ScalarValue::Utf8(Some(String::new())),
1535                            None,
1536                        ))
1537                    } else {
1538                        DfExpr::BinaryExpr(BinaryExpr {
1539                            left: Box::new(col),
1540                            op: Operator::RegexNotMatch,
1541                            right: Box::new(DfExpr::Literal(
1542                                ScalarValue::Utf8(Some(re.as_str().to_string())),
1543                                None,
1544                            )),
1545                        })
1546                    }
1547                }
1548            };
1549            exprs.push(expr);
1550        }
1551
1552        Ok(exprs)
1553    }
1554
1555    fn find_case_sensitive_column(schema: &DFSchemaRef, column: &str) -> Option<String> {
1556        if is_metric_engine_internal_column(column) {
1557            return None;
1558        }
1559        schema
1560            .fields()
1561            .iter()
1562            .find(|field| field.name() == column)
1563            .map(|field| field.name().clone())
1564    }
1565
1566    fn table_from_source(&self, source: &Arc<dyn TableSource>) -> Result<table::TableRef> {
1567        Ok(source
1568            .as_any()
1569            .downcast_ref::<DefaultTableSource>()
1570            .context(UnknownTableSnafu)?
1571            .table_provider
1572            .as_any()
1573            .downcast_ref::<DfTableProviderAdapter>()
1574            .context(UnknownTableSnafu)?
1575            .table())
1576    }
1577
1578    fn table_ref(&self) -> Result<TableReference> {
1579        let table_name = self
1580            .ctx
1581            .table_name
1582            .clone()
1583            .context(TableNameNotFoundSnafu)?;
1584
1585        // set schema name if `__schema__` is given
1586        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1587            TableReference::partial(schema_name.as_str(), table_name.as_str())
1588        } else {
1589            TableReference::bare(table_name.as_str())
1590        };
1591
1592        Ok(table_ref)
1593    }
1594
1595    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
1596        let start = self.ctx.start;
1597        let end = self.ctx.end;
1598        if end < start {
1599            return InvalidTimeRangeSnafu { start, end }.fail();
1600        }
1601        let lookback_delta = self.ctx.lookback_delta;
1602        let range = self.ctx.range.unwrap_or_default();
1603        let interval = self.ctx.interval;
1604        let time_index_expr = self.create_time_index_column_expr()?;
1605        let num_points = (end - start) / interval;
1606
1607        // Prometheus semantics:
1608        // - Instant selector lookback: (eval_ts - lookback_delta, eval_ts]
1609        // - Range selector:           (eval_ts - range, eval_ts]
1610        //
1611        // So samples positioned exactly at the lower boundary must be excluded. We align the scan
1612        // lower bound with Prometheus by shifting it forward by 1ms (millisecond granularity),
1613        // while still using a `>=` filter.
1614        let selector_window = if range == 0 { lookback_delta } else { range };
1615        let lower_exclusive_adjustment = if selector_window > 0 { 1 } else { 0 };
1616
1617        // Scan a continuous time range
1618        if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
1619            let single_time_range = time_index_expr
1620                .clone()
1621                .gt_eq(DfExpr::Literal(
1622                    ScalarValue::TimestampMillisecond(
1623                        Some(
1624                            self.ctx.start - offset_duration - selector_window
1625                                + lower_exclusive_adjustment,
1626                        ),
1627                        None,
1628                    ),
1629                    None,
1630                ))
1631                .and(time_index_expr.lt_eq(DfExpr::Literal(
1632                    ScalarValue::TimestampMillisecond(Some(self.ctx.end - offset_duration), None),
1633                    None,
1634                )));
1635            return Ok(Some(single_time_range));
1636        }
1637
1638        // Otherwise scan scatter ranges separately
1639        let mut filters = Vec::with_capacity(num_points as usize + 1);
1640        for timestamp in (start..=end).step_by(interval as usize) {
1641            filters.push(
1642                time_index_expr
1643                    .clone()
1644                    .gt_eq(DfExpr::Literal(
1645                        ScalarValue::TimestampMillisecond(
1646                            Some(
1647                                timestamp - offset_duration - selector_window
1648                                    + lower_exclusive_adjustment,
1649                            ),
1650                            None,
1651                        ),
1652                        None,
1653                    ))
1654                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
1655                        ScalarValue::TimestampMillisecond(Some(timestamp - offset_duration), None),
1656                        None,
1657                    ))),
1658            )
1659        }
1660
1661        Ok(filters.into_iter().reduce(DfExpr::or))
1662    }
1663
1664    /// Create a table scan plan and a filter plan with given filter.
1665    ///
1666    /// # Panic
1667    /// If the filter is empty
1668    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
1669        let provider = self
1670            .table_provider
1671            .resolve_table(table_ref.clone())
1672            .await
1673            .context(CatalogSnafu)?;
1674
1675        let logical_table = self.table_from_source(&provider)?;
1676
1677        // Try to rewrite the table scan to physical table scan if possible.
1678        let mut maybe_phy_table_ref = table_ref.clone();
1679        let mut scan_provider = provider;
1680        let mut table_id_filter: Option<u32> = None;
1681
1682        // If it's a metric engine logical table, scan its physical table directly and filter by
1683        // `__table_id = logical_table_id` to get access to internal columns like `__tsid`.
1684        if logical_table.table_info().meta.engine == METRIC_ENGINE_NAME
1685            && let Some(physical_table_name) = logical_table
1686                .table_info()
1687                .meta
1688                .options
1689                .extra_options
1690                .get(LOGICAL_TABLE_METADATA_KEY)
1691        {
1692            let physical_table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1693                TableReference::partial(schema_name.as_str(), physical_table_name.as_str())
1694            } else {
1695                TableReference::bare(physical_table_name.as_str())
1696            };
1697
1698            let physical_provider = match self
1699                .table_provider
1700                .resolve_table(physical_table_ref.clone())
1701                .await
1702            {
1703                Ok(provider) => provider,
1704                Err(e) if e.status_code() == StatusCode::TableNotFound => {
1705                    // Fall back to scanning the logical table. It still works, but without
1706                    // `__tsid` optimization.
1707                    scan_provider.clone()
1708                }
1709                Err(e) => return Err(e).context(CatalogSnafu),
1710            };
1711
1712            if !Arc::ptr_eq(&physical_provider, &scan_provider) {
1713                // Only rewrite when internal columns exist in physical schema.
1714                let physical_table = self.table_from_source(&physical_provider)?;
1715
1716                let has_table_id = physical_table
1717                    .schema()
1718                    .column_schema_by_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
1719                    .is_some();
1720                let has_tsid = physical_table
1721                    .schema()
1722                    .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
1723                    .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
1724
1725                if has_table_id && has_tsid {
1726                    scan_provider = physical_provider;
1727                    maybe_phy_table_ref = physical_table_ref;
1728                    table_id_filter = Some(logical_table.table_info().ident.table_id);
1729                }
1730            }
1731        }
1732
1733        let scan_table = self.table_from_source(&scan_provider)?;
1734
1735        let use_tsid = table_id_filter.is_some()
1736            && scan_table
1737                .schema()
1738                .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
1739                .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
1740        self.ctx.use_tsid = use_tsid;
1741
1742        let all_table_tags = self.ctx.tag_columns.clone();
1743
1744        let scan_tag_columns = if use_tsid {
1745            let mut scan_tags = self.ctx.tag_columns.clone();
1746            for matcher in &self.ctx.selector_matcher {
1747                if is_metric_engine_internal_column(&matcher.name) {
1748                    continue;
1749                }
1750                if all_table_tags.iter().any(|tag| tag == &matcher.name) {
1751                    scan_tags.push(matcher.name.clone());
1752                }
1753            }
1754            scan_tags.sort_unstable();
1755            scan_tags.dedup();
1756            scan_tags
1757        } else {
1758            self.ctx.tag_columns.clone()
1759        };
1760
1761        let is_time_index_ms = scan_table
1762            .schema()
1763            .timestamp_column()
1764            .with_context(|| TimeIndexNotFoundSnafu {
1765                table: maybe_phy_table_ref.to_quoted_string(),
1766            })?
1767            .data_type
1768            == ConcreteDataType::timestamp_millisecond_datatype();
1769
1770        let scan_projection = if table_id_filter.is_some() {
1771            let mut required_columns = HashSet::new();
1772            required_columns.insert(DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string());
1773            required_columns.insert(self.ctx.time_index_column.clone().with_context(|| {
1774                TimeIndexNotFoundSnafu {
1775                    table: maybe_phy_table_ref.to_quoted_string(),
1776                }
1777            })?);
1778            for col in &scan_tag_columns {
1779                required_columns.insert(col.clone());
1780            }
1781            for col in &self.ctx.field_columns {
1782                required_columns.insert(col.clone());
1783            }
1784            if use_tsid {
1785                required_columns.insert(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
1786            }
1787
1788            let arrow_schema = scan_table.schema().arrow_schema().clone();
1789            Some(
1790                arrow_schema
1791                    .fields()
1792                    .iter()
1793                    .enumerate()
1794                    .filter(|(_, field)| required_columns.contains(field.name().as_str()))
1795                    .map(|(idx, _)| idx)
1796                    .collect::<Vec<_>>(),
1797            )
1798        } else {
1799            None
1800        };
1801
1802        let mut scan_plan =
1803            LogicalPlanBuilder::scan(maybe_phy_table_ref.clone(), scan_provider, scan_projection)
1804                .context(DataFusionPlanningSnafu)?
1805                .build()
1806                .context(DataFusionPlanningSnafu)?;
1807
1808        if let Some(table_id) = table_id_filter {
1809            scan_plan = LogicalPlanBuilder::from(scan_plan)
1810                .filter(
1811                    DfExpr::Column(Column::from_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME))
1812                        .eq(lit(table_id)),
1813                )
1814                .context(DataFusionPlanningSnafu)?
1815                .alias(table_ref.clone()) // rename the relation back to logical table's name after filtering
1816                .context(DataFusionPlanningSnafu)?
1817                .build()
1818                .context(DataFusionPlanningSnafu)?;
1819        }
1820
1821        if !is_time_index_ms {
1822            // cast to ms if time_index not in Millisecond precision
1823            let expr: Vec<_> = self
1824                .create_field_column_exprs()?
1825                .into_iter()
1826                .chain(
1827                    scan_tag_columns
1828                        .iter()
1829                        .map(|tag| DfExpr::Column(Column::from_name(tag))),
1830                )
1831                .chain(
1832                    self.ctx
1833                        .use_tsid
1834                        .then_some(DfExpr::Column(Column::new(
1835                            Some(table_ref.clone()),
1836                            DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
1837                        )))
1838                        .into_iter(),
1839                )
1840                .chain(Some(DfExpr::Alias(Alias {
1841                    expr: Box::new(DfExpr::Cast(Cast {
1842                        expr: Box::new(self.create_time_index_column_expr()?),
1843                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
1844                    })),
1845                    relation: Some(table_ref.clone()),
1846                    name: self
1847                        .ctx
1848                        .time_index_column
1849                        .as_ref()
1850                        .with_context(|| TimeIndexNotFoundSnafu {
1851                            table: table_ref.to_quoted_string(),
1852                        })?
1853                        .clone(),
1854                    metadata: None,
1855                })))
1856                .collect::<Vec<_>>();
1857            scan_plan = LogicalPlanBuilder::from(scan_plan)
1858                .project(expr)
1859                .context(DataFusionPlanningSnafu)?
1860                .build()
1861                .context(DataFusionPlanningSnafu)?;
1862        } else if table_id_filter.is_some() {
1863            // Drop the internal `__table_id` column after filtering.
1864            let project_exprs = self
1865                .create_field_column_exprs()?
1866                .into_iter()
1867                .chain(
1868                    scan_tag_columns
1869                        .iter()
1870                        .map(|tag| DfExpr::Column(Column::from_name(tag))),
1871                )
1872                .chain(
1873                    self.ctx
1874                        .use_tsid
1875                        .then_some(DfExpr::Column(Column::from_name(
1876                            DATA_SCHEMA_TSID_COLUMN_NAME,
1877                        )))
1878                        .into_iter(),
1879                )
1880                .chain(Some(self.create_time_index_column_expr()?))
1881                .collect::<Vec<_>>();
1882
1883            scan_plan = LogicalPlanBuilder::from(scan_plan)
1884                .project(project_exprs)
1885                .context(DataFusionPlanningSnafu)?
1886                .build()
1887                .context(DataFusionPlanningSnafu)?;
1888        }
1889
1890        let result = LogicalPlanBuilder::from(scan_plan)
1891            .build()
1892            .context(DataFusionPlanningSnafu)?;
1893        Ok(result)
1894    }
1895
1896    fn collect_row_key_tag_columns_from_plan(
1897        &self,
1898        plan: &LogicalPlan,
1899    ) -> Result<BTreeSet<String>> {
1900        fn walk(
1901            planner: &PromPlanner,
1902            plan: &LogicalPlan,
1903            out: &mut BTreeSet<String>,
1904        ) -> Result<()> {
1905            if let LogicalPlan::TableScan(scan) = plan {
1906                let table = planner.table_from_source(&scan.source)?;
1907                for col in table.table_info().meta.row_key_column_names() {
1908                    if col != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
1909                        && col != DATA_SCHEMA_TSID_COLUMN_NAME
1910                        && !is_metric_engine_internal_column(col)
1911                    {
1912                        out.insert(col.clone());
1913                    }
1914                }
1915            }
1916
1917            for input in plan.inputs() {
1918                walk(planner, input, out)?;
1919            }
1920            Ok(())
1921        }
1922
1923        let mut out = BTreeSet::new();
1924        walk(self, plan, &mut out)?;
1925        Ok(out)
1926    }
1927
1928    fn ensure_tag_columns_available(
1929        &self,
1930        plan: LogicalPlan,
1931        required_tags: &BTreeSet<String>,
1932    ) -> Result<LogicalPlan> {
1933        if required_tags.is_empty() {
1934            return Ok(plan);
1935        }
1936
1937        struct Rewriter {
1938            required_tags: BTreeSet<String>,
1939        }
1940
1941        impl TreeNodeRewriter for Rewriter {
1942            type Node = LogicalPlan;
1943
1944            fn f_up(
1945                &mut self,
1946                node: Self::Node,
1947            ) -> datafusion_common::Result<Transformed<Self::Node>> {
1948                match node {
1949                    LogicalPlan::TableScan(scan) => {
1950                        let schema = scan.source.schema();
1951                        let mut projection = match scan.projection.clone() {
1952                            Some(p) => p,
1953                            None => {
1954                                // Scanning all columns already covers required tags.
1955                                return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
1956                            }
1957                        };
1958
1959                        let mut changed = false;
1960                        for tag in &self.required_tags {
1961                            if let Some((idx, _)) = schema
1962                                .fields()
1963                                .iter()
1964                                .enumerate()
1965                                .find(|(_, field)| field.name() == tag)
1966                                && !projection.contains(&idx)
1967                            {
1968                                projection.push(idx);
1969                                changed = true;
1970                            }
1971                        }
1972
1973                        if !changed {
1974                            return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
1975                        }
1976
1977                        projection.sort_unstable();
1978                        projection.dedup();
1979
1980                        let new_scan = TableScan::try_new(
1981                            scan.table_name.clone(),
1982                            scan.source.clone(),
1983                            Some(projection),
1984                            scan.filters,
1985                            scan.fetch,
1986                        )?;
1987                        Ok(Transformed::yes(LogicalPlan::TableScan(new_scan)))
1988                    }
1989                    LogicalPlan::Projection(proj) => {
1990                        let input_schema = proj.input.schema();
1991
1992                        let existing = proj
1993                            .schema
1994                            .fields()
1995                            .iter()
1996                            .map(|f| f.name().as_str())
1997                            .collect::<HashSet<_>>();
1998
1999                        let mut expr = proj.expr.clone();
2000                        let mut has_changed = false;
2001                        for tag in &self.required_tags {
2002                            if existing.contains(tag.as_str()) {
2003                                continue;
2004                            }
2005
2006                            if let Some(idx) = input_schema.index_of_column_by_name(None, tag) {
2007                                expr.push(DfExpr::Column(Column::from(
2008                                    input_schema.qualified_field(idx),
2009                                )));
2010                                has_changed = true;
2011                            }
2012                        }
2013
2014                        if !has_changed {
2015                            return Ok(Transformed::no(LogicalPlan::Projection(proj)));
2016                        }
2017
2018                        let new_proj = Projection::try_new(expr, proj.input)?;
2019                        Ok(Transformed::yes(LogicalPlan::Projection(new_proj)))
2020                    }
2021                    other => Ok(Transformed::no(other)),
2022                }
2023            }
2024        }
2025
2026        let mut rewriter = Rewriter {
2027            required_tags: required_tags.clone(),
2028        };
2029        let rewritten = plan
2030            .rewrite(&mut rewriter)
2031            .context(DataFusionPlanningSnafu)?;
2032        Ok(rewritten.data)
2033    }
2034
2035    fn refresh_tag_columns_from_schema(&mut self, schema: &DFSchemaRef) {
2036        let time_index = self.ctx.time_index_column.as_deref();
2037        let field_columns = self.ctx.field_columns.iter().collect::<HashSet<_>>();
2038
2039        let mut tags = schema
2040            .fields()
2041            .iter()
2042            .map(|f| f.name())
2043            .filter(|name| Some(name.as_str()) != time_index)
2044            .filter(|name| !field_columns.contains(name))
2045            .filter(|name| !is_metric_engine_internal_column(name))
2046            .cloned()
2047            .collect::<Vec<_>>();
2048        tags.sort_unstable();
2049        tags.dedup();
2050        self.ctx.tag_columns = tags;
2051    }
2052
2053    /// Setup [PromPlannerContext]'s state fields.
2054    ///
2055    /// Returns a logical plan for an empty metric.
2056    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
2057        let table_ref = self.table_ref()?;
2058        let source = match self.table_provider.resolve_table(table_ref.clone()).await {
2059            Err(e) if e.status_code() == StatusCode::TableNotFound => {
2060                let plan = self.setup_context_for_empty_metric()?;
2061                return Ok(Some(plan));
2062            }
2063            res => res.context(CatalogSnafu)?,
2064        };
2065        let table = self.table_from_source(&source)?;
2066
2067        // set time index column name
2068        let time_index = table
2069            .schema()
2070            .timestamp_column()
2071            .with_context(|| TimeIndexNotFoundSnafu {
2072                table: table_ref.to_quoted_string(),
2073            })?
2074            .name
2075            .clone();
2076        self.ctx.time_index_column = Some(time_index);
2077
2078        // set values columns
2079        let values = table
2080            .table_info()
2081            .meta
2082            .field_column_names()
2083            .cloned()
2084            .collect();
2085        self.ctx.field_columns = values;
2086
2087        // set primary key (tag) columns
2088        let tags = table
2089            .table_info()
2090            .meta
2091            .row_key_column_names()
2092            .filter(|col| {
2093                // remove metric engine's internal columns
2094                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
2095            })
2096            .cloned()
2097            .collect();
2098        self.ctx.tag_columns = tags;
2099
2100        self.ctx.use_tsid = false;
2101
2102        Ok(None)
2103    }
2104
2105    /// Setup [PromPlannerContext]'s state fields for a non existent table
2106    /// without any rows.
2107    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
2108        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2109        self.ctx.reset_table_name_and_schema();
2110        self.ctx.tag_columns = vec![];
2111        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
2112        self.ctx.use_tsid = false;
2113
2114        // The table doesn't have any data, so we set start to 0 and end to -1.
2115        let plan = LogicalPlan::Extension(Extension {
2116            node: Arc::new(
2117                EmptyMetric::new(
2118                    0,
2119                    -1,
2120                    self.ctx.interval,
2121                    SPECIAL_TIME_FUNCTION.to_string(),
2122                    DEFAULT_FIELD_COLUMN.to_string(),
2123                    Some(lit(0.0f64)),
2124                )
2125                .context(DataFusionPlanningSnafu)?,
2126            ),
2127        });
2128        Ok(plan)
2129    }
2130
2131    // TODO(ruihang): insert column expr
2132    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
2133        let mut result = FunctionArgs::default();
2134
2135        for arg in args {
2136            // First try to parse as literal expression (including binary expressions like 100.0 + 3.0)
2137            if let Some(expr) = Self::try_build_literal_expr(arg) {
2138                result.literals.push(expr);
2139            } else {
2140                // If not a literal, treat as vector input
2141                match arg.as_ref() {
2142                    PromExpr::Subquery(_)
2143                    | PromExpr::VectorSelector(_)
2144                    | PromExpr::MatrixSelector(_)
2145                    | PromExpr::Extension(_)
2146                    | PromExpr::Aggregate(_)
2147                    | PromExpr::Paren(_)
2148                    | PromExpr::Call(_)
2149                    | PromExpr::Binary(_)
2150                    | PromExpr::Unary(_) => {
2151                        if result.input.replace(*arg.clone()).is_some() {
2152                            MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
2153                        }
2154                    }
2155
2156                    _ => {
2157                        let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
2158                        result.literals.push(expr);
2159                    }
2160                }
2161            }
2162        }
2163
2164        Ok(result)
2165    }
2166
2167    /// Creates function expressions for projection and returns the expressions and new tags.
2168    ///
2169    /// # Side Effects
2170    ///
2171    /// This method will update [PromPlannerContext]'s fields and tags if needed.
2172    fn create_function_expr(
2173        &mut self,
2174        func: &Function,
2175        other_input_exprs: Vec<DfExpr>,
2176        query_engine_state: &QueryEngineState,
2177    ) -> Result<(Vec<DfExpr>, Vec<String>)> {
2178        // TODO(ruihang): check function args list
2179        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
2180
2181        // TODO(ruihang): set this according to in-param list
2182        let field_column_pos = 0;
2183        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2184        // New labels after executing the function, e.g. `label_replace` etc.
2185        let mut new_tags = vec![];
2186        let scalar_func = match func.name {
2187            "increase" => ScalarFunc::ExtrapolateUdf(
2188                Arc::new(Increase::scalar_udf()),
2189                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2190            ),
2191            "rate" => ScalarFunc::ExtrapolateUdf(
2192                Arc::new(Rate::scalar_udf()),
2193                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2194            ),
2195            "delta" => ScalarFunc::ExtrapolateUdf(
2196                Arc::new(Delta::scalar_udf()),
2197                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2198            ),
2199            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
2200            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
2201            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
2202            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
2203            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
2204            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
2205            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
2206            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
2207            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
2208            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
2209            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
2210            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
2211            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
2212            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
2213            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
2214            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
2215            "predict_linear" => {
2216                other_input_exprs[0] = DfExpr::Cast(Cast {
2217                    expr: Box::new(other_input_exprs[0].clone()),
2218                    data_type: ArrowDataType::Int64,
2219                });
2220                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
2221            }
2222            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
2223            "time" => {
2224                exprs.push(build_special_time_expr(
2225                    self.ctx.time_index_column.as_ref().unwrap(),
2226                ));
2227                ScalarFunc::GeneratedExpr
2228            }
2229            "minute" => {
2230                // date_part('minute', time_index)
2231                let expr = self.date_part_on_time_index("minute")?;
2232                exprs.push(expr);
2233                ScalarFunc::GeneratedExpr
2234            }
2235            "hour" => {
2236                // date_part('hour', time_index)
2237                let expr = self.date_part_on_time_index("hour")?;
2238                exprs.push(expr);
2239                ScalarFunc::GeneratedExpr
2240            }
2241            "month" => {
2242                // date_part('month', time_index)
2243                let expr = self.date_part_on_time_index("month")?;
2244                exprs.push(expr);
2245                ScalarFunc::GeneratedExpr
2246            }
2247            "year" => {
2248                // date_part('year', time_index)
2249                let expr = self.date_part_on_time_index("year")?;
2250                exprs.push(expr);
2251                ScalarFunc::GeneratedExpr
2252            }
2253            "day_of_month" => {
2254                // date_part('day', time_index)
2255                let expr = self.date_part_on_time_index("day")?;
2256                exprs.push(expr);
2257                ScalarFunc::GeneratedExpr
2258            }
2259            "day_of_week" => {
2260                // date_part('dow', time_index)
2261                let expr = self.date_part_on_time_index("dow")?;
2262                exprs.push(expr);
2263                ScalarFunc::GeneratedExpr
2264            }
2265            "day_of_year" => {
2266                // date_part('doy', time_index)
2267                let expr = self.date_part_on_time_index("doy")?;
2268                exprs.push(expr);
2269                ScalarFunc::GeneratedExpr
2270            }
2271            "days_in_month" => {
2272                // date_part(
2273                //     'days',
2274                //     (date_trunc('month', <TIME INDEX>::date) + interval '1 month - 1 day')
2275                // );
2276                let day_lit_expr = "day".lit();
2277                let month_lit_expr = "month".lit();
2278                let interval_1month_lit_expr =
2279                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
2280                let interval_1day_lit_expr = DfExpr::Literal(
2281                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
2282                    None,
2283                );
2284                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
2285                    left: Box::new(interval_1month_lit_expr),
2286                    op: Operator::Minus,
2287                    right: Box::new(interval_1day_lit_expr),
2288                });
2289                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
2290                    func: datafusion_functions::datetime::date_trunc(),
2291                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
2292                });
2293                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
2294                    left: Box::new(date_trunc_expr),
2295                    op: Operator::Plus,
2296                    right: Box::new(the_1month_minus_1day_expr),
2297                });
2298                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
2299                    func: datafusion_functions::datetime::date_part(),
2300                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
2301                });
2302
2303                exprs.push(date_part_expr);
2304                ScalarFunc::GeneratedExpr
2305            }
2306
2307            "label_join" => {
2308                let (concat_expr, dst_label) = Self::build_concat_labels_expr(
2309                    &mut other_input_exprs,
2310                    &self.ctx,
2311                    query_engine_state,
2312                )?;
2313
2314                // Reserve the current field columns except the `dst_label`.
2315                for value in &self.ctx.field_columns {
2316                    if *value != dst_label {
2317                        let expr = DfExpr::Column(Column::from_name(value));
2318                        exprs.push(expr);
2319                    }
2320                }
2321
2322                // Remove it from tag columns if exists to avoid duplicated column names
2323                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
2324                new_tags.push(dst_label);
2325                // Add the new label expr to evaluate
2326                exprs.push(concat_expr);
2327
2328                ScalarFunc::GeneratedExpr
2329            }
2330            "label_replace" => {
2331                if let Some((replace_expr, dst_label)) = self
2332                    .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
2333                {
2334                    // Reserve the current field columns except the `dst_label`.
2335                    for value in &self.ctx.field_columns {
2336                        if *value != dst_label {
2337                            let expr = DfExpr::Column(Column::from_name(value));
2338                            exprs.push(expr);
2339                        }
2340                    }
2341
2342                    ensure!(
2343                        !self.ctx.tag_columns.contains(&dst_label),
2344                        SameLabelSetSnafu
2345                    );
2346                    new_tags.push(dst_label);
2347                    // Add the new label expr to evaluate
2348                    exprs.push(replace_expr);
2349                } else {
2350                    // Keep the current field columns
2351                    for value in &self.ctx.field_columns {
2352                        let expr = DfExpr::Column(Column::from_name(value));
2353                        exprs.push(expr);
2354                    }
2355                }
2356
2357                ScalarFunc::GeneratedExpr
2358            }
2359            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
2360                // These functions are not expression but a part of plan,
2361                // they are processed by `prom_call_expr_to_plan`.
2362                for value in &self.ctx.field_columns {
2363                    let expr = DfExpr::Column(Column::from_name(value));
2364                    exprs.push(expr);
2365                }
2366
2367                ScalarFunc::GeneratedExpr
2368            }
2369            "round" => {
2370                if other_input_exprs.is_empty() {
2371                    other_input_exprs.push_front(0.0f64.lit());
2372                }
2373                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
2374            }
2375            "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
2376            "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
2377            "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
2378            "pi" => {
2379                // pi functions doesn't accepts any arguments, needs special processing
2380                let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2381                    func: datafusion::functions::math::pi(),
2382                    args: vec![],
2383                });
2384                exprs.push(fn_expr);
2385
2386                ScalarFunc::GeneratedExpr
2387            }
2388            _ => {
2389                if let Some(f) = query_engine_state
2390                    .session_state()
2391                    .scalar_functions()
2392                    .get(func.name)
2393                {
2394                    ScalarFunc::DataFusionBuiltin(f.clone())
2395                } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
2396                    let func_state = query_engine_state.function_state();
2397                    let query_ctx = self.table_provider.query_ctx();
2398
2399                    ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
2400                        state: func_state,
2401                        query_ctx: query_ctx.clone(),
2402                    })))
2403                } else if let Some(f) = datafusion_functions::math::functions()
2404                    .iter()
2405                    .find(|f| f.name() == func.name)
2406                {
2407                    ScalarFunc::DataFusionUdf(f.clone())
2408                } else {
2409                    return UnsupportedExprSnafu {
2410                        name: func.name.to_string(),
2411                    }
2412                    .fail();
2413                }
2414            }
2415        };
2416
2417        for value in &self.ctx.field_columns {
2418            let col_expr = DfExpr::Column(Column::from_name(value));
2419
2420            match scalar_func.clone() {
2421                ScalarFunc::DataFusionBuiltin(func) => {
2422                    other_input_exprs.insert(field_column_pos, col_expr);
2423                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2424                        func,
2425                        args: other_input_exprs.clone().into(),
2426                    });
2427                    exprs.push(fn_expr);
2428                    let _ = other_input_exprs.remove(field_column_pos);
2429                }
2430                ScalarFunc::DataFusionUdf(func) => {
2431                    let args = itertools::chain!(
2432                        other_input_exprs.iter().take(field_column_pos).cloned(),
2433                        std::iter::once(col_expr),
2434                        other_input_exprs.iter().skip(field_column_pos).cloned()
2435                    )
2436                    .collect_vec();
2437                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
2438                }
2439                ScalarFunc::Udf(func) => {
2440                    let ts_range_expr = DfExpr::Column(Column::from_name(
2441                        RangeManipulate::build_timestamp_range_name(
2442                            self.ctx.time_index_column.as_ref().unwrap(),
2443                        ),
2444                    ));
2445                    other_input_exprs.insert(field_column_pos, ts_range_expr);
2446                    other_input_exprs.insert(field_column_pos + 1, col_expr);
2447                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2448                        func,
2449                        args: other_input_exprs.clone().into(),
2450                    });
2451                    exprs.push(fn_expr);
2452                    let _ = other_input_exprs.remove(field_column_pos + 1);
2453                    let _ = other_input_exprs.remove(field_column_pos);
2454                }
2455                ScalarFunc::ExtrapolateUdf(func, range_length) => {
2456                    let ts_range_expr = DfExpr::Column(Column::from_name(
2457                        RangeManipulate::build_timestamp_range_name(
2458                            self.ctx.time_index_column.as_ref().unwrap(),
2459                        ),
2460                    ));
2461                    other_input_exprs.insert(field_column_pos, ts_range_expr);
2462                    other_input_exprs.insert(field_column_pos + 1, col_expr);
2463                    other_input_exprs
2464                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
2465                    other_input_exprs.push_back(lit(range_length));
2466                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2467                        func,
2468                        args: other_input_exprs.clone().into(),
2469                    });
2470                    exprs.push(fn_expr);
2471                    let _ = other_input_exprs.pop_back();
2472                    let _ = other_input_exprs.remove(field_column_pos + 2);
2473                    let _ = other_input_exprs.remove(field_column_pos + 1);
2474                    let _ = other_input_exprs.remove(field_column_pos);
2475                }
2476                ScalarFunc::GeneratedExpr => {}
2477            }
2478        }
2479
2480        // Update value columns' name, and alias them to remove qualifiers
2481        // For label functions such as `label_join`, `label_replace`, etc.,
2482        // we keep the fields unchanged.
2483        if !matches!(func.name, "label_join" | "label_replace") {
2484            let mut new_field_columns = Vec::with_capacity(exprs.len());
2485
2486            exprs = exprs
2487                .into_iter()
2488                .map(|expr| {
2489                    let display_name = expr.schema_name().to_string();
2490                    new_field_columns.push(display_name.clone());
2491                    Ok(expr.alias(display_name))
2492                })
2493                .collect::<std::result::Result<Vec<_>, _>>()
2494                .context(DataFusionPlanningSnafu)?;
2495
2496            self.ctx.field_columns = new_field_columns;
2497        }
2498
2499        Ok((exprs, new_tags))
2500    }
2501
2502    /// Validate label name according to Prometheus specification.
2503    /// Label names must match the regex: [a-zA-Z_][a-zA-Z0-9_]*
2504    /// Additionally, label names starting with double underscores are reserved for internal use.
2505    fn validate_label_name(label_name: &str) -> Result<()> {
2506        // Check if label name starts with double underscores (reserved)
2507        if label_name.starts_with("__") {
2508            return InvalidDestinationLabelNameSnafu { label_name }.fail();
2509        }
2510        // Check if label name matches the required pattern
2511        if !LABEL_NAME_REGEX.is_match(label_name) {
2512            return InvalidDestinationLabelNameSnafu { label_name }.fail();
2513        }
2514
2515        Ok(())
2516    }
2517
2518    /// Build expr for `label_replace` function
2519    fn build_regexp_replace_label_expr(
2520        &self,
2521        other_input_exprs: &mut VecDeque<DfExpr>,
2522        query_engine_state: &QueryEngineState,
2523    ) -> Result<Option<(DfExpr, String)>> {
2524        // label_replace(vector, dst_label, replacement, src_label, regex)
2525        let dst_label = match other_input_exprs.pop_front() {
2526            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2527            other => UnexpectedPlanExprSnafu {
2528                desc: format!("expected dst_label string literal, but found {:?}", other),
2529            }
2530            .fail()?,
2531        };
2532
2533        // Validate the destination label name
2534        Self::validate_label_name(&dst_label)?;
2535        let replacement = match other_input_exprs.pop_front() {
2536            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
2537            other => UnexpectedPlanExprSnafu {
2538                desc: format!("expected replacement string literal, but found {:?}", other),
2539            }
2540            .fail()?,
2541        };
2542        let src_label = match other_input_exprs.pop_front() {
2543            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
2544            other => UnexpectedPlanExprSnafu {
2545                desc: format!("expected src_label string literal, but found {:?}", other),
2546            }
2547            .fail()?,
2548        };
2549
2550        let regex = match other_input_exprs.pop_front() {
2551            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2552            other => UnexpectedPlanExprSnafu {
2553                desc: format!("expected regex string literal, but found {:?}", other),
2554            }
2555            .fail()?,
2556        };
2557
2558        // Validate the regex before using it
2559        // doc: https://prometheus.io/docs/prometheus/latest/querying/functions/#label_replace
2560        regex::Regex::new(&regex).map_err(|_| {
2561            InvalidRegularExpressionSnafu {
2562                regex: regex.clone(),
2563            }
2564            .build()
2565        })?;
2566
2567        // If the src_label exists and regex is empty, keep everything unchanged.
2568        if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2569            return Ok(None);
2570        }
2571
2572        // If the src_label doesn't exists, and
2573        if !self.ctx.tag_columns.contains(&src_label) {
2574            if replacement.is_empty() {
2575                // the replacement is empty, keep everything unchanged.
2576                return Ok(None);
2577            } else {
2578                // the replacement is not empty, always adds dst_label with replacement value.
2579                return Ok(Some((
2580                    // alias literal `replacement` as dst_label
2581                    lit(replacement).alias(&dst_label),
2582                    dst_label,
2583                )));
2584            }
2585        }
2586
2587        // Preprocess the regex:
2588        // https://github.com/prometheus/prometheus/blob/d902abc50d6652ba8fe9a81ff8e5cce936114eba/promql/functions.go#L1575C32-L1575C37
2589        let regex = format!("^(?s:{regex})$");
2590
2591        let session_state = query_engine_state.session_state();
2592        let func = session_state
2593            .scalar_functions()
2594            .get("regexp_replace")
2595            .context(UnsupportedExprSnafu {
2596                name: "regexp_replace",
2597            })?;
2598
2599        // regexp_replace(src_label, regex, replacement)
2600        let args = vec![
2601            if src_label.is_empty() {
2602                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2603            } else {
2604                DfExpr::Column(Column::from_name(src_label))
2605            },
2606            DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2607            DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2608        ];
2609
2610        Ok(Some((
2611            DfExpr::ScalarFunction(ScalarFunction {
2612                func: func.clone(),
2613                args,
2614            })
2615            .alias(&dst_label),
2616            dst_label,
2617        )))
2618    }
2619
2620    /// Build expr for `label_join` function
2621    fn build_concat_labels_expr(
2622        other_input_exprs: &mut VecDeque<DfExpr>,
2623        ctx: &PromPlannerContext,
2624        query_engine_state: &QueryEngineState,
2625    ) -> Result<(DfExpr, String)> {
2626        // label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)
2627
2628        let dst_label = match other_input_exprs.pop_front() {
2629            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2630            other => UnexpectedPlanExprSnafu {
2631                desc: format!("expected dst_label string literal, but found {:?}", other),
2632            }
2633            .fail()?,
2634        };
2635        let separator = match other_input_exprs.pop_front() {
2636            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2637            other => UnexpectedPlanExprSnafu {
2638                desc: format!("expected separator string literal, but found {:?}", other),
2639            }
2640            .fail()?,
2641        };
2642
2643        // Create a set of available columns (tag columns + field columns + time index column)
2644        let available_columns: HashSet<&str> = ctx
2645            .tag_columns
2646            .iter()
2647            .chain(ctx.field_columns.iter())
2648            .chain(ctx.time_index_column.as_ref())
2649            .map(|s| s.as_str())
2650            .collect();
2651
2652        let src_labels = other_input_exprs
2653            .iter()
2654            .map(|expr| {
2655                // Cast source label into column or null literal
2656                match expr {
2657                    DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2658                        if label.is_empty() {
2659                            Ok(DfExpr::Literal(ScalarValue::Null, None))
2660                        } else if available_columns.contains(label.as_str()) {
2661                            // Label exists in the table schema
2662                            Ok(DfExpr::Column(Column::from_name(label)))
2663                        } else {
2664                            // Label doesn't exist, treat as empty string (null)
2665                            Ok(DfExpr::Literal(ScalarValue::Null, None))
2666                        }
2667                    }
2668                    other => UnexpectedPlanExprSnafu {
2669                        desc: format!(
2670                            "expected source label string literal, but found {:?}",
2671                            other
2672                        ),
2673                    }
2674                    .fail(),
2675                }
2676            })
2677            .collect::<Result<Vec<_>>>()?;
2678        ensure!(
2679            !src_labels.is_empty(),
2680            FunctionInvalidArgumentSnafu {
2681                fn_name: "label_join"
2682            }
2683        );
2684
2685        let session_state = query_engine_state.session_state();
2686        let func = session_state
2687            .scalar_functions()
2688            .get("concat_ws")
2689            .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2690
2691        // concat_ws(separator, src_label_1, src_label_2, ...) as dst_label
2692        let mut args = Vec::with_capacity(1 + src_labels.len());
2693        args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2694        args.extend(src_labels);
2695
2696        Ok((
2697            DfExpr::ScalarFunction(ScalarFunction {
2698                func: func.clone(),
2699                args,
2700            })
2701            .alias(&dst_label),
2702            dst_label,
2703        ))
2704    }
2705
2706    fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2707        Ok(DfExpr::Column(Column::from_name(
2708            self.ctx
2709                .time_index_column
2710                .clone()
2711                .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2712        )))
2713    }
2714
2715    fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2716        let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2717        for tag in &self.ctx.tag_columns {
2718            let expr = DfExpr::Column(Column::from_name(tag));
2719            result.push(expr);
2720        }
2721        Ok(result)
2722    }
2723
2724    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2725        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2726        for field in &self.ctx.field_columns {
2727            let expr = DfExpr::Column(Column::from_name(field));
2728            result.push(expr);
2729        }
2730        Ok(result)
2731    }
2732
2733    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2734        let mut result = self
2735            .ctx
2736            .tag_columns
2737            .iter()
2738            .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2739            .collect::<Vec<_>>();
2740        result.push(self.create_time_index_column_expr()?.sort(true, true));
2741        Ok(result)
2742    }
2743
2744    fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2745        self.ctx
2746            .field_columns
2747            .iter()
2748            .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2749            .collect::<Vec<_>>()
2750    }
2751
2752    fn create_sort_exprs_by_tags(
2753        func: &str,
2754        tags: Vec<DfExpr>,
2755        asc: bool,
2756    ) -> Result<Vec<SortExpr>> {
2757        ensure!(
2758            !tags.is_empty(),
2759            FunctionInvalidArgumentSnafu { fn_name: func }
2760        );
2761
2762        tags.iter()
2763            .map(|col| match col {
2764                DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2765                    Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2766                }
2767                other => UnexpectedPlanExprSnafu {
2768                    desc: format!("expected label string literal, but found {:?}", other),
2769                }
2770                .fail(),
2771            })
2772            .collect::<Result<Vec<_>>>()
2773    }
2774
2775    fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2776        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2777        for value in &self.ctx.field_columns {
2778            let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2779            exprs.push(expr);
2780        }
2781
2782        // This error context should be computed lazily: the planner may set `ctx.table_name` to
2783        // `None` for derived expressions (e.g. after projecting the LHS of a vector-vector
2784        // comparison filter). Eagerly calling `table_ref()?` here can turn a valid plan into
2785        // a `TableNameNotFound` error even when `conjunction(exprs)` succeeds.
2786        conjunction(exprs).with_context(|| ValueNotFoundSnafu {
2787            table: self
2788                .table_ref()
2789                .map(|t| t.to_quoted_string())
2790                .unwrap_or_else(|_| "unknown".to_string()),
2791        })
2792    }
2793
2794    /// Creates a set of DataFusion `DfExpr::AggregateFunction` expressions for each value column using the specified aggregate function.
2795    ///
2796    /// # Side Effects
2797    ///
2798    /// This method modifies the value columns in the context by replacing them with the new columns
2799    /// created by the aggregate function application.
2800    ///
2801    /// # Returns
2802    ///
2803    /// Returns a tuple of `(aggregate_expressions, previous_field_expressions)` where:
2804    /// - `aggregate_expressions`: Expressions that apply the aggregate function to the original fields
2805    /// - `previous_field_expressions`: Original field expressions before aggregation. This is non-empty
2806    ///   only when the operation is `count_values`, as this operation requires preserving the original
2807    ///   values for grouping.
2808    ///
2809    fn create_aggregate_exprs(
2810        &mut self,
2811        op: TokenType,
2812        param: &Option<Box<PromExpr>>,
2813        input_plan: &LogicalPlan,
2814    ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
2815        let mut non_col_args = Vec::new();
2816        let is_group_agg = op.id() == token::T_GROUP;
2817        if is_group_agg {
2818            ensure!(
2819                self.ctx.field_columns.len() == 1,
2820                MultiFieldsNotSupportedSnafu {
2821                    operator: "group()"
2822                }
2823            );
2824        }
2825        let aggr = match op.id() {
2826            token::T_SUM => sum_udaf(),
2827            token::T_QUANTILE => {
2828                let q =
2829                    Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
2830                non_col_args.push(q);
2831                quantile_udaf()
2832            }
2833            token::T_AVG => avg_udaf(),
2834            token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
2835            token::T_MIN => min_udaf(),
2836            token::T_MAX => max_udaf(),
2837            // PromQL's `group()` aggregator produces 1 for each group.
2838            // Use `max(1.0)` (per-group) to match semantics and output type (Float64).
2839            token::T_GROUP => max_udaf(),
2840            token::T_STDDEV => stddev_pop_udaf(),
2841            token::T_STDVAR => var_pop_udaf(),
2842            token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2843                name: format!("{op:?}"),
2844            }
2845            .fail()?,
2846            _ => UnexpectedTokenSnafu { token: op }.fail()?,
2847        };
2848
2849        // perform aggregate operation to each value column
2850        let exprs: Vec<DfExpr> = self
2851            .ctx
2852            .field_columns
2853            .iter()
2854            .map(|col| {
2855                if is_group_agg {
2856                    aggr.call(vec![lit(1_f64)])
2857                } else {
2858                    non_col_args.push(DfExpr::Column(Column::from_name(col)));
2859                    let expr = aggr.call(non_col_args.clone());
2860                    non_col_args.pop();
2861                    expr
2862                }
2863            })
2864            .collect::<Vec<_>>();
2865
2866        // if the aggregator is `count_values`, it must be grouped by current fields.
2867        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2868            let prev_field_exprs: Vec<_> = self
2869                .ctx
2870                .field_columns
2871                .iter()
2872                .map(|col| DfExpr::Column(Column::from_name(col)))
2873                .collect();
2874
2875            ensure!(
2876                self.ctx.field_columns.len() == 1,
2877                UnsupportedExprSnafu {
2878                    name: "count_values on multi-value input"
2879                }
2880            );
2881
2882            prev_field_exprs
2883        } else {
2884            vec![]
2885        };
2886
2887        // update value column name according to the aggregators,
2888        let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2889
2890        let normalized_exprs =
2891            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2892        for expr in normalized_exprs {
2893            new_field_columns.push(expr.schema_name().to_string());
2894        }
2895        self.ctx.field_columns = new_field_columns;
2896
2897        Ok((exprs, prev_field_exprs))
2898    }
2899
2900    fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2901        let param = param
2902            .as_deref()
2903            .with_context(|| FunctionInvalidArgumentSnafu {
2904                fn_name: op.to_string(),
2905            })?;
2906        let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2907            return FunctionInvalidArgumentSnafu {
2908                fn_name: op.to_string(),
2909            }
2910            .fail();
2911        };
2912
2913        Ok(val)
2914    }
2915
2916    fn get_param_as_literal_expr(
2917        param: &Option<Box<PromExpr>>,
2918        op: Option<TokenType>,
2919        expected_type: Option<ArrowDataType>,
2920    ) -> Result<DfExpr> {
2921        let prom_param = param.as_deref().with_context(|| {
2922            if let Some(op) = op {
2923                FunctionInvalidArgumentSnafu {
2924                    fn_name: op.to_string(),
2925                }
2926            } else {
2927                FunctionInvalidArgumentSnafu {
2928                    fn_name: "unknown".to_string(),
2929                }
2930            }
2931        })?;
2932
2933        let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2934            if let Some(op) = op {
2935                FunctionInvalidArgumentSnafu {
2936                    fn_name: op.to_string(),
2937                }
2938            } else {
2939                FunctionInvalidArgumentSnafu {
2940                    fn_name: "unknown".to_string(),
2941                }
2942            }
2943        })?;
2944
2945        // check if the type is expected
2946        if let Some(expected_type) = expected_type {
2947            // literal should not have reference to column
2948            let expr_type = expr
2949                .get_type(&DFSchema::empty())
2950                .context(DataFusionPlanningSnafu)?;
2951            if expected_type != expr_type {
2952                return FunctionInvalidArgumentSnafu {
2953                    fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2954                }
2955                .fail();
2956            }
2957        }
2958
2959        Ok(expr)
2960    }
2961
2962    /// Create [DfExpr::WindowFunction] expr for each value column with given window function.
2963    ///
2964    fn create_window_exprs(
2965        &mut self,
2966        op: TokenType,
2967        group_exprs: Vec<DfExpr>,
2968        input_plan: &LogicalPlan,
2969    ) -> Result<Vec<DfExpr>> {
2970        ensure!(
2971            self.ctx.field_columns.len() == 1,
2972            UnsupportedExprSnafu {
2973                name: "topk or bottomk on multi-value input"
2974            }
2975        );
2976
2977        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2978
2979        let asc = matches!(op.id(), token::T_BOTTOMK);
2980
2981        let tag_sort_exprs = self
2982            .create_tag_column_exprs()?
2983            .into_iter()
2984            .map(|expr| expr.sort(asc, true));
2985
2986        // perform window operation to each value column
2987        let exprs: Vec<DfExpr> = self
2988            .ctx
2989            .field_columns
2990            .iter()
2991            .map(|col| {
2992                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2993                // Order by value in the specific order
2994                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
2995                // Then tags if the values are equal,
2996                // Try to ensure the relative stability of the output results.
2997                sort_exprs.extend(tag_sort_exprs.clone());
2998
2999                DfExpr::WindowFunction(Box::new(WindowFunction {
3000                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
3001                    params: WindowFunctionParams {
3002                        args: vec![],
3003                        partition_by: group_exprs.clone(),
3004                        order_by: sort_exprs,
3005                        window_frame: WindowFrame::new(Some(true)),
3006                        null_treatment: None,
3007                        distinct: false,
3008                        filter: None,
3009                    },
3010                }))
3011            })
3012            .collect();
3013
3014        let normalized_exprs =
3015            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
3016        Ok(normalized_exprs)
3017    }
3018
3019    /// Try to build a [f64] from [PromExpr].
3020    #[deprecated(
3021        note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
3022    )]
3023    fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
3024        match expr {
3025            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
3026            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
3027            PromExpr::Unary(UnaryExpr { expr, .. }) => {
3028                Self::try_build_float_literal(expr).map(|f| -f)
3029            }
3030            PromExpr::StringLiteral(_)
3031            | PromExpr::Binary(_)
3032            | PromExpr::VectorSelector(_)
3033            | PromExpr::MatrixSelector(_)
3034            | PromExpr::Call(_)
3035            | PromExpr::Extension(_)
3036            | PromExpr::Aggregate(_)
3037            | PromExpr::Subquery(_) => None,
3038        }
3039    }
3040
3041    /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
3042    async fn create_histogram_plan(
3043        &mut self,
3044        args: &PromFunctionArgs,
3045        query_engine_state: &QueryEngineState,
3046    ) -> Result<LogicalPlan> {
3047        if args.args.len() != 2 {
3048            return FunctionInvalidArgumentSnafu {
3049                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
3050            }
3051            .fail();
3052        }
3053        #[allow(deprecated)]
3054        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
3055            FunctionInvalidArgumentSnafu {
3056                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
3057            }
3058        })?;
3059
3060        let input = args.args[1].as_ref().clone();
3061        let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
3062        // `histogram_quantile` folds buckets across `le`, so `__tsid` (which includes `le`) is not
3063        // a stable series identifier anymore. Also, HistogramFold infers label columns from the
3064        // input schema and must not treat `__tsid` as a label column.
3065        let input_plan = self.strip_tsid_column(input_plan)?;
3066        self.ctx.use_tsid = false;
3067
3068        if !self.ctx.has_le_tag() {
3069            // Return empty result instead of error when 'le' column is not found
3070            // This handles the case when histogram metrics don't exist
3071            return Ok(LogicalPlan::EmptyRelation(
3072                datafusion::logical_expr::EmptyRelation {
3073                    produce_one_row: false,
3074                    schema: Arc::new(DFSchema::empty()),
3075                },
3076            ));
3077        }
3078        let time_index_column =
3079            self.ctx
3080                .time_index_column
3081                .clone()
3082                .with_context(|| TimeIndexNotFoundSnafu {
3083                    table: self.ctx.table_name.clone().unwrap_or_default(),
3084                })?;
3085        // FIXME(ruihang): support multi fields
3086        let field_column = self
3087            .ctx
3088            .field_columns
3089            .first()
3090            .with_context(|| FunctionInvalidArgumentSnafu {
3091                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
3092            })?
3093            .clone();
3094        // remove le column from tag columns
3095        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
3096
3097        Ok(LogicalPlan::Extension(Extension {
3098            node: Arc::new(
3099                HistogramFold::new(
3100                    LE_COLUMN_NAME.to_string(),
3101                    field_column,
3102                    time_index_column,
3103                    phi,
3104                    input_plan,
3105                )
3106                .context(DataFusionPlanningSnafu)?,
3107            ),
3108        }))
3109    }
3110
3111    /// Create a [SPECIAL_VECTOR_FUNCTION] plan
3112    async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
3113        if args.args.len() != 1 {
3114            return FunctionInvalidArgumentSnafu {
3115                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
3116            }
3117            .fail();
3118        }
3119        let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
3120
3121        // reuse `SPECIAL_TIME_FUNCTION` as name of time index column
3122        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
3123        self.ctx.reset_table_name_and_schema();
3124        self.ctx.tag_columns = vec![];
3125        self.ctx.field_columns = vec![greptime_value().to_string()];
3126        Ok(LogicalPlan::Extension(Extension {
3127            node: Arc::new(
3128                EmptyMetric::new(
3129                    self.ctx.start,
3130                    self.ctx.end,
3131                    self.ctx.interval,
3132                    SPECIAL_TIME_FUNCTION.to_string(),
3133                    greptime_value().to_string(),
3134                    Some(lit),
3135                )
3136                .context(DataFusionPlanningSnafu)?,
3137            ),
3138        }))
3139    }
3140
3141    /// Create a [SCALAR_FUNCTION] plan
3142    async fn create_scalar_plan(
3143        &mut self,
3144        args: &PromFunctionArgs,
3145        query_engine_state: &QueryEngineState,
3146    ) -> Result<LogicalPlan> {
3147        ensure!(
3148            args.len() == 1,
3149            FunctionInvalidArgumentSnafu {
3150                fn_name: SCALAR_FUNCTION
3151            }
3152        );
3153        let input = self
3154            .prom_expr_to_plan(&args.args[0], query_engine_state)
3155            .await?;
3156        ensure!(
3157            self.ctx.field_columns.len() == 1,
3158            MultiFieldsNotSupportedSnafu {
3159                operator: SCALAR_FUNCTION
3160            },
3161        );
3162        let scalar_plan = LogicalPlan::Extension(Extension {
3163            node: Arc::new(
3164                ScalarCalculate::new(
3165                    self.ctx.start,
3166                    self.ctx.end,
3167                    self.ctx.interval,
3168                    input,
3169                    self.ctx.time_index_column.as_ref().unwrap(),
3170                    &self.ctx.tag_columns,
3171                    &self.ctx.field_columns[0],
3172                    self.ctx.table_name.as_deref(),
3173                )
3174                .context(PromqlPlanNodeSnafu)?,
3175            ),
3176        });
3177        // scalar plan have no tag columns
3178        self.ctx.tag_columns.clear();
3179        self.ctx.field_columns.clear();
3180        self.ctx
3181            .field_columns
3182            .push(scalar_plan.schema().field(1).name().clone());
3183        Ok(scalar_plan)
3184    }
3185
3186    /// Create a [SPECIAL_ABSENT_FUNCTION] plan
3187    async fn create_absent_plan(
3188        &mut self,
3189        args: &PromFunctionArgs,
3190        query_engine_state: &QueryEngineState,
3191    ) -> Result<LogicalPlan> {
3192        if args.args.len() != 1 {
3193            return FunctionInvalidArgumentSnafu {
3194                fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
3195            }
3196            .fail();
3197        }
3198        let input = self
3199            .prom_expr_to_plan(&args.args[0], query_engine_state)
3200            .await?;
3201
3202        let time_index_expr = self.create_time_index_column_expr()?;
3203        let first_field_expr =
3204            self.create_field_column_exprs()?
3205                .pop()
3206                .with_context(|| ValueNotFoundSnafu {
3207                    table: self.ctx.table_name.clone().unwrap_or_default(),
3208                })?;
3209        let first_value_expr = first_value(first_field_expr, vec![]);
3210
3211        let ordered_aggregated_input = LogicalPlanBuilder::from(input)
3212            .aggregate(
3213                vec![time_index_expr.clone()],
3214                vec![first_value_expr.clone()],
3215            )
3216            .context(DataFusionPlanningSnafu)?
3217            .sort(vec![time_index_expr.sort(true, false)])
3218            .context(DataFusionPlanningSnafu)?
3219            .build()
3220            .context(DataFusionPlanningSnafu)?;
3221
3222        let fake_labels = self
3223            .ctx
3224            .selector_matcher
3225            .iter()
3226            .filter_map(|matcher| match matcher.op {
3227                MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
3228                _ => None,
3229            })
3230            .collect::<Vec<_>>();
3231
3232        // Create the absent plan
3233        let absent_plan = LogicalPlan::Extension(Extension {
3234            node: Arc::new(
3235                Absent::try_new(
3236                    self.ctx.start,
3237                    self.ctx.end,
3238                    self.ctx.interval,
3239                    self.ctx.time_index_column.as_ref().unwrap().clone(),
3240                    self.ctx.field_columns[0].clone(),
3241                    fake_labels,
3242                    ordered_aggregated_input,
3243                )
3244                .context(DataFusionPlanningSnafu)?,
3245            ),
3246        });
3247
3248        Ok(absent_plan)
3249    }
3250
3251    /// Try to build a DataFusion Literal Expression from PromQL Expr, return
3252    /// `None` if the input is not a literal expression.
3253    fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
3254        match expr {
3255            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
3256            PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
3257            PromExpr::VectorSelector(_)
3258            | PromExpr::MatrixSelector(_)
3259            | PromExpr::Extension(_)
3260            | PromExpr::Aggregate(_)
3261            | PromExpr::Subquery(_) => None,
3262            PromExpr::Call(Call { func, .. }) => {
3263                if func.name == SPECIAL_TIME_FUNCTION {
3264                    // For time() function, don't treat it as a literal
3265                    // Let it be handled as a regular function call
3266                    None
3267                } else {
3268                    None
3269                }
3270            }
3271            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
3272            // TODO(ruihang): support Unary operator
3273            PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
3274            PromExpr::Binary(PromBinaryExpr {
3275                lhs,
3276                rhs,
3277                op,
3278                modifier,
3279            }) => {
3280                let lhs = Self::try_build_literal_expr(lhs)?;
3281                let rhs = Self::try_build_literal_expr(rhs)?;
3282                let is_comparison_op = Self::is_token_a_comparison_op(*op);
3283                let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
3284                let expr = expr_builder(lhs, rhs).ok()?;
3285
3286                let should_return_bool = if let Some(m) = modifier {
3287                    m.return_bool
3288                } else {
3289                    false
3290                };
3291                if is_comparison_op && should_return_bool {
3292                    Some(DfExpr::Cast(Cast {
3293                        expr: Box::new(expr),
3294                        data_type: ArrowDataType::Float64,
3295                    }))
3296                } else {
3297                    Some(expr)
3298                }
3299            }
3300        }
3301    }
3302
3303    fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
3304        match expr {
3305            PromExpr::Call(Call { func, .. }) => {
3306                if func.name == SPECIAL_TIME_FUNCTION
3307                    && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
3308                {
3309                    Some(build_special_time_expr(time_index_col))
3310                } else {
3311                    None
3312                }
3313            }
3314            _ => None,
3315        }
3316    }
3317
3318    /// Return a lambda to build binary expression from token.
3319    /// Because some binary operator are function in DataFusion like `atan2` or `^`.
3320    #[allow(clippy::type_complexity)]
3321    fn prom_token_to_binary_expr_builder(
3322        token: TokenType,
3323    ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
3324        match token.id() {
3325            token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
3326            token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
3327            token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
3328            token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
3329            token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
3330            token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
3331            token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
3332            token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
3333            token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
3334            token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
3335            token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
3336            token::T_POW => Ok(Box::new(|lhs, rhs| {
3337                Ok(DfExpr::ScalarFunction(ScalarFunction {
3338                    func: datafusion_functions::math::power(),
3339                    args: vec![lhs, rhs],
3340                }))
3341            })),
3342            token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
3343                Ok(DfExpr::ScalarFunction(ScalarFunction {
3344                    func: datafusion_functions::math::atan2(),
3345                    args: vec![lhs, rhs],
3346                }))
3347            })),
3348            _ => UnexpectedTokenSnafu { token }.fail(),
3349        }
3350    }
3351
3352    /// Check if the given op is a [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators).
3353    fn is_token_a_comparison_op(token: TokenType) -> bool {
3354        matches!(
3355            token.id(),
3356            token::T_EQLC
3357                | token::T_NEQ
3358                | token::T_GTR
3359                | token::T_LSS
3360                | token::T_GTE
3361                | token::T_LTE
3362        )
3363    }
3364
3365    /// Check if the given op is a set operator (UNION, INTERSECT and EXCEPT in SQL).
3366    fn is_token_a_set_op(token: TokenType) -> bool {
3367        matches!(
3368            token.id(),
3369            token::T_LAND // INTERSECT
3370                | token::T_LOR // UNION
3371                | token::T_LUNLESS // EXCEPT
3372        )
3373    }
3374
3375    /// Build a inner join on time index column and tag columns to concat two logical plans.
3376    /// When `only_join_time_index == true` we only join on the time index, because these two plan may not have the same tag columns
3377    #[allow(clippy::too_many_arguments)]
3378    fn join_on_non_field_columns(
3379        &self,
3380        left: LogicalPlan,
3381        right: LogicalPlan,
3382        left_table_ref: TableReference,
3383        right_table_ref: TableReference,
3384        left_time_index_column: Option<String>,
3385        right_time_index_column: Option<String>,
3386        only_join_time_index: bool,
3387        modifier: &Option<BinModifier>,
3388    ) -> Result<LogicalPlan> {
3389        let mut left_tag_columns = if only_join_time_index {
3390            BTreeSet::new()
3391        } else {
3392            self.ctx
3393                .tag_columns
3394                .iter()
3395                .cloned()
3396                .collect::<BTreeSet<_>>()
3397        };
3398        let mut right_tag_columns = left_tag_columns.clone();
3399
3400        // apply modifier
3401        if let Some(modifier) = modifier {
3402            // apply label modifier
3403            if let Some(matching) = &modifier.matching {
3404                match matching {
3405                    // keeps columns mentioned in `on`
3406                    LabelModifier::Include(on) => {
3407                        let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
3408                        left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
3409                        right_tag_columns =
3410                            right_tag_columns.intersection(&mask).cloned().collect();
3411                    }
3412                    // removes columns memtioned in `ignoring`
3413                    LabelModifier::Exclude(ignoring) => {
3414                        // doesn't check existence of label
3415                        for label in &ignoring.labels {
3416                            let _ = left_tag_columns.remove(label);
3417                            let _ = right_tag_columns.remove(label);
3418                        }
3419                    }
3420                }
3421            }
3422        }
3423
3424        // push time index column if it exists
3425        if let (Some(left_time_index_column), Some(right_time_index_column)) =
3426            (left_time_index_column, right_time_index_column)
3427        {
3428            left_tag_columns.insert(left_time_index_column);
3429            right_tag_columns.insert(right_time_index_column);
3430        }
3431
3432        let right = LogicalPlanBuilder::from(right)
3433            .alias(right_table_ref)
3434            .context(DataFusionPlanningSnafu)?
3435            .build()
3436            .context(DataFusionPlanningSnafu)?;
3437
3438        // Inner Join on time index column to concat two operator
3439        LogicalPlanBuilder::from(left)
3440            .alias(left_table_ref)
3441            .context(DataFusionPlanningSnafu)?
3442            .join_detailed(
3443                right,
3444                JoinType::Inner,
3445                (
3446                    left_tag_columns
3447                        .into_iter()
3448                        .map(Column::from_name)
3449                        .collect::<Vec<_>>(),
3450                    right_tag_columns
3451                        .into_iter()
3452                        .map(Column::from_name)
3453                        .collect::<Vec<_>>(),
3454                ),
3455                None,
3456                NullEquality::NullEqualsNull,
3457            )
3458            .context(DataFusionPlanningSnafu)?
3459            .build()
3460            .context(DataFusionPlanningSnafu)
3461    }
3462
3463    /// Build a set operator (AND/OR/UNLESS)
3464    fn set_op_on_non_field_columns(
3465        &mut self,
3466        left: LogicalPlan,
3467        mut right: LogicalPlan,
3468        left_context: PromPlannerContext,
3469        right_context: PromPlannerContext,
3470        op: TokenType,
3471        modifier: &Option<BinModifier>,
3472    ) -> Result<LogicalPlan> {
3473        let mut left_tag_col_set = left_context
3474            .tag_columns
3475            .iter()
3476            .cloned()
3477            .collect::<HashSet<_>>();
3478        let mut right_tag_col_set = right_context
3479            .tag_columns
3480            .iter()
3481            .cloned()
3482            .collect::<HashSet<_>>();
3483
3484        if matches!(op.id(), token::T_LOR) {
3485            return self.or_operator(
3486                left,
3487                right,
3488                left_tag_col_set,
3489                right_tag_col_set,
3490                left_context,
3491                right_context,
3492                modifier,
3493            );
3494        }
3495
3496        // apply modifier
3497        if let Some(modifier) = modifier {
3498            // one-to-many and many-to-one are not supported
3499            ensure!(
3500                matches!(
3501                    modifier.card,
3502                    VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
3503                ),
3504                UnsupportedVectorMatchSnafu {
3505                    name: modifier.card.clone(),
3506                },
3507            );
3508            // apply label modifier
3509            if let Some(matching) = &modifier.matching {
3510                match matching {
3511                    // keeps columns mentioned in `on`
3512                    LabelModifier::Include(on) => {
3513                        let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
3514                        left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
3515                        right_tag_col_set =
3516                            right_tag_col_set.intersection(&mask).cloned().collect();
3517                    }
3518                    // removes columns memtioned in `ignoring`
3519                    LabelModifier::Exclude(ignoring) => {
3520                        // doesn't check existence of label
3521                        for label in &ignoring.labels {
3522                            let _ = left_tag_col_set.remove(label);
3523                            let _ = right_tag_col_set.remove(label);
3524                        }
3525                    }
3526                }
3527            }
3528        }
3529        // ensure two sides have the same tag columns
3530        if !matches!(op.id(), token::T_LOR) {
3531            ensure!(
3532                left_tag_col_set == right_tag_col_set,
3533                CombineTableColumnMismatchSnafu {
3534                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
3535                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
3536                }
3537            )
3538        };
3539        let left_time_index = left_context.time_index_column.clone().unwrap();
3540        let right_time_index = right_context.time_index_column.clone().unwrap();
3541        let join_keys = left_tag_col_set
3542            .iter()
3543            .cloned()
3544            .chain([left_time_index.clone()])
3545            .collect::<Vec<_>>();
3546        self.ctx.time_index_column = Some(left_time_index.clone());
3547        self.ctx.use_tsid = left_context.use_tsid;
3548
3549        // alias right time index column if necessary
3550        if left_context.time_index_column != right_context.time_index_column {
3551            let right_project_exprs = right
3552                .schema()
3553                .fields()
3554                .iter()
3555                .map(|field| {
3556                    if field.name() == &right_time_index {
3557                        DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
3558                    } else {
3559                        DfExpr::Column(Column::from_name(field.name()))
3560                    }
3561                })
3562                .collect::<Vec<_>>();
3563
3564            right = LogicalPlanBuilder::from(right)
3565                .project(right_project_exprs)
3566                .context(DataFusionPlanningSnafu)?
3567                .build()
3568                .context(DataFusionPlanningSnafu)?;
3569        }
3570
3571        ensure!(
3572            left_context.field_columns.len() == 1,
3573            MultiFieldsNotSupportedSnafu {
3574                operator: "AND operator"
3575            }
3576        );
3577        // Update the field column in context.
3578        // The AND/UNLESS operator only keep the field column in left input.
3579        let left_field_col = left_context.field_columns.first().unwrap();
3580        self.ctx.field_columns = vec![left_field_col.clone()];
3581
3582        // Generate join plan.
3583        // All set operations in PromQL are "distinct"
3584        match op.id() {
3585            token::T_LAND => LogicalPlanBuilder::from(left)
3586                .distinct()
3587                .context(DataFusionPlanningSnafu)?
3588                .join_detailed(
3589                    right,
3590                    JoinType::LeftSemi,
3591                    (join_keys.clone(), join_keys),
3592                    None,
3593                    NullEquality::NullEqualsNull,
3594                )
3595                .context(DataFusionPlanningSnafu)?
3596                .build()
3597                .context(DataFusionPlanningSnafu),
3598            token::T_LUNLESS => LogicalPlanBuilder::from(left)
3599                .distinct()
3600                .context(DataFusionPlanningSnafu)?
3601                .join_detailed(
3602                    right,
3603                    JoinType::LeftAnti,
3604                    (join_keys.clone(), join_keys),
3605                    None,
3606                    NullEquality::NullEqualsNull,
3607                )
3608                .context(DataFusionPlanningSnafu)?
3609                .build()
3610                .context(DataFusionPlanningSnafu),
3611            token::T_LOR => {
3612                // OR is handled at the beginning of this function, as it cannot
3613                // be expressed using JOIN like AND and UNLESS.
3614                unreachable!()
3615            }
3616            _ => UnexpectedTokenSnafu { token: op }.fail(),
3617        }
3618    }
3619
3620    // TODO(ruihang): change function name
3621    #[allow(clippy::too_many_arguments)]
3622    fn or_operator(
3623        &mut self,
3624        left: LogicalPlan,
3625        right: LogicalPlan,
3626        left_tag_cols_set: HashSet<String>,
3627        right_tag_cols_set: HashSet<String>,
3628        left_context: PromPlannerContext,
3629        right_context: PromPlannerContext,
3630        modifier: &Option<BinModifier>,
3631    ) -> Result<LogicalPlan> {
3632        // checks
3633        ensure!(
3634            left_context.field_columns.len() == right_context.field_columns.len(),
3635            CombineTableColumnMismatchSnafu {
3636                left: left_context.field_columns.clone(),
3637                right: right_context.field_columns.clone()
3638            }
3639        );
3640        ensure!(
3641            left_context.field_columns.len() == 1,
3642            MultiFieldsNotSupportedSnafu {
3643                operator: "OR operator"
3644            }
3645        );
3646
3647        // prepare hash sets
3648        let all_tags = left_tag_cols_set
3649            .union(&right_tag_cols_set)
3650            .cloned()
3651            .collect::<HashSet<_>>();
3652        let tags_not_in_left = all_tags
3653            .difference(&left_tag_cols_set)
3654            .cloned()
3655            .collect::<Vec<_>>();
3656        let tags_not_in_right = all_tags
3657            .difference(&right_tag_cols_set)
3658            .cloned()
3659            .collect::<Vec<_>>();
3660        let left_qualifier = left.schema().qualified_field(0).0.cloned();
3661        let right_qualifier = right.schema().qualified_field(0).0.cloned();
3662        let left_qualifier_string = left_qualifier
3663            .as_ref()
3664            .map(|l| l.to_string())
3665            .unwrap_or_default();
3666        let right_qualifier_string = right_qualifier
3667            .as_ref()
3668            .map(|r| r.to_string())
3669            .unwrap_or_default();
3670        let left_time_index_column =
3671            left_context
3672                .time_index_column
3673                .clone()
3674                .with_context(|| TimeIndexNotFoundSnafu {
3675                    table: left_qualifier_string.clone(),
3676                })?;
3677        let right_time_index_column =
3678            right_context
3679                .time_index_column
3680                .clone()
3681                .with_context(|| TimeIndexNotFoundSnafu {
3682                    table: right_qualifier_string.clone(),
3683                })?;
3684        // Take the name of first field column. The length is checked above.
3685        let left_field_col = left_context.field_columns.first().unwrap();
3686        let right_field_col = right_context.field_columns.first().unwrap();
3687        let left_has_tsid = left
3688            .schema()
3689            .fields()
3690            .iter()
3691            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
3692        let right_has_tsid = right
3693            .schema()
3694            .fields()
3695            .iter()
3696            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
3697
3698        // step 0: fill all columns in output schema
3699        let mut all_columns_set = left
3700            .schema()
3701            .fields()
3702            .iter()
3703            .chain(right.schema().fields().iter())
3704            .map(|field| field.name().clone())
3705            .collect::<HashSet<_>>();
3706        // Keep `__tsid` only when both sides contain it, otherwise it may break schema alignment
3707        // (e.g. `unknown_metric or some_metric`).
3708        if !(left_has_tsid && right_has_tsid) {
3709            all_columns_set.remove(DATA_SCHEMA_TSID_COLUMN_NAME);
3710        }
3711        // remove time index column
3712        all_columns_set.remove(&left_time_index_column);
3713        all_columns_set.remove(&right_time_index_column);
3714        // remove field column in the right
3715        if left_field_col != right_field_col {
3716            all_columns_set.remove(right_field_col);
3717        }
3718        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
3719        // sort to ensure the generated schema is not volatile
3720        all_columns.sort_unstable();
3721        // use left time index column name as the result time index column name
3722        all_columns.insert(0, left_time_index_column.clone());
3723
3724        // step 1: align schema using project, fill non-exist columns with null
3725        let left_proj_exprs = all_columns.iter().map(|col| {
3726            if tags_not_in_left.contains(col) {
3727                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3728            } else {
3729                DfExpr::Column(Column::new(None::<String>, col))
3730            }
3731        });
3732        let right_time_index_expr = DfExpr::Column(Column::new(
3733            right_qualifier.clone(),
3734            right_time_index_column,
3735        ))
3736        .alias(left_time_index_column.clone());
3737        // The field column in right side may not have qualifier (it may be removed by join operation),
3738        // so we need to find it from the schema.
3739        let right_qualifier_for_field = right
3740            .schema()
3741            .iter()
3742            .find(|(_, f)| f.name() == right_field_col)
3743            .map(|(q, _)| q)
3744            .with_context(|| ColumnNotFoundSnafu {
3745                col: right_field_col.clone(),
3746            })?
3747            .cloned();
3748
3749        // `skip(1)` to skip the time index column
3750        let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
3751            // expr
3752            if col == left_field_col && left_field_col != right_field_col {
3753                // qualify field in right side if necessary to handle different field name
3754                DfExpr::Column(Column::new(
3755                    right_qualifier_for_field.clone(),
3756                    right_field_col,
3757                ))
3758            } else if tags_not_in_right.contains(col) {
3759                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3760            } else {
3761                DfExpr::Column(Column::new(None::<String>, col))
3762            }
3763        });
3764        let right_proj_exprs = [right_time_index_expr]
3765            .into_iter()
3766            .chain(right_proj_exprs_without_time_index);
3767
3768        let left_projected = LogicalPlanBuilder::from(left)
3769            .project(left_proj_exprs)
3770            .context(DataFusionPlanningSnafu)?
3771            .alias(left_qualifier_string.clone())
3772            .context(DataFusionPlanningSnafu)?
3773            .build()
3774            .context(DataFusionPlanningSnafu)?;
3775        let right_projected = LogicalPlanBuilder::from(right)
3776            .project(right_proj_exprs)
3777            .context(DataFusionPlanningSnafu)?
3778            .alias(right_qualifier_string.clone())
3779            .context(DataFusionPlanningSnafu)?
3780            .build()
3781            .context(DataFusionPlanningSnafu)?;
3782
3783        // step 2: compute match columns
3784        let mut match_columns = if let Some(modifier) = modifier
3785            && let Some(matching) = &modifier.matching
3786        {
3787            match matching {
3788                // keeps columns mentioned in `on`
3789                LabelModifier::Include(on) => on.labels.clone(),
3790                // removes columns memtioned in `ignoring`
3791                LabelModifier::Exclude(ignoring) => {
3792                    let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
3793                    all_tags.difference(&ignoring).cloned().collect()
3794                }
3795            }
3796        } else {
3797            all_tags.iter().cloned().collect()
3798        };
3799        // sort to ensure the generated plan is not volatile
3800        match_columns.sort_unstable();
3801        // step 3: build `UnionDistinctOn` plan
3802        let schema = left_projected.schema().clone();
3803        let union_distinct_on = UnionDistinctOn::new(
3804            left_projected,
3805            right_projected,
3806            match_columns,
3807            left_time_index_column.clone(),
3808            schema,
3809        );
3810        let result = LogicalPlan::Extension(Extension {
3811            node: Arc::new(union_distinct_on),
3812        });
3813
3814        // step 4: update context
3815        self.ctx.time_index_column = Some(left_time_index_column);
3816        self.ctx.tag_columns = all_tags.into_iter().collect();
3817        self.ctx.field_columns = vec![left_field_col.clone()];
3818        self.ctx.use_tsid = left_has_tsid && right_has_tsid;
3819
3820        Ok(result)
3821    }
3822
3823    /// Build a projection that project and perform operation expr for every value columns.
3824    /// Non-value columns (tag and timestamp) will be preserved in the projection.
3825    ///
3826    /// # Side effect
3827    ///
3828    /// This function will update the value columns in the context. Those new column names
3829    /// don't contains qualifier.
3830    fn projection_for_each_field_column<F>(
3831        &mut self,
3832        input: LogicalPlan,
3833        name_to_expr: F,
3834    ) -> Result<LogicalPlan>
3835    where
3836        F: FnMut(&String) -> Result<DfExpr>,
3837    {
3838        let non_field_columns_iter = self
3839            .ctx
3840            .tag_columns
3841            .iter()
3842            .chain(self.ctx.time_index_column.iter())
3843            .map(|col| {
3844                Ok(DfExpr::Column(Column::new(
3845                    self.ctx.table_name.clone().map(TableReference::bare),
3846                    col,
3847                )))
3848            });
3849
3850        // build computation exprs
3851        let result_field_columns = self
3852            .ctx
3853            .field_columns
3854            .iter()
3855            .map(name_to_expr)
3856            .collect::<Result<Vec<_>>>()?;
3857
3858        // alias the computation exprs to remove qualifier
3859        self.ctx.field_columns = result_field_columns
3860            .iter()
3861            .map(|expr| expr.schema_name().to_string())
3862            .collect();
3863        let field_columns_iter = result_field_columns
3864            .into_iter()
3865            .zip(self.ctx.field_columns.iter())
3866            .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3867
3868        // chain non-field columns (unchanged) and field columns (applied computation then alias)
3869        let project_fields = non_field_columns_iter
3870            .chain(field_columns_iter)
3871            .collect::<Result<Vec<_>>>()?;
3872
3873        LogicalPlanBuilder::from(input)
3874            .project(project_fields)
3875            .context(DataFusionPlanningSnafu)?
3876            .build()
3877            .context(DataFusionPlanningSnafu)
3878    }
3879
3880    /// Build a filter plan that filter on value column. Notice that only one value column
3881    /// is expected.
3882    fn filter_on_field_column<F>(
3883        &self,
3884        input: LogicalPlan,
3885        mut name_to_expr: F,
3886    ) -> Result<LogicalPlan>
3887    where
3888        F: FnMut(&String) -> Result<DfExpr>,
3889    {
3890        ensure!(
3891            self.ctx.field_columns.len() == 1,
3892            UnsupportedExprSnafu {
3893                name: "filter on multi-value input"
3894            }
3895        );
3896
3897        let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3898
3899        LogicalPlanBuilder::from(input)
3900            .filter(field_column_filter)
3901            .context(DataFusionPlanningSnafu)?
3902            .build()
3903            .context(DataFusionPlanningSnafu)
3904    }
3905
3906    /// Generate an expr like `date_part("hour", <TIME_INDEX>)`. Caller should ensure the
3907    /// time index column in context is set
3908    fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3909        let input_expr = datafusion::logical_expr::col(
3910            self.ctx
3911                .time_index_column
3912                .as_ref()
3913                // table name doesn't matters here
3914                .with_context(|| TimeIndexNotFoundSnafu {
3915                    table: "<doesn't matter>",
3916                })?,
3917        );
3918        let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3919            func: datafusion_functions::datetime::date_part(),
3920            args: vec![date_part.lit(), input_expr],
3921        });
3922        Ok(fn_expr)
3923    }
3924
3925    fn strip_tsid_column(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
3926        let schema = plan.schema();
3927        if !schema
3928            .fields()
3929            .iter()
3930            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
3931        {
3932            return Ok(plan);
3933        }
3934
3935        let project_exprs = schema
3936            .fields()
3937            .iter()
3938            .filter(|field| field.name() != DATA_SCHEMA_TSID_COLUMN_NAME)
3939            .map(|field| Ok(DfExpr::Column(Column::from_name(field.name().clone()))))
3940            .collect::<Result<Vec<_>>>()?;
3941
3942        LogicalPlanBuilder::from(plan)
3943            .project(project_exprs)
3944            .context(DataFusionPlanningSnafu)?
3945            .build()
3946            .context(DataFusionPlanningSnafu)
3947    }
3948
3949    /// Apply an alias to the query result by adding a projection with the alias name
3950    fn apply_alias(&mut self, plan: LogicalPlan, alias_name: String) -> Result<LogicalPlan> {
3951        let fields_expr = self.create_field_column_exprs()?;
3952
3953        // TODO(dennis): how to support multi-value aliasing?
3954        ensure!(
3955            fields_expr.len() == 1,
3956            UnsupportedExprSnafu {
3957                name: "alias on multi-value result"
3958            }
3959        );
3960
3961        let project_fields = fields_expr
3962            .into_iter()
3963            .map(|expr| expr.alias(&alias_name))
3964            .chain(self.create_tag_column_exprs()?)
3965            .chain(Some(self.create_time_index_column_expr()?));
3966
3967        LogicalPlanBuilder::from(plan)
3968            .project(project_fields)
3969            .context(DataFusionPlanningSnafu)?
3970            .build()
3971            .context(DataFusionPlanningSnafu)
3972    }
3973}
3974
3975#[derive(Default, Debug)]
3976struct FunctionArgs {
3977    input: Option<PromExpr>,
3978    literals: Vec<DfExpr>,
3979}
3980
3981/// Represents different types of scalar functions supported in PromQL expressions.
3982/// Each variant defines how the function should be processed and what arguments it expects.
3983#[derive(Debug, Clone)]
3984enum ScalarFunc {
3985    /// DataFusion's registered(including built-in) scalar functions (e.g., abs, sqrt, round, clamp).
3986    /// These are passed through directly to DataFusion's execution engine.
3987    /// Processing: Simple argument insertion at the specified position.
3988    DataFusionBuiltin(Arc<ScalarUdfDef>),
3989    /// User-defined functions registered in DataFusion's function registry.
3990    /// Similar to DataFusionBuiltin but for custom functions not built into DataFusion.
3991    /// Processing: Direct pass-through with argument positioning.
3992    DataFusionUdf(Arc<ScalarUdfDef>),
3993    /// PromQL-specific functions that operate on time series data with temporal context.
3994    /// These functions require both timestamp ranges and values to perform calculations.
3995    /// Processing: Automatically injects timestamp_range and value columns as first arguments.
3996    /// Examples: idelta, irate, resets, changes, deriv, *_over_time function
3997    Udf(Arc<ScalarUdfDef>),
3998    /// PromQL functions requiring extrapolation calculations with explicit range information.
3999    /// These functions need to know the time range length to perform rate calculations.
4000    /// The second field contains the range length in milliseconds.
4001    /// Processing: Injects timestamp_range, value, time_index columns and appends range_length.
4002    /// Examples: increase, rate, delta
4003    // TODO(ruihang): maybe merge with Udf later
4004    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
4005    /// Functions that generate expressions directly without external UDF calls.
4006    /// The expression is constructed during function matching and requires no additional processing.
4007    /// Examples: time(), minute(), hour(), month(), year() and other date/time extractors
4008    GeneratedExpr,
4009}
4010
4011#[cfg(test)]
4012mod test {
4013    use std::time::{Duration, UNIX_EPOCH};
4014
4015    use catalog::RegisterTableRequest;
4016    use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
4017    use common_base::Plugins;
4018    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
4019    use common_query::prelude::greptime_timestamp;
4020    use common_query::test_util::DummyDecoder;
4021    use datatypes::prelude::ConcreteDataType;
4022    use datatypes::schema::{ColumnSchema, Schema};
4023    use promql_parser::label::Labels;
4024    use promql_parser::parser;
4025    use session::context::QueryContext;
4026    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
4027    use table::test_util::EmptyTable;
4028
4029    use super::*;
4030    use crate::options::QueryOptions;
4031    use crate::parser::QueryLanguageParser;
4032
4033    fn build_query_engine_state() -> QueryEngineState {
4034        QueryEngineState::new(
4035            new_memory_catalog_manager().unwrap(),
4036            None,
4037            None,
4038            None,
4039            None,
4040            None,
4041            false,
4042            Plugins::default(),
4043            QueryOptions::default(),
4044        )
4045    }
4046
4047    async fn build_test_table_provider(
4048        table_name_tuples: &[(String, String)],
4049        num_tag: usize,
4050        num_field: usize,
4051    ) -> DfTableSourceProvider {
4052        let catalog_list = MemoryCatalogManager::with_default_setup();
4053        for (schema_name, table_name) in table_name_tuples {
4054            let mut columns = vec![];
4055            for i in 0..num_tag {
4056                columns.push(ColumnSchema::new(
4057                    format!("tag_{i}"),
4058                    ConcreteDataType::string_datatype(),
4059                    false,
4060                ));
4061            }
4062            columns.push(
4063                ColumnSchema::new(
4064                    "timestamp".to_string(),
4065                    ConcreteDataType::timestamp_millisecond_datatype(),
4066                    false,
4067                )
4068                .with_time_index(true),
4069            );
4070            for i in 0..num_field {
4071                columns.push(ColumnSchema::new(
4072                    format!("field_{i}"),
4073                    ConcreteDataType::float64_datatype(),
4074                    true,
4075                ));
4076            }
4077            let schema = Arc::new(Schema::new(columns));
4078            let table_meta = TableMetaBuilder::empty()
4079                .schema(schema)
4080                .primary_key_indices((0..num_tag).collect())
4081                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
4082                .next_column_id(1024)
4083                .build()
4084                .unwrap();
4085            let table_info = TableInfoBuilder::default()
4086                .name(table_name.clone())
4087                .meta(table_meta)
4088                .build()
4089                .unwrap();
4090            let table = EmptyTable::from_table_info(&table_info);
4091
4092            assert!(
4093                catalog_list
4094                    .register_table_sync(RegisterTableRequest {
4095                        catalog: DEFAULT_CATALOG_NAME.to_string(),
4096                        schema: schema_name.clone(),
4097                        table_name: table_name.clone(),
4098                        table_id: 1024,
4099                        table,
4100                    })
4101                    .is_ok()
4102            );
4103        }
4104
4105        DfTableSourceProvider::new(
4106            catalog_list,
4107            false,
4108            QueryContext::arc(),
4109            DummyDecoder::arc(),
4110            false,
4111        )
4112    }
4113
4114    async fn build_test_table_provider_with_tsid(
4115        table_name_tuples: &[(String, String)],
4116        num_tag: usize,
4117        num_field: usize,
4118    ) -> DfTableSourceProvider {
4119        let catalog_list = MemoryCatalogManager::with_default_setup();
4120
4121        let physical_table_name = "phy";
4122        let physical_table_id = 999u32;
4123
4124        // Register a metric engine physical table with internal columns.
4125        {
4126            let mut columns = vec![
4127                ColumnSchema::new(
4128                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
4129                    ConcreteDataType::uint32_datatype(),
4130                    false,
4131                ),
4132                ColumnSchema::new(
4133                    DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
4134                    ConcreteDataType::uint64_datatype(),
4135                    false,
4136                ),
4137            ];
4138            for i in 0..num_tag {
4139                columns.push(ColumnSchema::new(
4140                    format!("tag_{i}"),
4141                    ConcreteDataType::string_datatype(),
4142                    false,
4143                ));
4144            }
4145            columns.push(
4146                ColumnSchema::new(
4147                    "timestamp".to_string(),
4148                    ConcreteDataType::timestamp_millisecond_datatype(),
4149                    false,
4150                )
4151                .with_time_index(true),
4152            );
4153            for i in 0..num_field {
4154                columns.push(ColumnSchema::new(
4155                    format!("field_{i}"),
4156                    ConcreteDataType::float64_datatype(),
4157                    true,
4158                ));
4159            }
4160
4161            let schema = Arc::new(Schema::new(columns));
4162            let primary_key_indices = (0..(2 + num_tag)).collect::<Vec<_>>();
4163            let table_meta = TableMetaBuilder::empty()
4164                .schema(schema)
4165                .primary_key_indices(primary_key_indices)
4166                .value_indices((2 + num_tag..2 + num_tag + 1 + num_field).collect())
4167                .engine(METRIC_ENGINE_NAME.to_string())
4168                .next_column_id(1024)
4169                .build()
4170                .unwrap();
4171            let table_info = TableInfoBuilder::default()
4172                .table_id(physical_table_id)
4173                .name(physical_table_name)
4174                .meta(table_meta)
4175                .build()
4176                .unwrap();
4177            let table = EmptyTable::from_table_info(&table_info);
4178
4179            assert!(
4180                catalog_list
4181                    .register_table_sync(RegisterTableRequest {
4182                        catalog: DEFAULT_CATALOG_NAME.to_string(),
4183                        schema: DEFAULT_SCHEMA_NAME.to_string(),
4184                        table_name: physical_table_name.to_string(),
4185                        table_id: physical_table_id,
4186                        table,
4187                    })
4188                    .is_ok()
4189            );
4190        }
4191
4192        // Register metric engine logical tables without `__tsid`, referencing the physical table.
4193        for (idx, (schema_name, table_name)) in table_name_tuples.iter().enumerate() {
4194            let mut columns = vec![];
4195            for i in 0..num_tag {
4196                columns.push(ColumnSchema::new(
4197                    format!("tag_{i}"),
4198                    ConcreteDataType::string_datatype(),
4199                    false,
4200                ));
4201            }
4202            columns.push(
4203                ColumnSchema::new(
4204                    "timestamp".to_string(),
4205                    ConcreteDataType::timestamp_millisecond_datatype(),
4206                    false,
4207                )
4208                .with_time_index(true),
4209            );
4210            for i in 0..num_field {
4211                columns.push(ColumnSchema::new(
4212                    format!("field_{i}"),
4213                    ConcreteDataType::float64_datatype(),
4214                    true,
4215                ));
4216            }
4217
4218            let schema = Arc::new(Schema::new(columns));
4219            let mut options = table::requests::TableOptions::default();
4220            options.extra_options.insert(
4221                LOGICAL_TABLE_METADATA_KEY.to_string(),
4222                physical_table_name.to_string(),
4223            );
4224            let table_id = 1024u32 + idx as u32;
4225            let table_meta = TableMetaBuilder::empty()
4226                .schema(schema)
4227                .primary_key_indices((0..num_tag).collect())
4228                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
4229                .engine(METRIC_ENGINE_NAME.to_string())
4230                .options(options)
4231                .next_column_id(1024)
4232                .build()
4233                .unwrap();
4234            let table_info = TableInfoBuilder::default()
4235                .table_id(table_id)
4236                .name(table_name.clone())
4237                .meta(table_meta)
4238                .build()
4239                .unwrap();
4240            let table = EmptyTable::from_table_info(&table_info);
4241
4242            assert!(
4243                catalog_list
4244                    .register_table_sync(RegisterTableRequest {
4245                        catalog: DEFAULT_CATALOG_NAME.to_string(),
4246                        schema: schema_name.clone(),
4247                        table_name: table_name.clone(),
4248                        table_id,
4249                        table,
4250                    })
4251                    .is_ok()
4252            );
4253        }
4254
4255        DfTableSourceProvider::new(
4256            catalog_list,
4257            false,
4258            QueryContext::arc(),
4259            DummyDecoder::arc(),
4260            false,
4261        )
4262    }
4263
4264    async fn build_test_table_provider_with_fields(
4265        table_name_tuples: &[(String, String)],
4266        tags: &[&str],
4267    ) -> DfTableSourceProvider {
4268        let catalog_list = MemoryCatalogManager::with_default_setup();
4269        for (schema_name, table_name) in table_name_tuples {
4270            let mut columns = vec![];
4271            let num_tag = tags.len();
4272            for tag in tags {
4273                columns.push(ColumnSchema::new(
4274                    tag.to_string(),
4275                    ConcreteDataType::string_datatype(),
4276                    false,
4277                ));
4278            }
4279            columns.push(
4280                ColumnSchema::new(
4281                    greptime_timestamp().to_string(),
4282                    ConcreteDataType::timestamp_millisecond_datatype(),
4283                    false,
4284                )
4285                .with_time_index(true),
4286            );
4287            columns.push(ColumnSchema::new(
4288                greptime_value().to_string(),
4289                ConcreteDataType::float64_datatype(),
4290                true,
4291            ));
4292            let schema = Arc::new(Schema::new(columns));
4293            let table_meta = TableMetaBuilder::empty()
4294                .schema(schema)
4295                .primary_key_indices((0..num_tag).collect())
4296                .next_column_id(1024)
4297                .build()
4298                .unwrap();
4299            let table_info = TableInfoBuilder::default()
4300                .name(table_name.clone())
4301                .meta(table_meta)
4302                .build()
4303                .unwrap();
4304            let table = EmptyTable::from_table_info(&table_info);
4305
4306            assert!(
4307                catalog_list
4308                    .register_table_sync(RegisterTableRequest {
4309                        catalog: DEFAULT_CATALOG_NAME.to_string(),
4310                        schema: schema_name.clone(),
4311                        table_name: table_name.clone(),
4312                        table_id: 1024,
4313                        table,
4314                    })
4315                    .is_ok()
4316            );
4317        }
4318
4319        DfTableSourceProvider::new(
4320            catalog_list,
4321            false,
4322            QueryContext::arc(),
4323            DummyDecoder::arc(),
4324            false,
4325        )
4326    }
4327
4328    // {
4329    //     input: `abs(some_metric{foo!="bar"})`,
4330    //     expected: &Call{
4331    //         Func: MustGetFunction("abs"),
4332    //         Args: Expressions{
4333    //             &VectorSelector{
4334    //                 Name: "some_metric",
4335    //                 LabelMatchers: []*labels.Matcher{
4336    //                     MustLabelMatcher(labels.MatchNotEqual, "foo", "bar"),
4337    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
4338    //                 },
4339    //             },
4340    //         },
4341    //     },
4342    // },
4343    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
4344        let prom_expr =
4345            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
4346        let eval_stmt = EvalStmt {
4347            expr: prom_expr,
4348            start: UNIX_EPOCH,
4349            end: UNIX_EPOCH
4350                .checked_add(Duration::from_secs(100_000))
4351                .unwrap(),
4352            interval: Duration::from_secs(5),
4353            lookback_delta: Duration::from_secs(1),
4354        };
4355
4356        let table_provider = build_test_table_provider(
4357            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4358            1,
4359            1,
4360        )
4361        .await;
4362        let plan =
4363            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4364                .await
4365                .unwrap();
4366
4367        let expected = String::from(
4368            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
4369            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
4370            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4371            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4372            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4373	            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4374            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
4375        ).replace("TEMPLATE", plan_name);
4376
4377        assert_eq!(plan.display_indent_schema().to_string(), expected);
4378    }
4379
4380    #[tokio::test]
4381    async fn single_abs() {
4382        do_single_instant_function_call("abs", "abs").await;
4383    }
4384
4385    #[tokio::test]
4386    #[should_panic]
4387    async fn single_absent() {
4388        do_single_instant_function_call("absent", "").await;
4389    }
4390
4391    #[tokio::test]
4392    async fn single_ceil() {
4393        do_single_instant_function_call("ceil", "ceil").await;
4394    }
4395
4396    #[tokio::test]
4397    async fn single_exp() {
4398        do_single_instant_function_call("exp", "exp").await;
4399    }
4400
4401    #[tokio::test]
4402    async fn single_ln() {
4403        do_single_instant_function_call("ln", "ln").await;
4404    }
4405
4406    #[tokio::test]
4407    async fn single_log2() {
4408        do_single_instant_function_call("log2", "log2").await;
4409    }
4410
4411    #[tokio::test]
4412    async fn single_log10() {
4413        do_single_instant_function_call("log10", "log10").await;
4414    }
4415
4416    #[tokio::test]
4417    #[should_panic]
4418    async fn single_scalar() {
4419        do_single_instant_function_call("scalar", "").await;
4420    }
4421
4422    #[tokio::test]
4423    #[should_panic]
4424    async fn single_sgn() {
4425        do_single_instant_function_call("sgn", "").await;
4426    }
4427
4428    #[tokio::test]
4429    #[should_panic]
4430    async fn single_sort() {
4431        do_single_instant_function_call("sort", "").await;
4432    }
4433
4434    #[tokio::test]
4435    #[should_panic]
4436    async fn single_sort_desc() {
4437        do_single_instant_function_call("sort_desc", "").await;
4438    }
4439
4440    #[tokio::test]
4441    async fn single_sqrt() {
4442        do_single_instant_function_call("sqrt", "sqrt").await;
4443    }
4444
4445    #[tokio::test]
4446    #[should_panic]
4447    async fn single_timestamp() {
4448        do_single_instant_function_call("timestamp", "").await;
4449    }
4450
4451    #[tokio::test]
4452    async fn single_acos() {
4453        do_single_instant_function_call("acos", "acos").await;
4454    }
4455
4456    #[tokio::test]
4457    #[should_panic]
4458    async fn single_acosh() {
4459        do_single_instant_function_call("acosh", "").await;
4460    }
4461
4462    #[tokio::test]
4463    async fn single_asin() {
4464        do_single_instant_function_call("asin", "asin").await;
4465    }
4466
4467    #[tokio::test]
4468    #[should_panic]
4469    async fn single_asinh() {
4470        do_single_instant_function_call("asinh", "").await;
4471    }
4472
4473    #[tokio::test]
4474    async fn single_atan() {
4475        do_single_instant_function_call("atan", "atan").await;
4476    }
4477
4478    #[tokio::test]
4479    #[should_panic]
4480    async fn single_atanh() {
4481        do_single_instant_function_call("atanh", "").await;
4482    }
4483
4484    #[tokio::test]
4485    async fn single_cos() {
4486        do_single_instant_function_call("cos", "cos").await;
4487    }
4488
4489    #[tokio::test]
4490    #[should_panic]
4491    async fn single_cosh() {
4492        do_single_instant_function_call("cosh", "").await;
4493    }
4494
4495    #[tokio::test]
4496    async fn single_sin() {
4497        do_single_instant_function_call("sin", "sin").await;
4498    }
4499
4500    #[tokio::test]
4501    #[should_panic]
4502    async fn single_sinh() {
4503        do_single_instant_function_call("sinh", "").await;
4504    }
4505
4506    #[tokio::test]
4507    async fn single_tan() {
4508        do_single_instant_function_call("tan", "tan").await;
4509    }
4510
4511    #[tokio::test]
4512    #[should_panic]
4513    async fn single_tanh() {
4514        do_single_instant_function_call("tanh", "").await;
4515    }
4516
4517    #[tokio::test]
4518    #[should_panic]
4519    async fn single_deg() {
4520        do_single_instant_function_call("deg", "").await;
4521    }
4522
4523    #[tokio::test]
4524    #[should_panic]
4525    async fn single_rad() {
4526        do_single_instant_function_call("rad", "").await;
4527    }
4528
4529    // {
4530    //     input: "avg by (foo)(some_metric)",
4531    //     expected: &AggregateExpr{
4532    //         Op: AVG,
4533    //         Expr: &VectorSelector{
4534    //             Name: "some_metric",
4535    //             LabelMatchers: []*labels.Matcher{
4536    //                 MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
4537    //             },
4538    //             PosRange: PositionRange{
4539    //                 Start: 13,
4540    //                 End:   24,
4541    //             },
4542    //         },
4543    //         Grouping: []string{"foo"},
4544    //         PosRange: PositionRange{
4545    //             Start: 0,
4546    //             End:   25,
4547    //         },
4548    //     },
4549    // },
4550    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
4551        let prom_expr = parser::parse(&format!(
4552            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
4553        ))
4554        .unwrap();
4555        let mut eval_stmt = EvalStmt {
4556            expr: prom_expr,
4557            start: UNIX_EPOCH,
4558            end: UNIX_EPOCH
4559                .checked_add(Duration::from_secs(100_000))
4560                .unwrap(),
4561            interval: Duration::from_secs(5),
4562            lookback_delta: Duration::from_secs(1),
4563        };
4564
4565        // test group by
4566        let table_provider = build_test_table_provider(
4567            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4568            2,
4569            2,
4570        )
4571        .await;
4572        let plan =
4573            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4574                .await
4575                .unwrap();
4576        let expected_no_without = String::from(
4577            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4578            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4579            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
4580            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
4581            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
4582	            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
4583            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
4584        ).replace("TEMPLATE", plan_name);
4585        assert_eq!(
4586            plan.display_indent_schema().to_string(),
4587            expected_no_without
4588        );
4589
4590        // test group without
4591        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
4592            *modifier = Some(LabelModifier::Exclude(Labels {
4593                labels: vec![String::from("tag_1")].into_iter().collect(),
4594            }));
4595        }
4596        let table_provider = build_test_table_provider(
4597            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4598            2,
4599            2,
4600        )
4601        .await;
4602        let plan =
4603            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4604                .await
4605                .unwrap();
4606        let expected_without = String::from(
4607            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4608            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4609            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
4610            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
4611            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
4612	            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
4613            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
4614        ).replace("TEMPLATE", plan_name);
4615        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
4616    }
4617
4618    #[tokio::test]
4619    async fn aggregate_sum() {
4620        do_aggregate_expr_plan("sum", "sum").await;
4621    }
4622
4623    #[tokio::test]
4624    async fn tsid_is_used_for_series_divide_when_available() {
4625        let prom_expr = parser::parse("some_metric").unwrap();
4626        let eval_stmt = EvalStmt {
4627            expr: prom_expr,
4628            start: UNIX_EPOCH,
4629            end: UNIX_EPOCH
4630                .checked_add(Duration::from_secs(100_000))
4631                .unwrap(),
4632            interval: Duration::from_secs(5),
4633            lookback_delta: Duration::from_secs(1),
4634        };
4635
4636        let table_provider = build_test_table_provider_with_tsid(
4637            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4638            1,
4639            1,
4640        )
4641        .await;
4642        let plan =
4643            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4644                .await
4645                .unwrap();
4646
4647        let plan_str = plan.display_indent_schema().to_string();
4648        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4649        assert!(plan_str.contains("__tsid ASC NULLS FIRST"));
4650        assert!(
4651            !plan
4652                .schema()
4653                .fields()
4654                .iter()
4655                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
4656        );
4657    }
4658
4659    #[tokio::test]
4660    async fn physical_table_name_is_not_leaked_in_plan() {
4661        let prom_expr = parser::parse("some_metric").unwrap();
4662        let eval_stmt = EvalStmt {
4663            expr: prom_expr,
4664            start: UNIX_EPOCH,
4665            end: UNIX_EPOCH
4666                .checked_add(Duration::from_secs(100_000))
4667                .unwrap(),
4668            interval: Duration::from_secs(5),
4669            lookback_delta: Duration::from_secs(1),
4670        };
4671
4672        let table_provider = build_test_table_provider_with_tsid(
4673            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4674            1,
4675            1,
4676        )
4677        .await;
4678        let plan =
4679            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4680                .await
4681                .unwrap();
4682
4683        let plan_str = plan.display_indent_schema().to_string();
4684        assert!(plan_str.contains("TableScan: phy"), "{plan}");
4685        assert!(plan_str.contains("SubqueryAlias: some_metric"));
4686        assert!(plan_str.contains("Filter: phy.__table_id = UInt32(1024)"));
4687        assert!(!plan_str.contains("TableScan: some_metric"));
4688    }
4689
4690    #[tokio::test]
4691    async fn sum_without_does_not_group_by_tsid() {
4692        let prom_expr = parser::parse("sum without (tag_0) (some_metric)").unwrap();
4693        let eval_stmt = EvalStmt {
4694            expr: prom_expr,
4695            start: UNIX_EPOCH,
4696            end: UNIX_EPOCH
4697                .checked_add(Duration::from_secs(100_000))
4698                .unwrap(),
4699            interval: Duration::from_secs(5),
4700            lookback_delta: Duration::from_secs(1),
4701        };
4702
4703        let table_provider = build_test_table_provider_with_tsid(
4704            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4705            1,
4706            1,
4707        )
4708        .await;
4709        let plan =
4710            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4711                .await
4712                .unwrap();
4713
4714        let plan_str = plan.display_indent_schema().to_string();
4715        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4716
4717        let aggr_line = plan_str
4718            .lines()
4719            .find(|line| line.contains("Aggregate: groupBy="))
4720            .unwrap();
4721        assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
4722    }
4723
4724    #[tokio::test]
4725    async fn topk_without_does_not_partition_by_tsid() {
4726        let prom_expr = parser::parse("topk without (tag_0) (1, some_metric)").unwrap();
4727        let eval_stmt = EvalStmt {
4728            expr: prom_expr,
4729            start: UNIX_EPOCH,
4730            end: UNIX_EPOCH
4731                .checked_add(Duration::from_secs(100_000))
4732                .unwrap(),
4733            interval: Duration::from_secs(5),
4734            lookback_delta: Duration::from_secs(1),
4735        };
4736
4737        let table_provider = build_test_table_provider_with_tsid(
4738            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4739            1,
4740            1,
4741        )
4742        .await;
4743        let plan =
4744            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4745                .await
4746                .unwrap();
4747
4748        let plan_str = plan.display_indent_schema().to_string();
4749        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4750
4751        let window_line = plan_str
4752            .lines()
4753            .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
4754            .unwrap();
4755        let partition_by = window_line
4756            .split("PARTITION BY [")
4757            .nth(1)
4758            .and_then(|s| s.split("] ORDER BY").next())
4759            .unwrap();
4760        assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
4761    }
4762
4763    #[tokio::test]
4764    async fn sum_by_does_not_group_by_tsid() {
4765        let prom_expr = parser::parse("sum by (__tsid) (some_metric)").unwrap();
4766        let eval_stmt = EvalStmt {
4767            expr: prom_expr,
4768            start: UNIX_EPOCH,
4769            end: UNIX_EPOCH
4770                .checked_add(Duration::from_secs(100_000))
4771                .unwrap(),
4772            interval: Duration::from_secs(5),
4773            lookback_delta: Duration::from_secs(1),
4774        };
4775
4776        let table_provider = build_test_table_provider_with_tsid(
4777            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4778            1,
4779            1,
4780        )
4781        .await;
4782        let plan =
4783            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4784                .await
4785                .unwrap();
4786
4787        let plan_str = plan.display_indent_schema().to_string();
4788        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4789
4790        let aggr_line = plan_str
4791            .lines()
4792            .find(|line| line.contains("Aggregate: groupBy="))
4793            .unwrap();
4794        assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
4795    }
4796
4797    #[tokio::test]
4798    async fn topk_by_does_not_partition_by_tsid() {
4799        let prom_expr = parser::parse("topk by (__tsid) (1, some_metric)").unwrap();
4800        let eval_stmt = EvalStmt {
4801            expr: prom_expr,
4802            start: UNIX_EPOCH,
4803            end: UNIX_EPOCH
4804                .checked_add(Duration::from_secs(100_000))
4805                .unwrap(),
4806            interval: Duration::from_secs(5),
4807            lookback_delta: Duration::from_secs(1),
4808        };
4809
4810        let table_provider = build_test_table_provider_with_tsid(
4811            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4812            1,
4813            1,
4814        )
4815        .await;
4816        let plan =
4817            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4818                .await
4819                .unwrap();
4820
4821        let plan_str = plan.display_indent_schema().to_string();
4822        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4823
4824        let window_line = plan_str
4825            .lines()
4826            .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
4827            .unwrap();
4828        let partition_by = window_line
4829            .split("PARTITION BY [")
4830            .nth(1)
4831            .and_then(|s| s.split("] ORDER BY").next())
4832            .unwrap();
4833        assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
4834    }
4835
4836    #[tokio::test]
4837    async fn selector_matcher_on_tsid_does_not_use_internal_column() {
4838        let prom_expr = parser::parse(r#"some_metric{__tsid="123"}"#).unwrap();
4839        let eval_stmt = EvalStmt {
4840            expr: prom_expr,
4841            start: UNIX_EPOCH,
4842            end: UNIX_EPOCH
4843                .checked_add(Duration::from_secs(100_000))
4844                .unwrap(),
4845            interval: Duration::from_secs(5),
4846            lookback_delta: Duration::from_secs(1),
4847        };
4848
4849        let table_provider = build_test_table_provider_with_tsid(
4850            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4851            1,
4852            1,
4853        )
4854        .await;
4855        let plan =
4856            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4857                .await
4858                .unwrap();
4859
4860        fn collect_filter_cols(plan: &LogicalPlan, out: &mut HashSet<Column>) {
4861            if let LogicalPlan::Filter(filter) = plan {
4862                datafusion_expr::utils::expr_to_columns(&filter.predicate, out).unwrap();
4863            }
4864            for input in plan.inputs() {
4865                collect_filter_cols(input, out);
4866            }
4867        }
4868
4869        let mut filter_cols = HashSet::new();
4870        collect_filter_cols(&plan, &mut filter_cols);
4871        assert!(
4872            !filter_cols
4873                .iter()
4874                .any(|c| c.name == DATA_SCHEMA_TSID_COLUMN_NAME)
4875        );
4876    }
4877
4878    #[tokio::test]
4879    async fn tsid_is_not_used_when_physical_table_is_missing() {
4880        let prom_expr = parser::parse("some_metric").unwrap();
4881        let eval_stmt = EvalStmt {
4882            expr: prom_expr,
4883            start: UNIX_EPOCH,
4884            end: UNIX_EPOCH
4885                .checked_add(Duration::from_secs(100_000))
4886                .unwrap(),
4887            interval: Duration::from_secs(5),
4888            lookback_delta: Duration::from_secs(1),
4889        };
4890
4891        let catalog_list = MemoryCatalogManager::with_default_setup();
4892
4893        // Register a metric engine logical table referencing a missing physical table.
4894        let mut columns = vec![ColumnSchema::new(
4895            "tag_0".to_string(),
4896            ConcreteDataType::string_datatype(),
4897            false,
4898        )];
4899        columns.push(
4900            ColumnSchema::new(
4901                "timestamp".to_string(),
4902                ConcreteDataType::timestamp_millisecond_datatype(),
4903                false,
4904            )
4905            .with_time_index(true),
4906        );
4907        columns.push(ColumnSchema::new(
4908            "field_0".to_string(),
4909            ConcreteDataType::float64_datatype(),
4910            true,
4911        ));
4912        let schema = Arc::new(Schema::new(columns));
4913        let mut options = table::requests::TableOptions::default();
4914        options
4915            .extra_options
4916            .insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "phy".to_string());
4917        let table_meta = TableMetaBuilder::empty()
4918            .schema(schema)
4919            .primary_key_indices(vec![0])
4920            .value_indices(vec![2])
4921            .engine(METRIC_ENGINE_NAME.to_string())
4922            .options(options)
4923            .next_column_id(1024)
4924            .build()
4925            .unwrap();
4926        let table_info = TableInfoBuilder::default()
4927            .table_id(1024)
4928            .name("some_metric")
4929            .meta(table_meta)
4930            .build()
4931            .unwrap();
4932        let table = EmptyTable::from_table_info(&table_info);
4933        catalog_list
4934            .register_table_sync(RegisterTableRequest {
4935                catalog: DEFAULT_CATALOG_NAME.to_string(),
4936                schema: DEFAULT_SCHEMA_NAME.to_string(),
4937                table_name: "some_metric".to_string(),
4938                table_id: 1024,
4939                table,
4940            })
4941            .unwrap();
4942
4943        let table_provider = DfTableSourceProvider::new(
4944            catalog_list,
4945            false,
4946            QueryContext::arc(),
4947            DummyDecoder::arc(),
4948            false,
4949        );
4950
4951        let plan =
4952            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4953                .await
4954                .unwrap();
4955
4956        let plan_str = plan.display_indent_schema().to_string();
4957        assert!(plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]"));
4958        assert!(!plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4959    }
4960
4961    #[tokio::test]
4962    async fn tsid_is_carried_only_when_aggregate_preserves_label_set() {
4963        let prom_expr = parser::parse("sum by (tag_0) (some_metric)").unwrap();
4964        let eval_stmt = EvalStmt {
4965            expr: prom_expr,
4966            start: UNIX_EPOCH,
4967            end: UNIX_EPOCH
4968                .checked_add(Duration::from_secs(100_000))
4969                .unwrap(),
4970            interval: Duration::from_secs(5),
4971            lookback_delta: Duration::from_secs(1),
4972        };
4973
4974        let table_provider = build_test_table_provider_with_tsid(
4975            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4976            1,
4977            1,
4978        )
4979        .await;
4980        let plan =
4981            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4982                .await
4983                .unwrap();
4984
4985        let plan_str = plan.display_indent_schema().to_string();
4986        assert!(plan_str.contains("first_value") && plan_str.contains("__tsid"));
4987        assert!(
4988            !plan
4989                .schema()
4990                .fields()
4991                .iter()
4992                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
4993        );
4994
4995        // Merging aggregate: label set is reduced, tsid should not be carried.
4996        let prom_expr = parser::parse("sum(some_metric)").unwrap();
4997        let eval_stmt = EvalStmt {
4998            expr: prom_expr,
4999            start: UNIX_EPOCH,
5000            end: UNIX_EPOCH
5001                .checked_add(Duration::from_secs(100_000))
5002                .unwrap(),
5003            interval: Duration::from_secs(5),
5004            lookback_delta: Duration::from_secs(1),
5005        };
5006        let table_provider = build_test_table_provider_with_tsid(
5007            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5008            1,
5009            1,
5010        )
5011        .await;
5012        let plan =
5013            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5014                .await
5015                .unwrap();
5016        let plan_str = plan.display_indent_schema().to_string();
5017        assert!(!plan_str.contains("first_value"));
5018    }
5019
5020    #[tokio::test]
5021    async fn or_operator_with_unknown_metric_does_not_require_tsid() {
5022        let prom_expr = parser::parse("unknown_metric or some_metric").unwrap();
5023        let eval_stmt = EvalStmt {
5024            expr: prom_expr,
5025            start: UNIX_EPOCH,
5026            end: UNIX_EPOCH
5027                .checked_add(Duration::from_secs(100_000))
5028                .unwrap(),
5029            interval: Duration::from_secs(5),
5030            lookback_delta: Duration::from_secs(1),
5031        };
5032
5033        let table_provider = build_test_table_provider_with_tsid(
5034            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5035            1,
5036            1,
5037        )
5038        .await;
5039
5040        let plan =
5041            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5042                .await
5043                .unwrap();
5044
5045        assert!(
5046            !plan
5047                .schema()
5048                .fields()
5049                .iter()
5050                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
5051        );
5052    }
5053
5054    #[tokio::test]
5055    async fn aggregate_avg() {
5056        do_aggregate_expr_plan("avg", "avg").await;
5057    }
5058
5059    #[tokio::test]
5060    #[should_panic] // output type doesn't match
5061    async fn aggregate_count() {
5062        do_aggregate_expr_plan("count", "count").await;
5063    }
5064
5065    #[tokio::test]
5066    async fn aggregate_min() {
5067        do_aggregate_expr_plan("min", "min").await;
5068    }
5069
5070    #[tokio::test]
5071    async fn aggregate_max() {
5072        do_aggregate_expr_plan("max", "max").await;
5073    }
5074
5075    #[tokio::test]
5076    async fn aggregate_group() {
5077        // Regression test for `group()` aggregator.
5078        // PromQL: sum(group by (cluster)(kubernetes_build_info{service="kubernetes",job="apiserver"}))
5079        // should be plannable, and `group()` should produce constant 1 for each group.
5080        let prom_expr = parser::parse(
5081            "sum(group by (cluster)(kubernetes_build_info{service=\"kubernetes\",job=\"apiserver\"}))",
5082        )
5083        .unwrap();
5084        let eval_stmt = EvalStmt {
5085            expr: prom_expr,
5086            start: UNIX_EPOCH,
5087            end: UNIX_EPOCH
5088                .checked_add(Duration::from_secs(100_000))
5089                .unwrap(),
5090            interval: Duration::from_secs(5),
5091            lookback_delta: Duration::from_secs(1),
5092        };
5093
5094        let table_provider = build_test_table_provider_with_fields(
5095            &[(
5096                DEFAULT_SCHEMA_NAME.to_string(),
5097                "kubernetes_build_info".to_string(),
5098            )],
5099            &["cluster", "service", "job"],
5100        )
5101        .await;
5102        let plan =
5103            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5104                .await
5105                .unwrap();
5106
5107        let plan_str = plan.display_indent_schema().to_string();
5108        assert!(plan_str.contains("max(Float64(1"));
5109    }
5110
5111    #[tokio::test]
5112    async fn aggregate_stddev() {
5113        do_aggregate_expr_plan("stddev", "stddev_pop").await;
5114    }
5115
5116    #[tokio::test]
5117    async fn aggregate_stdvar() {
5118        do_aggregate_expr_plan("stdvar", "var_pop").await;
5119    }
5120
5121    // TODO(ruihang): add range fn tests once exprs are ready.
5122
5123    // {
5124    //     input: "some_metric{tag_0="foo"} + some_metric{tag_0="bar"}",
5125    //     expected: &BinaryExpr{
5126    //         Op: ADD,
5127    //         LHS: &VectorSelector{
5128    //             Name: "a",
5129    //             LabelMatchers: []*labels.Matcher{
5130    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "foo"),
5131    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
5132    //             },
5133    //         },
5134    //         RHS: &VectorSelector{
5135    //             Name: "sum",
5136    //             LabelMatchers: []*labels.Matcher{
5137    //                     MustLabelMatcher(labels.MatchxEqual, "tag_0", "bar"),
5138    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
5139    //             },
5140    //         },
5141    //         VectorMatching: &VectorMatching{},
5142    //     },
5143    // },
5144    #[tokio::test]
5145    async fn binary_op_column_column() {
5146        let prom_expr =
5147            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
5148        let eval_stmt = EvalStmt {
5149            expr: prom_expr,
5150            start: UNIX_EPOCH,
5151            end: UNIX_EPOCH
5152                .checked_add(Duration::from_secs(100_000))
5153                .unwrap(),
5154            interval: Duration::from_secs(5),
5155            lookback_delta: Duration::from_secs(1),
5156        };
5157
5158        let table_provider = build_test_table_provider(
5159            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5160            1,
5161            1,
5162        )
5163        .await;
5164        let plan =
5165            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5166                .await
5167                .unwrap();
5168
5169        let expected = String::from(
5170            "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
5171            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5172            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5173            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5174            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5175            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5176            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5177            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5178            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5179            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5180            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5181            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5182            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5183            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5184        );
5185
5186        assert_eq!(plan.display_indent_schema().to_string(), expected);
5187    }
5188
5189    async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
5190        let prom_expr = parser::parse(query).unwrap();
5191        let eval_stmt = EvalStmt {
5192            expr: prom_expr,
5193            start: UNIX_EPOCH,
5194            end: UNIX_EPOCH
5195                .checked_add(Duration::from_secs(100_000))
5196                .unwrap(),
5197            interval: Duration::from_secs(5),
5198            lookback_delta: Duration::from_secs(1),
5199        };
5200
5201        let table_provider = build_test_table_provider(
5202            &[
5203                (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5204                (
5205                    "greptime_private".to_string(),
5206                    "some_alt_metric".to_string(),
5207                ),
5208            ],
5209            1,
5210            1,
5211        )
5212        .await;
5213        let plan =
5214            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5215                .await
5216                .unwrap();
5217
5218        assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
5219    }
5220
5221    #[tokio::test]
5222    async fn binary_op_literal_column() {
5223        let query = r#"1 + some_metric{tag_0="bar"}"#;
5224        let expected = String::from(
5225            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
5226            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5227            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5228            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5229            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5230            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5231        );
5232
5233        indie_query_plan_compare(query, expected).await;
5234    }
5235
5236    #[tokio::test]
5237    async fn binary_op_literal_literal() {
5238        let query = r#"1 + 1"#;
5239        let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
5240  TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;
5241        indie_query_plan_compare(query, expected).await;
5242    }
5243
5244    #[tokio::test]
5245    async fn simple_bool_grammar() {
5246        let query = "some_metric != bool 1.2345";
5247        let expected = String::from(
5248            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
5249            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5250            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5251            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5252            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5253            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5254        );
5255
5256        indie_query_plan_compare(query, expected).await;
5257    }
5258
5259    #[tokio::test]
5260    async fn bool_with_additional_arithmetic() {
5261        let query = "some_metric + (1 == bool 2)";
5262        let expected = String::from(
5263            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
5264            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5265            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5266            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5267            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5268            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5269        );
5270
5271        indie_query_plan_compare(query, expected).await;
5272    }
5273
5274    #[tokio::test]
5275    async fn simple_unary() {
5276        let query = "-some_metric";
5277        let expected = String::from(
5278            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
5279            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5280            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5281            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5282            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5283            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5284        );
5285
5286        indie_query_plan_compare(query, expected).await;
5287    }
5288
5289    #[tokio::test]
5290    async fn increase_aggr() {
5291        let query = "increase(some_metric[5m])";
5292        let expected = String::from(
5293            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
5294            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
5295            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
5296            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5297            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5298            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5299            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-299999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5300            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5301        );
5302
5303        indie_query_plan_compare(query, expected).await;
5304    }
5305
5306    #[tokio::test]
5307    async fn less_filter_on_value() {
5308        let query = "some_metric < 1.2345";
5309        let expected = String::from(
5310            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5311            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5312            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5313            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5314            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5315            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5316        );
5317
5318        indie_query_plan_compare(query, expected).await;
5319    }
5320
5321    #[tokio::test]
5322    async fn count_over_time() {
5323        let query = "count_over_time(some_metric[5m])";
5324        let expected = String::from(
5325            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
5326            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
5327            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
5328            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5329            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5330            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5331            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-299999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5332            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5333        );
5334
5335        indie_query_plan_compare(query, expected).await;
5336    }
5337
5338    #[tokio::test]
5339    async fn test_hash_join() {
5340        let mut eval_stmt = EvalStmt {
5341            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5342            start: UNIX_EPOCH,
5343            end: UNIX_EPOCH
5344                .checked_add(Duration::from_secs(100_000))
5345                .unwrap(),
5346            interval: Duration::from_secs(5),
5347            lookback_delta: Duration::from_secs(1),
5348        };
5349
5350        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
5351
5352        let prom_expr = parser::parse(case).unwrap();
5353        eval_stmt.expr = prom_expr;
5354        let table_provider = build_test_table_provider_with_fields(
5355            &[
5356                (
5357                    DEFAULT_SCHEMA_NAME.to_string(),
5358                    "http_server_requests_seconds_sum".to_string(),
5359                ),
5360                (
5361                    DEFAULT_SCHEMA_NAME.to_string(),
5362                    "http_server_requests_seconds_count".to_string(),
5363                ),
5364            ],
5365            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
5366        )
5367        .await;
5368        // Should be ok
5369        let plan =
5370            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5371                .await
5372                .unwrap();
5373        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
5374            \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
5375            \n    SubqueryAlias: http_server_requests_seconds_sum\
5376            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
5377            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
5378            \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
5379            \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-999, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100000000, None)\
5380            \n              TableScan: http_server_requests_seconds_sum\
5381            \n    SubqueryAlias: http_server_requests_seconds_count\
5382            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
5383            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
5384            \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
5385            \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-999, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100000000, None)\
5386            \n              TableScan: http_server_requests_seconds_count";
5387        assert_eq!(plan.to_string(), expected);
5388    }
5389
5390    #[tokio::test]
5391    async fn test_nested_histogram_quantile() {
5392        let mut eval_stmt = EvalStmt {
5393            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5394            start: UNIX_EPOCH,
5395            end: UNIX_EPOCH
5396                .checked_add(Duration::from_secs(100_000))
5397                .unwrap(),
5398            interval: Duration::from_secs(5),
5399            lookback_delta: Duration::from_secs(1),
5400        };
5401
5402        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;
5403
5404        let prom_expr = parser::parse(case).unwrap();
5405        eval_stmt.expr = prom_expr;
5406        let table_provider = build_test_table_provider_with_fields(
5407            &[(
5408                DEFAULT_SCHEMA_NAME.to_string(),
5409                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
5410            )],
5411            &["pod", "le", "path", "code", "container"],
5412        )
5413        .await;
5414        // Should be ok
5415        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5416            .await
5417            .unwrap();
5418    }
5419
5420    #[tokio::test]
5421    async fn test_parse_and_operator() {
5422        let mut eval_stmt = EvalStmt {
5423            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5424            start: UNIX_EPOCH,
5425            end: UNIX_EPOCH
5426                .checked_add(Duration::from_secs(100_000))
5427                .unwrap(),
5428            interval: Duration::from_secs(5),
5429            lookback_delta: Duration::from_secs(1),
5430        };
5431
5432        let cases = [
5433            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
5434            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
5435        ];
5436
5437        for case in cases {
5438            let prom_expr = parser::parse(case).unwrap();
5439            eval_stmt.expr = prom_expr;
5440            let table_provider = build_test_table_provider_with_fields(
5441                &[
5442                    (
5443                        DEFAULT_SCHEMA_NAME.to_string(),
5444                        "kubelet_volume_stats_used_bytes".to_string(),
5445                    ),
5446                    (
5447                        DEFAULT_SCHEMA_NAME.to_string(),
5448                        "kubelet_volume_stats_capacity_bytes".to_string(),
5449                    ),
5450                ],
5451                &["namespace", "persistentvolumeclaim"],
5452            )
5453            .await;
5454            // Should be ok
5455            let _ =
5456                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5457                    .await
5458                    .unwrap();
5459        }
5460    }
5461
5462    #[tokio::test]
5463    async fn test_nested_binary_op() {
5464        let mut eval_stmt = EvalStmt {
5465            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5466            start: UNIX_EPOCH,
5467            end: UNIX_EPOCH
5468                .checked_add(Duration::from_secs(100_000))
5469                .unwrap(),
5470            interval: Duration::from_secs(5),
5471            lookback_delta: Duration::from_secs(1),
5472        };
5473
5474        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
5475        (
5476            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
5477            or
5478            vector(0)
5479        )"#;
5480
5481        let prom_expr = parser::parse(case).unwrap();
5482        eval_stmt.expr = prom_expr;
5483        let table_provider = build_test_table_provider_with_fields(
5484            &[(
5485                DEFAULT_SCHEMA_NAME.to_string(),
5486                "nginx_ingress_controller_requests".to_string(),
5487            )],
5488            &["namespace", "job"],
5489        )
5490        .await;
5491        // Should be ok
5492        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5493            .await
5494            .unwrap();
5495    }
5496
5497    #[tokio::test]
5498    async fn test_parse_or_operator() {
5499        let mut eval_stmt = EvalStmt {
5500            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5501            start: UNIX_EPOCH,
5502            end: UNIX_EPOCH
5503                .checked_add(Duration::from_secs(100_000))
5504                .unwrap(),
5505            interval: Duration::from_secs(5),
5506            lookback_delta: Duration::from_secs(1),
5507        };
5508
5509        let case = r#"
5510        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
5511        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
5512            or
5513        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
5514        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;
5515
5516        let table_provider = build_test_table_provider_with_fields(
5517            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
5518            &["tenant_name", "cluster_name"],
5519        )
5520        .await;
5521        eval_stmt.expr = parser::parse(case).unwrap();
5522        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5523            .await
5524            .unwrap();
5525
5526        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
5527            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
5528            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
5529            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
5530            or
5531            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
5532            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
5533            or
5534            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
5535            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
5536        let table_provider = build_test_table_provider_with_fields(
5537            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
5538            &["tenant_name", "cluster_name"],
5539        )
5540        .await;
5541        eval_stmt.expr = parser::parse(case).unwrap();
5542        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5543            .await
5544            .unwrap();
5545
5546        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
5547            sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
5548            (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
5549            (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
5550        let table_provider = build_test_table_provider_with_fields(
5551            &[
5552                (
5553                    DEFAULT_SCHEMA_NAME.to_string(),
5554                    "background_waitevent_cnt".to_string(),
5555                ),
5556                (
5557                    DEFAULT_SCHEMA_NAME.to_string(),
5558                    "foreground_waitevent_cnt".to_string(),
5559                ),
5560            ],
5561            &["tenant_name", "cluster_name"],
5562        )
5563        .await;
5564        eval_stmt.expr = parser::parse(case).unwrap();
5565        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5566            .await
5567            .unwrap();
5568
5569        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
5570        let table_provider = build_test_table_provider_with_fields(
5571            &[
5572                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
5573                (
5574                    DEFAULT_SCHEMA_NAME.to_string(),
5575                    "container_cpu_load_average_10s".to_string(),
5576                ),
5577                (
5578                    DEFAULT_SCHEMA_NAME.to_string(),
5579                    "container_spec_cpu_quota".to_string(),
5580                ),
5581            ],
5582            &["cluster_name", "host_name"],
5583        )
5584        .await;
5585        eval_stmt.expr = parser::parse(case).unwrap();
5586        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5587            .await
5588            .unwrap();
5589    }
5590
5591    #[tokio::test]
5592    async fn value_matcher() {
5593        // template
5594        let mut eval_stmt = EvalStmt {
5595            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5596            start: UNIX_EPOCH,
5597            end: UNIX_EPOCH
5598                .checked_add(Duration::from_secs(100_000))
5599                .unwrap(),
5600            interval: Duration::from_secs(5),
5601            lookback_delta: Duration::from_secs(1),
5602        };
5603
5604        let cases = [
5605            // single equal matcher
5606            (
5607                r#"some_metric{__field__="field_1"}"#,
5608                vec![
5609                    "some_metric.field_1",
5610                    "some_metric.tag_0",
5611                    "some_metric.tag_1",
5612                    "some_metric.tag_2",
5613                    "some_metric.timestamp",
5614                ],
5615            ),
5616            // two equal matchers
5617            (
5618                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
5619                vec![
5620                    "some_metric.field_0",
5621                    "some_metric.field_1",
5622                    "some_metric.tag_0",
5623                    "some_metric.tag_1",
5624                    "some_metric.tag_2",
5625                    "some_metric.timestamp",
5626                ],
5627            ),
5628            // single not_eq matcher
5629            (
5630                r#"some_metric{__field__!="field_1"}"#,
5631                vec![
5632                    "some_metric.field_0",
5633                    "some_metric.field_2",
5634                    "some_metric.tag_0",
5635                    "some_metric.tag_1",
5636                    "some_metric.tag_2",
5637                    "some_metric.timestamp",
5638                ],
5639            ),
5640            // two not_eq matchers
5641            (
5642                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
5643                vec![
5644                    "some_metric.field_0",
5645                    "some_metric.tag_0",
5646                    "some_metric.tag_1",
5647                    "some_metric.tag_2",
5648                    "some_metric.timestamp",
5649                ],
5650            ),
5651            // equal and not_eq matchers (no conflict)
5652            (
5653                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
5654                vec![
5655                    "some_metric.field_1",
5656                    "some_metric.tag_0",
5657                    "some_metric.tag_1",
5658                    "some_metric.tag_2",
5659                    "some_metric.timestamp",
5660                ],
5661            ),
5662            // equal and not_eq matchers (conflict)
5663            (
5664                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
5665                vec![
5666                    "some_metric.tag_0",
5667                    "some_metric.tag_1",
5668                    "some_metric.tag_2",
5669                    "some_metric.timestamp",
5670                ],
5671            ),
5672            // single regex eq matcher
5673            (
5674                r#"some_metric{__field__=~"field_1|field_2"}"#,
5675                vec![
5676                    "some_metric.field_1",
5677                    "some_metric.field_2",
5678                    "some_metric.tag_0",
5679                    "some_metric.tag_1",
5680                    "some_metric.tag_2",
5681                    "some_metric.timestamp",
5682                ],
5683            ),
5684            // single regex not_eq matcher
5685            (
5686                r#"some_metric{__field__!~"field_1|field_2"}"#,
5687                vec![
5688                    "some_metric.field_0",
5689                    "some_metric.tag_0",
5690                    "some_metric.tag_1",
5691                    "some_metric.tag_2",
5692                    "some_metric.timestamp",
5693                ],
5694            ),
5695        ];
5696
5697        for case in cases {
5698            let prom_expr = parser::parse(case.0).unwrap();
5699            eval_stmt.expr = prom_expr;
5700            let table_provider = build_test_table_provider(
5701                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5702                3,
5703                3,
5704            )
5705            .await;
5706            let plan =
5707                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5708                    .await
5709                    .unwrap();
5710            let mut fields = plan.schema().field_names();
5711            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
5712            fields.sort();
5713            expected.sort();
5714            assert_eq!(fields, expected, "case: {:?}", case.0);
5715        }
5716
5717        let bad_cases = [
5718            r#"some_metric{__field__="nonexistent"}"#,
5719            r#"some_metric{__field__!="nonexistent"}"#,
5720        ];
5721
5722        for case in bad_cases {
5723            let prom_expr = parser::parse(case).unwrap();
5724            eval_stmt.expr = prom_expr;
5725            let table_provider = build_test_table_provider(
5726                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5727                3,
5728                3,
5729            )
5730            .await;
5731            let plan =
5732                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5733                    .await;
5734            assert!(plan.is_err(), "case: {:?}", case);
5735        }
5736    }
5737
5738    #[tokio::test]
5739    async fn custom_schema() {
5740        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
5741        let expected = String::from(
5742            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5743            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5744            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5745            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5746            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5747        );
5748
5749        indie_query_plan_compare(query, expected).await;
5750
5751        let query = "some_alt_metric{__database__=\"greptime_private\"}";
5752        let expected = String::from(
5753            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5754            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5755            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5756            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5757            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5758        );
5759
5760        indie_query_plan_compare(query, expected).await;
5761
5762        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
5763        let expected = String::from(
5764            "Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
5765        \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5766        \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5767        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5768        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5769        \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5770        \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5771        \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5772        \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5773        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5774        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5775        \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5776        \n            Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5777        \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5778        );
5779
5780        indie_query_plan_compare(query, expected).await;
5781    }
5782
5783    #[tokio::test]
5784    async fn only_equals_is_supported_for_special_matcher() {
5785        let queries = &[
5786            "some_alt_metric{__schema__!=\"greptime_private\"}",
5787            "some_alt_metric{__schema__=~\"lalala\"}",
5788            "some_alt_metric{__database__!=\"greptime_private\"}",
5789            "some_alt_metric{__database__=~\"lalala\"}",
5790        ];
5791
5792        for query in queries {
5793            let prom_expr = parser::parse(query).unwrap();
5794            let eval_stmt = EvalStmt {
5795                expr: prom_expr,
5796                start: UNIX_EPOCH,
5797                end: UNIX_EPOCH
5798                    .checked_add(Duration::from_secs(100_000))
5799                    .unwrap(),
5800                interval: Duration::from_secs(5),
5801                lookback_delta: Duration::from_secs(1),
5802            };
5803
5804            let table_provider = build_test_table_provider(
5805                &[
5806                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5807                    (
5808                        "greptime_private".to_string(),
5809                        "some_alt_metric".to_string(),
5810                    ),
5811                ],
5812                1,
5813                1,
5814            )
5815            .await;
5816
5817            let plan =
5818                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5819                    .await;
5820            assert!(plan.is_err(), "query: {:?}", query);
5821        }
5822    }
5823
5824    #[tokio::test]
5825    async fn test_non_ms_precision() {
5826        let catalog_list = MemoryCatalogManager::with_default_setup();
5827        let columns = vec![
5828            ColumnSchema::new(
5829                "tag".to_string(),
5830                ConcreteDataType::string_datatype(),
5831                false,
5832            ),
5833            ColumnSchema::new(
5834                "timestamp".to_string(),
5835                ConcreteDataType::timestamp_nanosecond_datatype(),
5836                false,
5837            )
5838            .with_time_index(true),
5839            ColumnSchema::new(
5840                "field".to_string(),
5841                ConcreteDataType::float64_datatype(),
5842                true,
5843            ),
5844        ];
5845        let schema = Arc::new(Schema::new(columns));
5846        let table_meta = TableMetaBuilder::empty()
5847            .schema(schema)
5848            .primary_key_indices(vec![0])
5849            .value_indices(vec![2])
5850            .next_column_id(1024)
5851            .build()
5852            .unwrap();
5853        let table_info = TableInfoBuilder::default()
5854            .name("metrics".to_string())
5855            .meta(table_meta)
5856            .build()
5857            .unwrap();
5858        let table = EmptyTable::from_table_info(&table_info);
5859        assert!(
5860            catalog_list
5861                .register_table_sync(RegisterTableRequest {
5862                    catalog: DEFAULT_CATALOG_NAME.to_string(),
5863                    schema: DEFAULT_SCHEMA_NAME.to_string(),
5864                    table_name: "metrics".to_string(),
5865                    table_id: 1024,
5866                    table,
5867                })
5868                .is_ok()
5869        );
5870
5871        let plan = PromPlanner::stmt_to_plan(
5872            DfTableSourceProvider::new(
5873                catalog_list.clone(),
5874                false,
5875                QueryContext::arc(),
5876                DummyDecoder::arc(),
5877                true,
5878            ),
5879            &EvalStmt {
5880                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
5881                start: UNIX_EPOCH,
5882                end: UNIX_EPOCH
5883                    .checked_add(Duration::from_secs(100_000))
5884                    .unwrap(),
5885                interval: Duration::from_secs(5),
5886                lookback_delta: Duration::from_secs(1),
5887            },
5888            &build_query_engine_state(),
5889        )
5890        .await
5891        .unwrap();
5892        assert_eq!(
5893            plan.display_indent_schema().to_string(),
5894            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
5895            \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
5896            \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
5897            \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
5898            \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
5899            \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
5900        );
5901        let plan = PromPlanner::stmt_to_plan(
5902            DfTableSourceProvider::new(
5903                catalog_list.clone(),
5904                false,
5905                QueryContext::arc(),
5906                DummyDecoder::arc(),
5907                true,
5908            ),
5909            &EvalStmt {
5910                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
5911                start: UNIX_EPOCH,
5912                end: UNIX_EPOCH
5913                    .checked_add(Duration::from_secs(100_000))
5914                    .unwrap(),
5915                interval: Duration::from_secs(5),
5916                lookback_delta: Duration::from_secs(1),
5917            },
5918            &build_query_engine_state(),
5919        )
5920        .await
5921        .unwrap();
5922        assert_eq!(
5923            plan.display_indent_schema().to_string(),
5924            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
5925            \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
5926            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
5927            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
5928            \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
5929            \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
5930            \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-4999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
5931            \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
5932            \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
5933        );
5934    }
5935
5936    #[tokio::test]
5937    async fn test_nonexistent_label() {
5938        // template
5939        let mut eval_stmt = EvalStmt {
5940            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5941            start: UNIX_EPOCH,
5942            end: UNIX_EPOCH
5943                .checked_add(Duration::from_secs(100_000))
5944                .unwrap(),
5945            interval: Duration::from_secs(5),
5946            lookback_delta: Duration::from_secs(1),
5947        };
5948
5949        let case = r#"some_metric{nonexistent="hi"}"#;
5950        let prom_expr = parser::parse(case).unwrap();
5951        eval_stmt.expr = prom_expr;
5952        let table_provider = build_test_table_provider(
5953            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5954            3,
5955            3,
5956        )
5957        .await;
5958        // Should be ok
5959        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5960            .await
5961            .unwrap();
5962    }
5963
5964    #[tokio::test]
5965    async fn test_label_join() {
5966        let prom_expr = parser::parse(
5967            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
5968        )
5969        .unwrap();
5970        let eval_stmt = EvalStmt {
5971            expr: prom_expr,
5972            start: UNIX_EPOCH,
5973            end: UNIX_EPOCH
5974                .checked_add(Duration::from_secs(100_000))
5975                .unwrap(),
5976            interval: Duration::from_secs(5),
5977            lookback_delta: Duration::from_secs(1),
5978        };
5979
5980        let table_provider =
5981            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
5982                .await;
5983        let plan =
5984            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5985                .await
5986                .unwrap();
5987
5988        let expected = r#"
5989Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
5990  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
5991    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
5992      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
5993        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
5994          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-999, None) AND up.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
5995            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;
5996
5997        let ret = plan.display_indent_schema().to_string();
5998        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
5999    }
6000
6001    #[tokio::test]
6002    async fn test_label_replace() {
6003        let prom_expr = parser::parse(
6004            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
6005        )
6006        .unwrap();
6007        let eval_stmt = EvalStmt {
6008            expr: prom_expr,
6009            start: UNIX_EPOCH,
6010            end: UNIX_EPOCH
6011                .checked_add(Duration::from_secs(100_000))
6012                .unwrap(),
6013            interval: Duration::from_secs(5),
6014            lookback_delta: Duration::from_secs(1),
6015        };
6016
6017        let table_provider =
6018            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
6019                .await;
6020        let plan =
6021            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6022                .await
6023                .unwrap();
6024
6025        let expected = r#"
6026Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
6027  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
6028    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
6029      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
6030        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
6031          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-999, None) AND up.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
6032            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;
6033
6034        let ret = plan.display_indent_schema().to_string();
6035        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
6036    }
6037
6038    #[tokio::test]
6039    async fn test_matchers_to_expr() {
6040        let mut eval_stmt = EvalStmt {
6041            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6042            start: UNIX_EPOCH,
6043            end: UNIX_EPOCH
6044                .checked_add(Duration::from_secs(100_000))
6045                .unwrap(),
6046            interval: Duration::from_secs(5),
6047            lookback_delta: Duration::from_secs(1),
6048        };
6049        let case =
6050            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;
6051
6052        let prom_expr = parser::parse(case).unwrap();
6053        eval_stmt.expr = prom_expr;
6054        let table_provider = build_test_table_provider(
6055            &[(
6056                DEFAULT_SCHEMA_NAME.to_string(),
6057                "prometheus_tsdb_head_series".to_string(),
6058            )],
6059            3,
6060            3,
6061        )
6062        .await;
6063        let plan =
6064            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6065                .await
6066                .unwrap();
6067        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
6068        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
6069        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
6070        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
6071        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
6072        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
6073        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
6074        assert_eq!(plan.display_indent_schema().to_string(), expected);
6075    }
6076
6077    #[tokio::test]
6078    async fn test_topk_expr() {
6079        let mut eval_stmt = EvalStmt {
6080            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6081            start: UNIX_EPOCH,
6082            end: UNIX_EPOCH
6083                .checked_add(Duration::from_secs(100_000))
6084                .unwrap(),
6085            interval: Duration::from_secs(5),
6086            lookback_delta: Duration::from_secs(1),
6087        };
6088        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;
6089
6090        let prom_expr = parser::parse(case).unwrap();
6091        eval_stmt.expr = prom_expr;
6092        let table_provider = build_test_table_provider_with_fields(
6093            &[
6094                (
6095                    DEFAULT_SCHEMA_NAME.to_string(),
6096                    "prometheus_tsdb_head_series".to_string(),
6097                ),
6098                (
6099                    DEFAULT_SCHEMA_NAME.to_string(),
6100                    "http_server_requests_seconds_count".to_string(),
6101                ),
6102            ],
6103            &["ip"],
6104        )
6105        .await;
6106
6107        let plan =
6108            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6109                .await
6110                .unwrap();
6111        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
6112        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
6113        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
6114        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
6115        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6116        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6117        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
6118        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
6119        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
6120        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
6121        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";
6122
6123        assert_eq!(plan.display_indent_schema().to_string(), expected);
6124    }
6125
6126    #[tokio::test]
6127    async fn test_count_values_expr() {
6128        let mut eval_stmt = EvalStmt {
6129            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6130            start: UNIX_EPOCH,
6131            end: UNIX_EPOCH
6132                .checked_add(Duration::from_secs(100_000))
6133                .unwrap(),
6134            interval: Duration::from_secs(5),
6135            lookback_delta: Duration::from_secs(1),
6136        };
6137        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;
6138
6139        let prom_expr = parser::parse(case).unwrap();
6140        eval_stmt.expr = prom_expr;
6141        let table_provider = build_test_table_provider_with_fields(
6142            &[
6143                (
6144                    DEFAULT_SCHEMA_NAME.to_string(),
6145                    "prometheus_tsdb_head_series".to_string(),
6146                ),
6147                (
6148                    DEFAULT_SCHEMA_NAME.to_string(),
6149                    "http_server_requests_seconds_count".to_string(),
6150                ),
6151            ],
6152            &["ip"],
6153        )
6154        .await;
6155
6156        let plan =
6157            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6158                .await
6159                .unwrap();
6160        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
6161        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
6162        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
6163        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
6164        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
6165        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
6166        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
6167        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
6168        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";
6169
6170        assert_eq!(plan.display_indent_schema().to_string(), expected);
6171    }
6172
6173    #[tokio::test]
6174    async fn test_value_alias() {
6175        let mut eval_stmt = EvalStmt {
6176            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6177            start: UNIX_EPOCH,
6178            end: UNIX_EPOCH
6179                .checked_add(Duration::from_secs(100_000))
6180                .unwrap(),
6181            interval: Duration::from_secs(5),
6182            lookback_delta: Duration::from_secs(1),
6183        };
6184        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;
6185
6186        let prom_expr = parser::parse(case).unwrap();
6187        eval_stmt.expr = prom_expr;
6188        eval_stmt = QueryLanguageParser::apply_alias_extension(eval_stmt, "my_series");
6189        let table_provider = build_test_table_provider_with_fields(
6190            &[
6191                (
6192                    DEFAULT_SCHEMA_NAME.to_string(),
6193                    "prometheus_tsdb_head_series".to_string(),
6194                ),
6195                (
6196                    DEFAULT_SCHEMA_NAME.to_string(),
6197                    "http_server_requests_seconds_count".to_string(),
6198                ),
6199            ],
6200            &["ip"],
6201        )
6202        .await;
6203
6204        let plan =
6205            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6206                .await
6207                .unwrap();
6208        let expected = r#"
6209Projection: count(prometheus_tsdb_head_series.greptime_value) AS my_series, prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [my_series:Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]
6210  Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]
6211    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
6212      Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
6213        Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
6214          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
6215            PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
6216              Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
6217                Filter: prometheus_tsdb_head_series.ip ~ Utf8("^(?:(10.0.160.237:8080|10.0.160.237:9090))$") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
6218                  TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
6219        assert_eq!(format!("\n{}", plan.display_indent_schema()), expected);
6220    }
6221
6222    #[tokio::test]
6223    async fn test_quantile_expr() {
6224        let mut eval_stmt = EvalStmt {
6225            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6226            start: UNIX_EPOCH,
6227            end: UNIX_EPOCH
6228                .checked_add(Duration::from_secs(100_000))
6229                .unwrap(),
6230            interval: Duration::from_secs(5),
6231            lookback_delta: Duration::from_secs(1),
6232        };
6233        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;
6234
6235        let prom_expr = parser::parse(case).unwrap();
6236        eval_stmt.expr = prom_expr;
6237        let table_provider = build_test_table_provider_with_fields(
6238            &[
6239                (
6240                    DEFAULT_SCHEMA_NAME.to_string(),
6241                    "prometheus_tsdb_head_series".to_string(),
6242                ),
6243                (
6244                    DEFAULT_SCHEMA_NAME.to_string(),
6245                    "http_server_requests_seconds_count".to_string(),
6246                ),
6247            ],
6248            &["ip"],
6249        )
6250        .await;
6251
6252        let plan =
6253            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6254                .await
6255                .unwrap();
6256        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
6257        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
6258        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6259        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6260        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
6261        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
6262        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
6263        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
6264        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";
6265
6266        assert_eq!(plan.display_indent_schema().to_string(), expected);
6267    }
6268
6269    #[tokio::test]
6270    async fn test_or_not_exists_table_label() {
6271        let mut eval_stmt = EvalStmt {
6272            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6273            start: UNIX_EPOCH,
6274            end: UNIX_EPOCH
6275                .checked_add(Duration::from_secs(100_000))
6276                .unwrap(),
6277            interval: Duration::from_secs(5),
6278            lookback_delta: Duration::from_secs(1),
6279        };
6280        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;
6281
6282        let prom_expr = parser::parse(case).unwrap();
6283        eval_stmt.expr = prom_expr;
6284        let table_provider = build_test_table_provider_with_fields(
6285            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
6286            &["job"],
6287        )
6288        .await;
6289
6290        let plan =
6291            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6292                .await
6293                .unwrap();
6294        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
6295  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
6296    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
6297      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
6298        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
6299          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
6300            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
6301              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
6302                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-999, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100000000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
6303                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
6304  SubqueryAlias:  [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
6305    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
6306      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
6307        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
6308          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
6309            TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;
6310
6311        assert_eq!(plan.display_indent_schema().to_string(), expected);
6312    }
6313
6314    #[tokio::test]
6315    async fn test_histogram_quantile_missing_le_column() {
6316        let mut eval_stmt = EvalStmt {
6317            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6318            start: UNIX_EPOCH,
6319            end: UNIX_EPOCH
6320                .checked_add(Duration::from_secs(100_000))
6321                .unwrap(),
6322            interval: Duration::from_secs(5),
6323            lookback_delta: Duration::from_secs(1),
6324        };
6325
6326        // Test case: histogram_quantile with a table that doesn't have 'le' column
6327        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;
6328
6329        let prom_expr = parser::parse(case).unwrap();
6330        eval_stmt.expr = prom_expr;
6331
6332        // Create a table provider with a table that doesn't have 'le' column
6333        let table_provider = build_test_table_provider_with_fields(
6334            &[(
6335                DEFAULT_SCHEMA_NAME.to_string(),
6336                "non_existent_histogram_bucket".to_string(),
6337            )],
6338            &["pod", "instance"], // Note: no 'le' column
6339        )
6340        .await;
6341
6342        // Should return empty result instead of error
6343        let result =
6344            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6345                .await;
6346
6347        // This should succeed now (returning empty result) instead of failing with "Cannot find column le"
6348        assert!(
6349            result.is_ok(),
6350            "Expected successful plan creation with empty result, but got error: {:?}",
6351            result.err()
6352        );
6353
6354        // Verify that the result is an EmptyRelation
6355        let plan = result.unwrap();
6356        match plan {
6357            LogicalPlan::EmptyRelation(_) => {
6358                // This is what we expect
6359            }
6360            _ => panic!("Expected EmptyRelation, but got: {:?}", plan),
6361        }
6362    }
6363}