query/promql/
planner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashSet, VecDeque};
16use std::sync::Arc;
17use std::time::UNIX_EPOCH;
18
19use arrow::datatypes::IntervalDayTime;
20use async_recursion::async_recursion;
21use catalog::table_source::DfTableSourceProvider;
22use common_error::ext::ErrorExt;
23use common_error::status_code::StatusCode;
24use common_function::function::FunctionContext;
25use common_query::prelude::greptime_value;
26use datafusion::common::DFSchemaRef;
27use datafusion::datasource::DefaultTableSource;
28use datafusion::functions_aggregate::average::avg_udaf;
29use datafusion::functions_aggregate::count::count_udaf;
30use datafusion::functions_aggregate::expr_fn::first_value;
31use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
32use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
33use datafusion::functions_aggregate::sum::sum_udaf;
34use datafusion::functions_aggregate::variance::var_pop_udaf;
35use datafusion::functions_window::row_number::RowNumber;
36use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
37use datafusion::logical_expr::expr_rewriter::normalize_cols;
38use datafusion::logical_expr::{
39    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
40    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
41};
42use datafusion::prelude as df_prelude;
43use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
44use datafusion::scalar::ScalarValue;
45use datafusion::sql::TableReference;
46use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
47use datafusion_common::{DFSchema, NullEquality};
48use datafusion_expr::expr::WindowFunctionParams;
49use datafusion_expr::utils::conjunction;
50use datafusion_expr::{
51    ExprSchemable, Literal, Projection, SortExpr, TableScan, TableSource, col, lit,
52};
53use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
54use datatypes::data_type::ConcreteDataType;
55use itertools::Itertools;
56use once_cell::sync::Lazy;
57use promql::extension_plan::{
58    Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
59    ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
60};
61use promql::functions::{
62    AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, DoubleExponentialSmoothing,
63    IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
64    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
65    quantile_udaf,
66};
67use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
68use promql_parser::parser::token::TokenType;
69use promql_parser::parser::value::ValueType;
70use promql_parser::parser::{
71    AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
72    Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
73    Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
74    VectorSelector, token,
75};
76use regex::{self, Regex};
77use snafu::{OptionExt, ResultExt, ensure};
78use store_api::metric_engine_consts::{
79    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
80    METRIC_ENGINE_NAME, is_metric_engine_internal_column,
81};
82use table::table::adapter::DfTableProviderAdapter;
83
84use crate::parser::{
85    ALIAS_NODE_NAME, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME, AliasExpr, EXPLAIN_NODE_NAME,
86    EXPLAIN_VERBOSE_NODE_NAME,
87};
88use crate::promql::error::{
89    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
90    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
91    InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
92    MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
93    Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
94    UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
95    UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
96    ZeroRangeSelectorSnafu,
97};
98use crate::query_engine::QueryEngineState;
99
100/// `time()` function in PromQL.
101const SPECIAL_TIME_FUNCTION: &str = "time";
102/// `scalar()` function in PromQL.
103const SCALAR_FUNCTION: &str = "scalar";
104/// `absent()` function in PromQL
105const SPECIAL_ABSENT_FUNCTION: &str = "absent";
106/// `histogram_quantile` function in PromQL
107const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
108/// `vector` function in PromQL
109const SPECIAL_VECTOR_FUNCTION: &str = "vector";
110/// `le` column for conventional histogram.
111const LE_COLUMN_NAME: &str = "le";
112
113/// Static regex for validating label names according to Prometheus specification.
114/// Label names must match the regex: [a-zA-Z_][a-zA-Z0-9_]*
115static LABEL_NAME_REGEX: Lazy<Regex> =
116    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());
117
118const DEFAULT_TIME_INDEX_COLUMN: &str = "time";
119
120/// default value column name for empty metric
121const DEFAULT_FIELD_COLUMN: &str = "value";
122
123/// Special modifier to project field columns under multi-field mode
124const FIELD_COLUMN_MATCHER: &str = "__field__";
125
126/// Special modifier for cross schema query
127const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
128const DB_COLUMN_MATCHER: &str = "__database__";
129
130/// Threshold for scatter scan mode
131const MAX_SCATTER_POINTS: i64 = 400;
132
133/// Interval 1 hour in millisecond
134const INTERVAL_1H: i64 = 60 * 60 * 1000;
135
136#[derive(Default, Debug, Clone)]
137struct PromPlannerContext {
138    // query parameters
139    start: Millisecond,
140    end: Millisecond,
141    interval: Millisecond,
142    lookback_delta: Millisecond,
143
144    // planner states
145    table_name: Option<String>,
146    time_index_column: Option<String>,
147    field_columns: Vec<String>,
148    tag_columns: Vec<String>,
149    /// Use metric engine internal series identifier column (`__tsid`) as series key.
150    ///
151    /// This is enabled only when the underlying scan can provide `__tsid` (`UInt64`). The planner
152    /// uses it internally (e.g. as the series key for [`SeriesDivide`]) and strips it from the
153    /// final output.
154    use_tsid: bool,
155    /// The matcher for field columns `__field__`.
156    field_column_matcher: Option<Vec<Matcher>>,
157    /// The matcher for selectors (normal matchers).
158    selector_matcher: Vec<Matcher>,
159    schema_name: Option<String>,
160    /// The range in millisecond of range selector. None if there is no range selector.
161    range: Option<Millisecond>,
162}
163
164impl PromPlannerContext {
165    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
166        Self {
167            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
168            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
169            interval: stmt.interval.as_millis() as _,
170            lookback_delta: stmt.lookback_delta.as_millis() as _,
171            ..Default::default()
172        }
173    }
174
175    /// Reset all planner states
176    fn reset(&mut self) {
177        self.table_name = None;
178        self.time_index_column = None;
179        self.field_columns = vec![];
180        self.tag_columns = vec![];
181        self.use_tsid = false;
182        self.field_column_matcher = None;
183        self.selector_matcher.clear();
184        self.schema_name = None;
185        self.range = None;
186    }
187
188    /// Reset table name and schema to empty
189    fn reset_table_name_and_schema(&mut self) {
190        self.table_name = Some(String::new());
191        self.schema_name = None;
192        self.use_tsid = false;
193    }
194
195    /// Check if `le` is present in tag columns
196    fn has_le_tag(&self) -> bool {
197        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
198    }
199}
200
201pub struct PromPlanner {
202    table_provider: DfTableSourceProvider,
203    ctx: PromPlannerContext,
204}
205
206impl PromPlanner {
207    pub async fn stmt_to_plan(
208        table_provider: DfTableSourceProvider,
209        stmt: &EvalStmt,
210        query_engine_state: &QueryEngineState,
211    ) -> Result<LogicalPlan> {
212        let mut planner = Self {
213            table_provider,
214            ctx: PromPlannerContext::from_eval_stmt(stmt),
215        };
216
217        let plan = planner
218            .prom_expr_to_plan(&stmt.expr, query_engine_state)
219            .await?;
220
221        // Never leak internal series identifier to output.
222        planner.strip_tsid_column(plan)
223    }
224
225    pub async fn prom_expr_to_plan(
226        &mut self,
227        prom_expr: &PromExpr,
228        query_engine_state: &QueryEngineState,
229    ) -> Result<LogicalPlan> {
230        self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
231            .await
232    }
233
234    /**
235    Converts a PromQL expression to a logical plan.
236
237    NOTE:
238        The `timestamp_fn` indicates whether the PromQL `timestamp()` function is being evaluated in the current context.
239        If `true`, the planner generates a logical plan that projects the timestamp (time index) column
240        as the value column for each input row, implementing the PromQL `timestamp()` function semantics.
241        If `false`, the planner generates the standard logical plan for the given PromQL expression.
242    */
243    #[async_recursion]
244    async fn prom_expr_to_plan_inner(
245        &mut self,
246        prom_expr: &PromExpr,
247        timestamp_fn: bool,
248        query_engine_state: &QueryEngineState,
249    ) -> Result<LogicalPlan> {
250        let res = match prom_expr {
251            PromExpr::Aggregate(expr) => {
252                self.prom_aggr_expr_to_plan(query_engine_state, expr)
253                    .await?
254            }
255            PromExpr::Unary(expr) => {
256                self.prom_unary_expr_to_plan(query_engine_state, expr)
257                    .await?
258            }
259            PromExpr::Binary(expr) => {
260                self.prom_binary_expr_to_plan(query_engine_state, expr)
261                    .await?
262            }
263            PromExpr::Paren(ParenExpr { expr }) => {
264                self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
265                    .await?
266            }
267            PromExpr::Subquery(expr) => {
268                self.prom_subquery_expr_to_plan(query_engine_state, expr)
269                    .await?
270            }
271            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
272            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
273            PromExpr::VectorSelector(selector) => {
274                self.prom_vector_selector_to_plan(selector, timestamp_fn)
275                    .await?
276            }
277            PromExpr::MatrixSelector(selector) => {
278                self.prom_matrix_selector_to_plan(selector).await?
279            }
280            PromExpr::Call(expr) => {
281                self.prom_call_expr_to_plan(query_engine_state, expr)
282                    .await?
283            }
284            PromExpr::Extension(expr) => {
285                self.prom_ext_expr_to_plan(query_engine_state, expr).await?
286            }
287        };
288
289        Ok(res)
290    }
291
292    async fn prom_subquery_expr_to_plan(
293        &mut self,
294        query_engine_state: &QueryEngineState,
295        subquery_expr: &SubqueryExpr,
296    ) -> Result<LogicalPlan> {
297        let SubqueryExpr {
298            expr, range, step, ..
299        } = subquery_expr;
300
301        let current_interval = self.ctx.interval;
302        if let Some(step) = step {
303            self.ctx.interval = step.as_millis() as _;
304        }
305        let current_start = self.ctx.start;
306        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
307        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
308        self.ctx.interval = current_interval;
309        self.ctx.start = current_start;
310
311        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
312        let range_ms = range.as_millis() as _;
313        self.ctx.range = Some(range_ms);
314
315        let manipulate = RangeManipulate::new(
316            self.ctx.start,
317            self.ctx.end,
318            self.ctx.interval,
319            range_ms,
320            self.ctx
321                .time_index_column
322                .clone()
323                .expect("time index should be set in `setup_context`"),
324            self.ctx.field_columns.clone(),
325            input,
326        )
327        .context(DataFusionPlanningSnafu)?;
328
329        Ok(LogicalPlan::Extension(Extension {
330            node: Arc::new(manipulate),
331        }))
332    }
333
334    async fn prom_aggr_expr_to_plan(
335        &mut self,
336        query_engine_state: &QueryEngineState,
337        aggr_expr: &AggregateExpr,
338    ) -> Result<LogicalPlan> {
339        let AggregateExpr {
340            op,
341            expr,
342            modifier,
343            param,
344        } = aggr_expr;
345
346        let mut input = self.prom_expr_to_plan(expr, query_engine_state).await?;
347        let input_has_tsid = input.schema().fields().iter().any(|field| {
348            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
349                && field.data_type() == &ArrowDataType::UInt64
350        });
351
352        // `__tsid` based scan projection may prune tag columns. Ensure tags referenced in
353        // aggregation modifiers (`by`/`without`) are available before planning group keys.
354        let required_group_tags = match modifier {
355            None => BTreeSet::new(),
356            Some(LabelModifier::Include(labels)) => labels
357                .labels
358                .iter()
359                .filter(|label| !is_metric_engine_internal_column(label.as_str()))
360                .cloned()
361                .collect(),
362            Some(LabelModifier::Exclude(labels)) => {
363                let mut all_tags = self.collect_row_key_tag_columns_from_plan(&input)?;
364                for label in &labels.labels {
365                    let _ = all_tags.remove(label);
366                }
367                all_tags
368            }
369        };
370
371        if !required_group_tags.is_empty()
372            && required_group_tags
373                .iter()
374                .any(|tag| Self::find_case_sensitive_column(input.schema(), tag.as_str()).is_none())
375        {
376            input = self.ensure_tag_columns_available(input, &required_group_tags)?;
377            self.refresh_tag_columns_from_schema(input.schema());
378        }
379
380        match (*op).id() {
381            token::T_TOPK | token::T_BOTTOMK => {
382                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
383            }
384            _ => {
385                // When `__tsid` is available, tag columns may have been pruned from the input plan.
386                // For `keep_tsid` decision we should compare against the full row-key label set,
387                // otherwise we may incorrectly treat label-reducing aggregates as preserving labels.
388                let input_tag_columns = if input_has_tsid {
389                    self.collect_row_key_tag_columns_from_plan(&input)?
390                        .into_iter()
391                        .collect::<Vec<_>>()
392                } else {
393                    self.ctx.tag_columns.clone()
394                };
395                // calculate columns to group by
396                // Need to append time index column into group by columns
397                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
398                // convert op and value columns to aggregate exprs
399                let (mut aggr_exprs, prev_field_exprs) =
400                    self.create_aggregate_exprs(*op, param, &input)?;
401
402                let keep_tsid = op.id() != token::T_COUNT_VALUES
403                    && input_has_tsid
404                    && input_tag_columns.iter().collect::<HashSet<_>>()
405                        == self.ctx.tag_columns.iter().collect::<HashSet<_>>();
406
407                if keep_tsid {
408                    aggr_exprs.push(
409                        first_value(
410                            DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)),
411                            vec![],
412                        )
413                        .alias(DATA_SCHEMA_TSID_COLUMN_NAME),
414                    );
415                }
416                self.ctx.use_tsid = keep_tsid;
417
418                // create plan
419                let builder = LogicalPlanBuilder::from(input);
420                let builder = if op.id() == token::T_COUNT_VALUES {
421                    let label = Self::get_param_value_as_str(*op, param)?;
422                    // `count_values` must be grouped by fields,
423                    // and project the fields to the new label.
424                    group_exprs.extend(prev_field_exprs.clone());
425                    let project_fields = self
426                        .create_field_column_exprs()?
427                        .into_iter()
428                        .chain(self.create_tag_column_exprs()?)
429                        .chain(Some(self.create_time_index_column_expr()?))
430                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));
431
432                    builder
433                        .aggregate(group_exprs.clone(), aggr_exprs)
434                        .context(DataFusionPlanningSnafu)?
435                        .project(project_fields)
436                        .context(DataFusionPlanningSnafu)?
437                } else {
438                    builder
439                        .aggregate(group_exprs.clone(), aggr_exprs)
440                        .context(DataFusionPlanningSnafu)?
441                };
442
443                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));
444
445                builder
446                    .sort(sort_expr)
447                    .context(DataFusionPlanningSnafu)?
448                    .build()
449                    .context(DataFusionPlanningSnafu)
450            }
451        }
452    }
453
454    /// Create logical plan for PromQL topk and bottomk expr.
455    async fn prom_topk_bottomk_to_plan(
456        &mut self,
457        aggr_expr: &AggregateExpr,
458        input: LogicalPlan,
459    ) -> Result<LogicalPlan> {
460        let AggregateExpr {
461            op,
462            param,
463            modifier,
464            ..
465        } = aggr_expr;
466
467        let input_has_tsid = input.schema().fields().iter().any(|field| {
468            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
469                && field.data_type() == &ArrowDataType::UInt64
470        });
471        self.ctx.use_tsid = input_has_tsid;
472
473        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;
474
475        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;
476
477        // convert op and value columns to window exprs.
478        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;
479
480        let rank_columns: Vec<_> = window_exprs
481            .iter()
482            .map(|expr| expr.schema_name().to_string())
483            .collect();
484
485        // Create ranks filter with `Operator::Or`.
486        // Safety: at least one rank column
487        let filter: DfExpr = rank_columns
488            .iter()
489            .fold(None, |expr, rank| {
490                let predicate = DfExpr::BinaryExpr(BinaryExpr {
491                    left: Box::new(col(rank)),
492                    op: Operator::LtEq,
493                    right: Box::new(val.clone()),
494                });
495
496                match expr {
497                    None => Some(predicate),
498                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
499                        left: Box::new(expr),
500                        op: Operator::Or,
501                        right: Box::new(predicate),
502                    })),
503                }
504            })
505            .unwrap();
506
507        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();
508
509        let mut new_group_exprs = group_exprs.clone();
510        // Order by ranks
511        new_group_exprs.extend(rank_columns);
512
513        let group_sort_expr = new_group_exprs
514            .into_iter()
515            .map(|expr| expr.sort(true, false));
516
517        let project_fields = self
518            .create_field_column_exprs()?
519            .into_iter()
520            .chain(self.create_tag_column_exprs()?)
521            .chain(
522                self.ctx
523                    .use_tsid
524                    .then_some(DfExpr::Column(Column::from_name(
525                        DATA_SCHEMA_TSID_COLUMN_NAME,
526                    )))
527                    .into_iter(),
528            )
529            .chain(Some(self.create_time_index_column_expr()?));
530
531        LogicalPlanBuilder::from(input)
532            .window(window_exprs)
533            .context(DataFusionPlanningSnafu)?
534            .filter(filter)
535            .context(DataFusionPlanningSnafu)?
536            .sort(group_sort_expr)
537            .context(DataFusionPlanningSnafu)?
538            .project(project_fields)
539            .context(DataFusionPlanningSnafu)?
540            .build()
541            .context(DataFusionPlanningSnafu)
542    }
543
544    async fn prom_unary_expr_to_plan(
545        &mut self,
546        query_engine_state: &QueryEngineState,
547        unary_expr: &UnaryExpr,
548    ) -> Result<LogicalPlan> {
549        let UnaryExpr { expr } = unary_expr;
550        // Unary Expr in PromQL implys the `-` operator
551        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
552        self.projection_for_each_field_column(input, |col| {
553            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
554        })
555    }
556
557    async fn prom_binary_expr_to_plan(
558        &mut self,
559        query_engine_state: &QueryEngineState,
560        binary_expr: &PromBinaryExpr,
561    ) -> Result<LogicalPlan> {
562        let PromBinaryExpr {
563            lhs,
564            rhs,
565            op,
566            modifier,
567        } = binary_expr;
568
569        // if set to true, comparison operator will return 0/1 (for true/false) instead of
570        // filter on the result column
571        let should_return_bool = if let Some(m) = modifier {
572            m.return_bool
573        } else {
574            false
575        };
576        let is_comparison_op = Self::is_token_a_comparison_op(*op);
577
578        // we should build a filter plan here if the op is comparison op and need not
579        // to return 0/1. Otherwise, we should build a projection plan
580        match (
581            Self::try_build_literal_expr(lhs),
582            Self::try_build_literal_expr(rhs),
583        ) {
584            (Some(lhs), Some(rhs)) => {
585                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
586                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
587                self.ctx.reset_table_name_and_schema();
588                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
589                let mut field_expr = field_expr_builder(lhs, rhs)?;
590
591                if is_comparison_op && should_return_bool {
592                    field_expr = DfExpr::Cast(Cast {
593                        expr: Box::new(field_expr),
594                        data_type: ArrowDataType::Float64,
595                    });
596                }
597
598                Ok(LogicalPlan::Extension(Extension {
599                    node: Arc::new(
600                        EmptyMetric::new(
601                            self.ctx.start,
602                            self.ctx.end,
603                            self.ctx.interval,
604                            SPECIAL_TIME_FUNCTION.to_string(),
605                            DEFAULT_FIELD_COLUMN.to_string(),
606                            Some(field_expr),
607                        )
608                        .context(DataFusionPlanningSnafu)?,
609                    ),
610                }))
611            }
612            // lhs is a literal, rhs is a column
613            (Some(mut expr), None) => {
614                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
615                // check if the literal is a special time expr
616                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
617                    expr = time_expr
618                }
619                let bin_expr_builder = |col: &String| {
620                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
621                    let mut binary_expr =
622                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;
623
624                    if is_comparison_op && should_return_bool {
625                        binary_expr = DfExpr::Cast(Cast {
626                            expr: Box::new(binary_expr),
627                            data_type: ArrowDataType::Float64,
628                        });
629                    }
630                    Ok(binary_expr)
631                };
632                if is_comparison_op && !should_return_bool {
633                    self.filter_on_field_column(input, bin_expr_builder)
634                } else {
635                    self.projection_for_each_field_column(input, bin_expr_builder)
636                }
637            }
638            // lhs is a column, rhs is a literal
639            (None, Some(mut expr)) => {
640                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
641                // check if the literal is a special time expr
642                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
643                    expr = time_expr
644                }
645                let bin_expr_builder = |col: &String| {
646                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
647                    let mut binary_expr =
648                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;
649
650                    if is_comparison_op && should_return_bool {
651                        binary_expr = DfExpr::Cast(Cast {
652                            expr: Box::new(binary_expr),
653                            data_type: ArrowDataType::Float64,
654                        });
655                    }
656                    Ok(binary_expr)
657                };
658                if is_comparison_op && !should_return_bool {
659                    self.filter_on_field_column(input, bin_expr_builder)
660                } else {
661                    self.projection_for_each_field_column(input, bin_expr_builder)
662                }
663            }
664            // both are columns. join them on time index
665            (None, None) => {
666                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
667                let left_field_columns = self.ctx.field_columns.clone();
668                let left_time_index_column = self.ctx.time_index_column.clone();
669                let mut left_table_ref = self
670                    .table_ref()
671                    .unwrap_or_else(|_| TableReference::bare(""));
672                let left_context = self.ctx.clone();
673
674                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
675                let right_field_columns = self.ctx.field_columns.clone();
676                let right_time_index_column = self.ctx.time_index_column.clone();
677                let mut right_table_ref = self
678                    .table_ref()
679                    .unwrap_or_else(|_| TableReference::bare(""));
680                let right_context = self.ctx.clone();
681
682                // TODO(ruihang): avoid join if left and right are the same table
683
684                // set op has "special" join semantics
685                if Self::is_token_a_set_op(*op) {
686                    return self.set_op_on_non_field_columns(
687                        left_input,
688                        right_input,
689                        left_context,
690                        right_context,
691                        *op,
692                        modifier,
693                    );
694                }
695
696                // normal join
697                if left_table_ref == right_table_ref {
698                    // rename table references to avoid ambiguity
699                    left_table_ref = TableReference::bare("lhs");
700                    right_table_ref = TableReference::bare("rhs");
701                    // `self.ctx` have ctx in right plan, if right plan have no tag,
702                    // we use left plan ctx as the ctx for subsequent calculations,
703                    // to avoid case like `host + scalar(...)`
704                    // we need preserve tag column on `host` table in subsequent projection,
705                    // which only show in left plan ctx.
706                    if self.ctx.tag_columns.is_empty() {
707                        self.ctx = left_context.clone();
708                        self.ctx.table_name = Some("lhs".to_string());
709                    } else {
710                        self.ctx.table_name = Some("rhs".to_string());
711                    }
712                }
713                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());
714
715                let join_plan = self.join_on_non_field_columns(
716                    left_input,
717                    right_input,
718                    left_table_ref.clone(),
719                    right_table_ref.clone(),
720                    left_time_index_column,
721                    right_time_index_column,
722                    // if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)`
723                    // under this case we only join on time index
724                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
725                    modifier,
726                )?;
727                let join_plan_schema = join_plan.schema().clone();
728
729                let bin_expr_builder = |_: &String| {
730                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
731                    let left_col = join_plan_schema
732                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
733                        .context(DataFusionPlanningSnafu)?
734                        .into();
735                    let right_col = join_plan_schema
736                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
737                        .context(DataFusionPlanningSnafu)?
738                        .into();
739
740                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
741                    let mut binary_expr =
742                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
743                    if is_comparison_op && should_return_bool {
744                        binary_expr = DfExpr::Cast(Cast {
745                            expr: Box::new(binary_expr),
746                            data_type: ArrowDataType::Float64,
747                        });
748                    }
749                    Ok(binary_expr)
750                };
751                if is_comparison_op && !should_return_bool {
752                    // PromQL comparison operators without `bool` are filters:
753                    //   - keep the instant-vector side sample values
754                    //   - drop samples where the comparison is false
755                    //
756                    // So we filter on the join result and then project only the side that should
757                    // be preserved according to PromQL semantics.
758                    let filtered = self.filter_on_field_column(join_plan, bin_expr_builder)?;
759                    let (project_table_ref, project_context) =
760                        match (lhs.value_type(), rhs.value_type()) {
761                            (ValueType::Scalar, ValueType::Vector) => {
762                                (&right_table_ref, &right_context)
763                            }
764                            _ => (&left_table_ref, &left_context),
765                        };
766                    self.project_binary_join_side(filtered, project_table_ref, project_context)
767                } else {
768                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
769                }
770            }
771        }
772    }
773
774    fn project_binary_join_side(
775        &mut self,
776        input: LogicalPlan,
777        table_ref: &TableReference,
778        context: &PromPlannerContext,
779    ) -> Result<LogicalPlan> {
780        let schema = input.schema();
781
782        let mut project_exprs =
783            Vec::with_capacity(context.tag_columns.len() + context.field_columns.len() + 2);
784
785        // Project time index from the chosen side.
786        if let Some(time_index_column) = &context.time_index_column {
787            let time_index_col = schema
788                .qualified_field_with_name(Some(table_ref), time_index_column)
789                .context(DataFusionPlanningSnafu)?
790                .into();
791            project_exprs.push(DfExpr::Column(time_index_col));
792        }
793
794        // Project field columns from the chosen side.
795        for field_column in &context.field_columns {
796            let field_col = schema
797                .qualified_field_with_name(Some(table_ref), field_column)
798                .context(DataFusionPlanningSnafu)?
799                .into();
800            project_exprs.push(DfExpr::Column(field_col));
801        }
802
803        // Project tag columns from the chosen side.
804        for tag_column in &context.tag_columns {
805            let tag_col = schema
806                .qualified_field_with_name(Some(table_ref), tag_column)
807                .context(DataFusionPlanningSnafu)?
808                .into();
809            project_exprs.push(DfExpr::Column(tag_col));
810        }
811
812        // Preserve `__tsid` if present, so it can still be used internally downstream. It's
813        // stripped from the final output anyway.
814        if context.use_tsid
815            && let Ok(tsid_col) =
816                schema.qualified_field_with_name(Some(table_ref), DATA_SCHEMA_TSID_COLUMN_NAME)
817        {
818            project_exprs.push(DfExpr::Column(tsid_col.into()));
819        }
820
821        let plan = LogicalPlanBuilder::from(input)
822            .project(project_exprs)
823            .context(DataFusionPlanningSnafu)?
824            .build()
825            .context(DataFusionPlanningSnafu)?;
826
827        // Update context to reflect the projected schema. Don't keep a table qualifier since
828        // the result is a derived expression.
829        self.ctx = context.clone();
830        self.ctx.table_name = None;
831        self.ctx.schema_name = None;
832
833        Ok(plan)
834    }
835
836    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
837        let NumberLiteral { val } = number_literal;
838        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
839        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
840        self.ctx.reset_table_name_and_schema();
841        let literal_expr = df_prelude::lit(*val);
842
843        let plan = LogicalPlan::Extension(Extension {
844            node: Arc::new(
845                EmptyMetric::new(
846                    self.ctx.start,
847                    self.ctx.end,
848                    self.ctx.interval,
849                    SPECIAL_TIME_FUNCTION.to_string(),
850                    DEFAULT_FIELD_COLUMN.to_string(),
851                    Some(literal_expr),
852                )
853                .context(DataFusionPlanningSnafu)?,
854            ),
855        });
856        Ok(plan)
857    }
858
859    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
860        let StringLiteral { val } = string_literal;
861        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
862        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
863        self.ctx.reset_table_name_and_schema();
864        let literal_expr = df_prelude::lit(val.clone());
865
866        let plan = LogicalPlan::Extension(Extension {
867            node: Arc::new(
868                EmptyMetric::new(
869                    self.ctx.start,
870                    self.ctx.end,
871                    self.ctx.interval,
872                    SPECIAL_TIME_FUNCTION.to_string(),
873                    DEFAULT_FIELD_COLUMN.to_string(),
874                    Some(literal_expr),
875                )
876                .context(DataFusionPlanningSnafu)?,
877            ),
878        });
879        Ok(plan)
880    }
881
882    async fn prom_vector_selector_to_plan(
883        &mut self,
884        vector_selector: &VectorSelector,
885        timestamp_fn: bool,
886    ) -> Result<LogicalPlan> {
887        let VectorSelector {
888            name,
889            offset,
890            matchers,
891            at: _,
892        } = vector_selector;
893        let matchers = self.preprocess_label_matchers(matchers, name)?;
894        if let Some(empty_plan) = self.setup_context().await? {
895            return Ok(empty_plan);
896        }
897        let normalize = self
898            .selector_to_series_normalize_plan(offset, matchers, false)
899            .await?;
900
901        let normalize = if timestamp_fn {
902            // If evaluating the PromQL `timestamp()` function, project the time index column as the value column
903            // before wrapping with [`InstantManipulate`], so the output matches PromQL's `timestamp()` semantics.
904            self.create_timestamp_func_plan(normalize)?
905        } else {
906            normalize
907        };
908
909        let manipulate = InstantManipulate::new(
910            self.ctx.start,
911            self.ctx.end,
912            self.ctx.lookback_delta,
913            self.ctx.interval,
914            self.ctx
915                .time_index_column
916                .clone()
917                .expect("time index should be set in `setup_context`"),
918            self.ctx.field_columns.first().cloned(),
919            normalize,
920        );
921        Ok(LogicalPlan::Extension(Extension {
922            node: Arc::new(manipulate),
923        }))
924    }
925
926    /// Builds a projection plan for the PromQL `timestamp()` function.
927    /// Projects the time index column as the value column for each row.
928    ///
929    /// # Arguments
930    /// * `normalize` - Input [`LogicalPlan`] for the normalized series.
931    ///
932    /// # Returns
933    /// Returns a [`Result<LogicalPlan>`] where the resulting logical plan projects the timestamp
934    /// column as the value column, along with the original tag and time index columns.
935    ///
936    /// # Timestamp vs. Time Function
937    ///
938    /// - **Timestamp Function (`timestamp()`)**: In PromQL, the `timestamp()` function returns the
939    ///   timestamp (time index) of each sample as the value column.
940    ///
941    /// - **Time Function (`time()`)**: The `time()` function returns the evaluation time of the query
942    ///   as a scalar value.
943    ///
944    /// # Side Effects
945    /// Updates the planner context's field columns to the timestamp column name.
946    ///
947    fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
948        let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
949            .alias(DEFAULT_FIELD_COLUMN);
950        self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
951        let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
952        project_exprs.push(self.create_time_index_column_expr()?);
953        project_exprs.push(time_expr);
954        project_exprs.extend(self.create_tag_column_exprs()?);
955
956        LogicalPlanBuilder::from(normalize)
957            .project(project_exprs)
958            .context(DataFusionPlanningSnafu)?
959            .build()
960            .context(DataFusionPlanningSnafu)
961    }
962
963    async fn prom_matrix_selector_to_plan(
964        &mut self,
965        matrix_selector: &MatrixSelector,
966    ) -> Result<LogicalPlan> {
967        let MatrixSelector { vs, range } = matrix_selector;
968        let VectorSelector {
969            name,
970            offset,
971            matchers,
972            ..
973        } = vs;
974        let matchers = self.preprocess_label_matchers(matchers, name)?;
975        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
976        let range_ms = range.as_millis() as _;
977        self.ctx.range = Some(range_ms);
978
979        // Some functions like rate may require special fields in the RangeManipulate plan
980        // so we can't skip RangeManipulate.
981        let normalize = match self.setup_context().await? {
982            Some(empty_plan) => empty_plan,
983            None => {
984                self.selector_to_series_normalize_plan(offset, matchers, true)
985                    .await?
986            }
987        };
988        let manipulate = RangeManipulate::new(
989            self.ctx.start,
990            self.ctx.end,
991            self.ctx.interval,
992            // TODO(ruihang): convert via Timestamp datatypes to support different time units
993            range_ms,
994            self.ctx
995                .time_index_column
996                .clone()
997                .expect("time index should be set in `setup_context`"),
998            self.ctx.field_columns.clone(),
999            normalize,
1000        )
1001        .context(DataFusionPlanningSnafu)?;
1002
1003        Ok(LogicalPlan::Extension(Extension {
1004            node: Arc::new(manipulate),
1005        }))
1006    }
1007
1008    async fn prom_call_expr_to_plan(
1009        &mut self,
1010        query_engine_state: &QueryEngineState,
1011        call_expr: &Call,
1012    ) -> Result<LogicalPlan> {
1013        let Call { func, args } = call_expr;
1014        // some special functions that are not expression but a plan
1015        match func.name {
1016            SPECIAL_HISTOGRAM_QUANTILE => {
1017                return self.create_histogram_plan(args, query_engine_state).await;
1018            }
1019            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
1020            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
1021            SPECIAL_ABSENT_FUNCTION => {
1022                return self.create_absent_plan(args, query_engine_state).await;
1023            }
1024            _ => {}
1025        }
1026
1027        // transform function arguments
1028        let args = self.create_function_args(&args.args)?;
1029        let input = if let Some(prom_expr) = &args.input {
1030            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
1031                .await?
1032        } else {
1033            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
1034            self.ctx.reset_table_name_and_schema();
1035            self.ctx.tag_columns = vec![];
1036            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
1037            LogicalPlan::Extension(Extension {
1038                node: Arc::new(
1039                    EmptyMetric::new(
1040                        self.ctx.start,
1041                        self.ctx.end,
1042                        self.ctx.interval,
1043                        SPECIAL_TIME_FUNCTION.to_string(),
1044                        DEFAULT_FIELD_COLUMN.to_string(),
1045                        None,
1046                    )
1047                    .context(DataFusionPlanningSnafu)?,
1048                ),
1049            })
1050        };
1051        let (mut func_exprs, new_tags) =
1052            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
1053        func_exprs.insert(0, self.create_time_index_column_expr()?);
1054        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);
1055
1056        let builder = LogicalPlanBuilder::from(input)
1057            .project(func_exprs)
1058            .context(DataFusionPlanningSnafu)?
1059            .filter(self.create_empty_values_filter_expr()?)
1060            .context(DataFusionPlanningSnafu)?;
1061
1062        let builder = match func.name {
1063            "sort" => builder
1064                .sort(self.create_field_columns_sort_exprs(true))
1065                .context(DataFusionPlanningSnafu)?,
1066            "sort_desc" => builder
1067                .sort(self.create_field_columns_sort_exprs(false))
1068                .context(DataFusionPlanningSnafu)?,
1069            "sort_by_label" => builder
1070                .sort(Self::create_sort_exprs_by_tags(
1071                    func.name,
1072                    args.literals,
1073                    true,
1074                )?)
1075                .context(DataFusionPlanningSnafu)?,
1076            "sort_by_label_desc" => builder
1077                .sort(Self::create_sort_exprs_by_tags(
1078                    func.name,
1079                    args.literals,
1080                    false,
1081                )?)
1082                .context(DataFusionPlanningSnafu)?,
1083
1084            _ => builder,
1085        };
1086
1087        // Update context tags after building plan
1088        // We can't push them before planning, because they won't exist until projection.
1089        for tag in new_tags {
1090            self.ctx.tag_columns.push(tag);
1091        }
1092
1093        let plan = builder.build().context(DataFusionPlanningSnafu)?;
1094        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");
1095
1096        Ok(plan)
1097    }
1098
1099    async fn prom_ext_expr_to_plan(
1100        &mut self,
1101        query_engine_state: &QueryEngineState,
1102        ext_expr: &promql_parser::parser::ast::Extension,
1103    ) -> Result<LogicalPlan> {
1104        // let promql_parser::parser::ast::Extension { expr } = ext_expr;
1105        let expr = &ext_expr.expr;
1106        let children = expr.children();
1107        let plan = self
1108            .prom_expr_to_plan(&children[0], query_engine_state)
1109            .await?;
1110        // Wrapper for the explanation/analyze of the existing plan
1111        // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
1112        // if `analyze` is true, runs the actual plan and produces
1113        // information about metrics during run.
1114        // if `verbose` is true, prints out additional details when VERBOSE keyword is specified
1115        match expr.name() {
1116            ANALYZE_NODE_NAME => LogicalPlanBuilder::from(plan)
1117                .explain(false, true)
1118                .unwrap()
1119                .build()
1120                .context(DataFusionPlanningSnafu),
1121            ANALYZE_VERBOSE_NODE_NAME => LogicalPlanBuilder::from(plan)
1122                .explain(true, true)
1123                .unwrap()
1124                .build()
1125                .context(DataFusionPlanningSnafu),
1126            EXPLAIN_NODE_NAME => LogicalPlanBuilder::from(plan)
1127                .explain(false, false)
1128                .unwrap()
1129                .build()
1130                .context(DataFusionPlanningSnafu),
1131            EXPLAIN_VERBOSE_NODE_NAME => LogicalPlanBuilder::from(plan)
1132                .explain(true, false)
1133                .unwrap()
1134                .build()
1135                .context(DataFusionPlanningSnafu),
1136            ALIAS_NODE_NAME => {
1137                let alias = expr
1138                    .as_any()
1139                    .downcast_ref::<AliasExpr>()
1140                    .context(UnexpectedPlanExprSnafu {
1141                        desc: "Expected AliasExpr",
1142                    })?
1143                    .alias
1144                    .clone();
1145                self.apply_alias(plan, alias)
1146            }
1147            _ => LogicalPlanBuilder::empty(true)
1148                .build()
1149                .context(DataFusionPlanningSnafu),
1150        }
1151    }
1152
1153    /// Extract metric name from `__name__` matcher and set it into [PromPlannerContext].
1154    /// Returns a new [Matchers] that doesn't contain metric name matcher.
1155    ///
1156    /// Each call to this function means new selector is started. Thus, the context will be reset
1157    /// at first.
1158    ///
1159    /// Name rule:
1160    /// - if `name` is some, then the matchers MUST NOT contain `__name__` matcher.
1161    /// - if `name` is none, then the matchers MAY contain NONE OR MULTIPLE `__name__` matchers.
1162    #[allow(clippy::mutable_key_type)]
1163    fn preprocess_label_matchers(
1164        &mut self,
1165        label_matchers: &Matchers,
1166        name: &Option<String>,
1167    ) -> Result<Matchers> {
1168        self.ctx.reset();
1169
1170        let metric_name;
1171        if let Some(name) = name.clone() {
1172            metric_name = Some(name);
1173            ensure!(
1174                label_matchers.find_matchers(METRIC_NAME).is_empty(),
1175                MultipleMetricMatchersSnafu
1176            );
1177        } else {
1178            let mut matches = label_matchers.find_matchers(METRIC_NAME);
1179            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
1180            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
1181            ensure!(
1182                matches[0].op == MatchOp::Equal,
1183                UnsupportedMatcherOpSnafu {
1184                    matcher_op: matches[0].op.to_string(),
1185                    matcher: METRIC_NAME
1186                }
1187            );
1188            metric_name = matches.pop().map(|m| m.value);
1189        }
1190
1191        self.ctx.table_name = metric_name;
1192
1193        let mut matchers = HashSet::new();
1194        for matcher in &label_matchers.matchers {
1195            // TODO(ruihang): support other metric match ops
1196            if matcher.name == FIELD_COLUMN_MATCHER {
1197                self.ctx
1198                    .field_column_matcher
1199                    .get_or_insert_default()
1200                    .push(matcher.clone());
1201            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
1202                ensure!(
1203                    matcher.op == MatchOp::Equal,
1204                    UnsupportedMatcherOpSnafu {
1205                        matcher: matcher.name.clone(),
1206                        matcher_op: matcher.op.to_string(),
1207                    }
1208                );
1209                self.ctx.schema_name = Some(matcher.value.clone());
1210            } else if matcher.name != METRIC_NAME {
1211                self.ctx.selector_matcher.push(matcher.clone());
1212                let _ = matchers.insert(matcher.clone());
1213            }
1214        }
1215
1216        Ok(Matchers::new(matchers.into_iter().collect()))
1217    }
1218
1219    async fn selector_to_series_normalize_plan(
1220        &mut self,
1221        offset: &Option<Offset>,
1222        label_matchers: Matchers,
1223        is_range_selector: bool,
1224    ) -> Result<LogicalPlan> {
1225        // make table scan plan
1226        let table_ref = self.table_ref()?;
1227        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
1228        let table_schema = table_scan.schema();
1229
1230        // make filter exprs
1231        let offset_duration = match offset {
1232            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
1233            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
1234            None => 0,
1235        };
1236        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
1237        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
1238            scan_filters.push(time_index_filter);
1239        }
1240        table_scan = LogicalPlanBuilder::from(table_scan)
1241            .filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty.
1242            .context(DataFusionPlanningSnafu)?
1243            .build()
1244            .context(DataFusionPlanningSnafu)?;
1245
1246        // make a projection plan if there is any `__field__` matcher
1247        if let Some(field_matchers) = &self.ctx.field_column_matcher {
1248            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
1249            // opt-in set
1250            let mut result_set = HashSet::new();
1251            // opt-out set
1252            let mut reverse_set = HashSet::new();
1253            for matcher in field_matchers {
1254                match &matcher.op {
1255                    MatchOp::Equal => {
1256                        if col_set.contains(&matcher.value) {
1257                            let _ = result_set.insert(matcher.value.clone());
1258                        } else {
1259                            return Err(ColumnNotFoundSnafu {
1260                                col: matcher.value.clone(),
1261                            }
1262                            .build());
1263                        }
1264                    }
1265                    MatchOp::NotEqual => {
1266                        if col_set.contains(&matcher.value) {
1267                            let _ = reverse_set.insert(matcher.value.clone());
1268                        } else {
1269                            return Err(ColumnNotFoundSnafu {
1270                                col: matcher.value.clone(),
1271                            }
1272                            .build());
1273                        }
1274                    }
1275                    MatchOp::Re(regex) => {
1276                        for col in &self.ctx.field_columns {
1277                            if regex.is_match(col) {
1278                                let _ = result_set.insert(col.clone());
1279                            }
1280                        }
1281                    }
1282                    MatchOp::NotRe(regex) => {
1283                        for col in &self.ctx.field_columns {
1284                            if regex.is_match(col) {
1285                                let _ = reverse_set.insert(col.clone());
1286                            }
1287                        }
1288                    }
1289                }
1290            }
1291            // merge two set
1292            if result_set.is_empty() {
1293                result_set = col_set.into_iter().cloned().collect();
1294            }
1295            for col in reverse_set {
1296                let _ = result_set.remove(&col);
1297            }
1298
1299            // mask the field columns in context using computed result set
1300            self.ctx.field_columns = self
1301                .ctx
1302                .field_columns
1303                .drain(..)
1304                .filter(|col| result_set.contains(col))
1305                .collect();
1306
1307            let exprs = result_set
1308                .into_iter()
1309                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
1310                .chain(self.create_tag_column_exprs()?)
1311                .chain(
1312                    self.ctx
1313                        .use_tsid
1314                        .then_some(DfExpr::Column(Column::new_unqualified(
1315                            DATA_SCHEMA_TSID_COLUMN_NAME,
1316                        )))
1317                        .into_iter(),
1318                )
1319                .chain(Some(self.create_time_index_column_expr()?))
1320                .collect::<Vec<_>>();
1321
1322            // reuse this variable for simplicity
1323            table_scan = LogicalPlanBuilder::from(table_scan)
1324                .project(exprs)
1325                .context(DataFusionPlanningSnafu)?
1326                .build()
1327                .context(DataFusionPlanningSnafu)?;
1328        }
1329
1330        // make sort plan
1331        let series_key_columns = if self.ctx.use_tsid {
1332            vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
1333        } else {
1334            self.ctx.tag_columns.clone()
1335        };
1336
1337        let sort_exprs = if self.ctx.use_tsid {
1338            vec![
1339                DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)).sort(true, true),
1340                self.create_time_index_column_expr()?.sort(true, true),
1341            ]
1342        } else {
1343            self.create_tag_and_time_index_column_sort_exprs()?
1344        };
1345
1346        let sort_plan = LogicalPlanBuilder::from(table_scan)
1347            .sort(sort_exprs)
1348            .context(DataFusionPlanningSnafu)?
1349            .build()
1350            .context(DataFusionPlanningSnafu)?;
1351
1352        // make divide plan
1353        let time_index_column =
1354            self.ctx
1355                .time_index_column
1356                .clone()
1357                .with_context(|| TimeIndexNotFoundSnafu {
1358                    table: table_ref.to_string(),
1359                })?;
1360        let divide_plan = LogicalPlan::Extension(Extension {
1361            node: Arc::new(SeriesDivide::new(
1362                series_key_columns.clone(),
1363                time_index_column,
1364                sort_plan,
1365            )),
1366        });
1367
1368        // make series_normalize plan
1369        if !is_range_selector && offset_duration == 0 {
1370            return Ok(divide_plan);
1371        }
1372        let series_normalize = SeriesNormalize::new(
1373            offset_duration,
1374            self.ctx
1375                .time_index_column
1376                .clone()
1377                .with_context(|| TimeIndexNotFoundSnafu {
1378                    table: table_ref.to_quoted_string(),
1379                })?,
1380            is_range_selector,
1381            series_key_columns,
1382            divide_plan,
1383        );
1384        let logical_plan = LogicalPlan::Extension(Extension {
1385            node: Arc::new(series_normalize),
1386        });
1387
1388        Ok(logical_plan)
1389    }
1390
1391    /// Convert [LabelModifier] to [Column] exprs for aggregation.
1392    /// Timestamp column and tag columns will be included.
1393    ///
1394    /// # Side effect
1395    ///
1396    /// This method will also change the tag columns in ctx if `update_ctx` is true.
1397    fn agg_modifier_to_col(
1398        &mut self,
1399        input_schema: &DFSchemaRef,
1400        modifier: &Option<LabelModifier>,
1401        update_ctx: bool,
1402    ) -> Result<Vec<DfExpr>> {
1403        match modifier {
1404            None => {
1405                if update_ctx {
1406                    self.ctx.tag_columns.clear();
1407                }
1408                Ok(vec![self.create_time_index_column_expr()?])
1409            }
1410            Some(LabelModifier::Include(labels)) => {
1411                if update_ctx {
1412                    self.ctx.tag_columns.clear();
1413                }
1414                let mut exprs = Vec::with_capacity(labels.labels.len());
1415                for label in &labels.labels {
1416                    if is_metric_engine_internal_column(label) {
1417                        continue;
1418                    }
1419                    // nonexistence label will be ignored
1420                    if let Some(column_name) = Self::find_case_sensitive_column(input_schema, label)
1421                    {
1422                        exprs.push(DfExpr::Column(Column::from_name(column_name.clone())));
1423
1424                        if update_ctx {
1425                            // update the tag columns in context
1426                            self.ctx.tag_columns.push(column_name);
1427                        }
1428                    }
1429                }
1430                // add timestamp column
1431                exprs.push(self.create_time_index_column_expr()?);
1432
1433                Ok(exprs)
1434            }
1435            Some(LabelModifier::Exclude(labels)) => {
1436                let mut all_fields = input_schema
1437                    .fields()
1438                    .iter()
1439                    .map(|f| f.name())
1440                    .collect::<BTreeSet<_>>();
1441
1442                // Exclude metric engine internal columns (not PromQL labels) from the implicit
1443                // "without" label set.
1444                all_fields.retain(|col| !is_metric_engine_internal_column(col.as_str()));
1445
1446                // remove "without"-ed fields
1447                // nonexistence label will be ignored
1448                for label in &labels.labels {
1449                    let _ = all_fields.remove(label);
1450                }
1451
1452                // remove time index and value fields
1453                if let Some(time_index) = &self.ctx.time_index_column {
1454                    let _ = all_fields.remove(time_index);
1455                }
1456                for value in &self.ctx.field_columns {
1457                    let _ = all_fields.remove(value);
1458                }
1459
1460                if update_ctx {
1461                    // change the tag columns in context
1462                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
1463                }
1464
1465                // collect remaining fields and convert to col expr
1466                let mut exprs = all_fields
1467                    .into_iter()
1468                    .map(|c| DfExpr::Column(Column::from(c)))
1469                    .collect::<Vec<_>>();
1470
1471                // add timestamp column
1472                exprs.push(self.create_time_index_column_expr()?);
1473
1474                Ok(exprs)
1475            }
1476        }
1477    }
1478
1479    // TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
1480    pub fn matchers_to_expr(
1481        label_matchers: Matchers,
1482        table_schema: &DFSchemaRef,
1483    ) -> Result<Vec<DfExpr>> {
1484        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
1485        for matcher in label_matchers.matchers {
1486            if matcher.name == SCHEMA_COLUMN_MATCHER
1487                || matcher.name == DB_COLUMN_MATCHER
1488                || matcher.name == FIELD_COLUMN_MATCHER
1489            {
1490                continue;
1491            }
1492
1493            let column_name = Self::find_case_sensitive_column(table_schema, matcher.name.as_str());
1494            let col = if let Some(column_name) = column_name {
1495                DfExpr::Column(Column::from_name(column_name))
1496            } else {
1497                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
1498                    .alias(matcher.name.clone())
1499            };
1500            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
1501            let expr = match matcher.op {
1502                MatchOp::Equal => col.eq(lit),
1503                MatchOp::NotEqual => col.not_eq(lit),
1504                MatchOp::Re(re) => {
1505                    // TODO(ruihang): a more programmatic way to handle this in datafusion
1506
1507                    // This is a hack to handle `.+` and `.*`, and is not strictly correct
1508                    // `.` doesn't match newline (`\n`). Given this is in PromQL context,
1509                    // most of the time it's fine.
1510                    if re.as_str() == "^(?:.*)$" {
1511                        continue;
1512                    }
1513                    if re.as_str() == "^(?:.+)$" {
1514                        col.not_eq(DfExpr::Literal(
1515                            ScalarValue::Utf8(Some(String::new())),
1516                            None,
1517                        ))
1518                    } else {
1519                        DfExpr::BinaryExpr(BinaryExpr {
1520                            left: Box::new(col),
1521                            op: Operator::RegexMatch,
1522                            right: Box::new(DfExpr::Literal(
1523                                ScalarValue::Utf8(Some(re.as_str().to_string())),
1524                                None,
1525                            )),
1526                        })
1527                    }
1528                }
1529                MatchOp::NotRe(re) => {
1530                    if re.as_str() == "^(?:.*)$" {
1531                        DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
1532                    } else if re.as_str() == "^(?:.+)$" {
1533                        col.eq(DfExpr::Literal(
1534                            ScalarValue::Utf8(Some(String::new())),
1535                            None,
1536                        ))
1537                    } else {
1538                        DfExpr::BinaryExpr(BinaryExpr {
1539                            left: Box::new(col),
1540                            op: Operator::RegexNotMatch,
1541                            right: Box::new(DfExpr::Literal(
1542                                ScalarValue::Utf8(Some(re.as_str().to_string())),
1543                                None,
1544                            )),
1545                        })
1546                    }
1547                }
1548            };
1549            exprs.push(expr);
1550        }
1551
1552        Ok(exprs)
1553    }
1554
1555    fn find_case_sensitive_column(schema: &DFSchemaRef, column: &str) -> Option<String> {
1556        if is_metric_engine_internal_column(column) {
1557            return None;
1558        }
1559        schema
1560            .fields()
1561            .iter()
1562            .find(|field| field.name() == column)
1563            .map(|field| field.name().clone())
1564    }
1565
1566    fn table_from_source(&self, source: &Arc<dyn TableSource>) -> Result<table::TableRef> {
1567        Ok(source
1568            .as_any()
1569            .downcast_ref::<DefaultTableSource>()
1570            .context(UnknownTableSnafu)?
1571            .table_provider
1572            .as_any()
1573            .downcast_ref::<DfTableProviderAdapter>()
1574            .context(UnknownTableSnafu)?
1575            .table())
1576    }
1577
1578    fn table_ref(&self) -> Result<TableReference> {
1579        let table_name = self
1580            .ctx
1581            .table_name
1582            .clone()
1583            .context(TableNameNotFoundSnafu)?;
1584
1585        // set schema name if `__schema__` is given
1586        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1587            TableReference::partial(schema_name.as_str(), table_name.as_str())
1588        } else {
1589            TableReference::bare(table_name.as_str())
1590        };
1591
1592        Ok(table_ref)
1593    }
1594
1595    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
1596        let start = self.ctx.start;
1597        let end = self.ctx.end;
1598        if end < start {
1599            return InvalidTimeRangeSnafu { start, end }.fail();
1600        }
1601        let lookback_delta = self.ctx.lookback_delta;
1602        let range = self.ctx.range.unwrap_or_default();
1603        let interval = self.ctx.interval;
1604        let time_index_expr = self.create_time_index_column_expr()?;
1605        let num_points = (end - start) / interval;
1606
1607        // Prometheus semantics:
1608        // - Instant selector lookback: (eval_ts - lookback_delta, eval_ts]
1609        // - Range selector:           (eval_ts - range, eval_ts]
1610        //
1611        // So samples positioned exactly at the lower boundary must be excluded. We align the scan
1612        // lower bound with Prometheus by shifting it forward by 1ms (millisecond granularity),
1613        // while still using a `>=` filter.
1614        let selector_window = if range == 0 { lookback_delta } else { range };
1615        let lower_exclusive_adjustment = if selector_window > 0 { 1 } else { 0 };
1616
1617        // Scan a continuous time range
1618        if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
1619            let single_time_range = time_index_expr
1620                .clone()
1621                .gt_eq(DfExpr::Literal(
1622                    ScalarValue::TimestampMillisecond(
1623                        Some(
1624                            self.ctx.start - offset_duration - selector_window
1625                                + lower_exclusive_adjustment,
1626                        ),
1627                        None,
1628                    ),
1629                    None,
1630                ))
1631                .and(time_index_expr.lt_eq(DfExpr::Literal(
1632                    ScalarValue::TimestampMillisecond(Some(self.ctx.end - offset_duration), None),
1633                    None,
1634                )));
1635            return Ok(Some(single_time_range));
1636        }
1637
1638        // Otherwise scan scatter ranges separately
1639        let mut filters = Vec::with_capacity(num_points as usize + 1);
1640        for timestamp in (start..=end).step_by(interval as usize) {
1641            filters.push(
1642                time_index_expr
1643                    .clone()
1644                    .gt_eq(DfExpr::Literal(
1645                        ScalarValue::TimestampMillisecond(
1646                            Some(
1647                                timestamp - offset_duration - selector_window
1648                                    + lower_exclusive_adjustment,
1649                            ),
1650                            None,
1651                        ),
1652                        None,
1653                    ))
1654                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
1655                        ScalarValue::TimestampMillisecond(Some(timestamp - offset_duration), None),
1656                        None,
1657                    ))),
1658            )
1659        }
1660
1661        Ok(filters.into_iter().reduce(DfExpr::or))
1662    }
1663
    /// Creates the scan plan for `table_ref`.
    ///
    /// Physical-table rewrite: the table may be a metric engine logical table. If its physical
    /// table resolves and carries both `__table_id` and a UInt64 `__tsid` column, then:
    /// - the physical table is scanned instead;
    /// - rows are filtered by `__table_id = <logical table id>`;
    /// - the relation is aliased back to `table_ref`.
    ///
    /// `self.ctx.use_tsid` is set to whether `__tsid` will be exposed.
    ///
    /// An extra projection is added when the time index is not millisecond precision; it casts
    /// the time index to `Timestamp(Millisecond)`. An extra projection is also added when the
    /// physical table was used; it drops the internal `__table_id` column.
    ///
    /// # Errors
    /// Returns an error in any of these cases:
    /// - `table_ref` cannot be resolved;
    /// - the physical table fails to resolve for a reason other than "table not found";
    /// - a source is not a known table type;
    /// - no time index is found;
    /// - building the DataFusion plan fails.
    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
        let provider = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?;

        let logical_table = self.table_from_source(&provider)?;

        // Try to rewrite the table scan to physical table scan if possible.
        let mut maybe_phy_table_ref = table_ref.clone();
        let mut scan_provider = provider;
        let mut table_id_filter: Option<u32> = None;

        // If it's a metric engine logical table, scan its physical table directly and filter by
        // `__table_id = logical_table_id` to get access to internal columns like `__tsid`.
        if logical_table.table_info().meta.engine == METRIC_ENGINE_NAME
            && let Some(physical_table_name) = logical_table
                .table_info()
                .meta
                .options
                .extra_options
                .get(LOGICAL_TABLE_METADATA_KEY)
        {
            let physical_table_ref = if let Some(schema_name) = &self.ctx.schema_name {
                TableReference::partial(schema_name.as_str(), physical_table_name.as_str())
            } else {
                TableReference::bare(physical_table_name.as_str())
            };

            let physical_provider = match self
                .table_provider
                .resolve_table(physical_table_ref.clone())
                .await
            {
                Ok(provider) => provider,
                Err(e) if e.status_code() == StatusCode::TableNotFound => {
                    // Fall back to scanning the logical table. It still works, but without
                    // `__tsid` optimization.
                    scan_provider.clone()
                }
                Err(e) => return Err(e).context(CatalogSnafu),
            };

            // In the fallback above `physical_provider` is a clone of `scan_provider`, so the
            // pointer check below skips the rewrite in that case.
            if !Arc::ptr_eq(&physical_provider, &scan_provider) {
                // Only rewrite when internal columns exist in physical schema.
                let physical_table = self.table_from_source(&physical_provider)?;

                let has_table_id = physical_table
                    .schema()
                    .column_schema_by_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
                    .is_some();
                let has_tsid = physical_table
                    .schema()
                    .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
                    .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));

                if has_table_id && has_tsid {
                    scan_provider = physical_provider;
                    maybe_phy_table_ref = physical_table_ref;
                    table_id_filter = Some(logical_table.table_info().ident.table_id);
                }
            }
        }

        let scan_table = self.table_from_source(&scan_provider)?;

        // `__tsid` is used only when scanning the physical table and it has a UInt64 `__tsid`.
        let use_tsid = table_id_filter.is_some()
            && scan_table
                .schema()
                .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
                .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
        self.ctx.use_tsid = use_tsid;

        let all_table_tags = self.ctx.tag_columns.clone();

        // NOTE(review): `all_table_tags` and `scan_tags` both start as copies of
        // `self.ctx.tag_columns`, so the loop below only pushes names that are already in
        // `scan_tags`. After sort/dedup the result is a sorted, deduplicated `tag_columns`.
        // Confirm whether a narrower starting set (e.g. only matcher tags) was intended.
        let scan_tag_columns = if use_tsid {
            let mut scan_tags = self.ctx.tag_columns.clone();
            for matcher in &self.ctx.selector_matcher {
                if is_metric_engine_internal_column(&matcher.name) {
                    continue;
                }
                if all_table_tags.iter().any(|tag| tag == &matcher.name) {
                    scan_tags.push(matcher.name.clone());
                }
            }
            scan_tags.sort_unstable();
            scan_tags.dedup();
            scan_tags
        } else {
            self.ctx.tag_columns.clone()
        };

        let is_time_index_ms = scan_table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: maybe_phy_table_ref.to_quoted_string(),
            })?
            .data_type
            == ConcreteDataType::timestamp_millisecond_datatype();

        // When scanning the physical table, project only the columns this query needs:
        // `__table_id`, the time index, tags, fields and (optionally) `__tsid`. Indices are taken
        // in the physical schema's column order. The logical table is scanned unprojected.
        let scan_projection = if table_id_filter.is_some() {
            let mut required_columns = HashSet::new();
            required_columns.insert(DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string());
            required_columns.insert(self.ctx.time_index_column.clone().with_context(|| {
                TimeIndexNotFoundSnafu {
                    table: maybe_phy_table_ref.to_quoted_string(),
                }
            })?);
            for col in &scan_tag_columns {
                required_columns.insert(col.clone());
            }
            for col in &self.ctx.field_columns {
                required_columns.insert(col.clone());
            }
            if use_tsid {
                required_columns.insert(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
            }

            let arrow_schema = scan_table.schema().arrow_schema().clone();
            Some(
                arrow_schema
                    .fields()
                    .iter()
                    .enumerate()
                    .filter(|(_, field)| required_columns.contains(field.name().as_str()))
                    .map(|(idx, _)| idx)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };

        let mut scan_plan =
            LogicalPlanBuilder::scan(maybe_phy_table_ref.clone(), scan_provider, scan_projection)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;

        if let Some(table_id) = table_id_filter {
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .filter(
                    DfExpr::Column(Column::from_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME))
                        .eq(lit(table_id)),
                )
                .context(DataFusionPlanningSnafu)?
                .alias(table_ref.clone()) // rename the relation back to logical table's name after filtering
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        if !is_time_index_ms {
            // cast to ms if time_index not in Millisecond precision
            // The projection lists fields, scan tags, optional `__tsid` (qualified by
            // `table_ref`), and the casted time index aliased back to its original name.
            let expr: Vec<_> = self
                .create_field_column_exprs()?
                .into_iter()
                .chain(
                    scan_tag_columns
                        .iter()
                        .map(|tag| DfExpr::Column(Column::from_name(tag))),
                )
                .chain(
                    self.ctx
                        .use_tsid
                        .then_some(DfExpr::Column(Column::new(
                            Some(table_ref.clone()),
                            DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
                        )))
                        .into_iter(),
                )
                .chain(Some(DfExpr::Alias(Alias {
                    expr: Box::new(DfExpr::Cast(Cast {
                        expr: Box::new(self.create_time_index_column_expr()?),
                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                    })),
                    relation: Some(table_ref.clone()),
                    name: self
                        .ctx
                        .time_index_column
                        .as_ref()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_quoted_string(),
                        })?
                        .clone(),
                    metadata: None,
                })))
                .collect::<Vec<_>>();
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(expr)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        } else if table_id_filter.is_some() {
            // Drop the internal `__table_id` column after filtering.
            let project_exprs = self
                .create_field_column_exprs()?
                .into_iter()
                .chain(
                    scan_tag_columns
                        .iter()
                        .map(|tag| DfExpr::Column(Column::from_name(tag))),
                )
                .chain(
                    self.ctx
                        .use_tsid
                        .then_some(DfExpr::Column(Column::from_name(
                            DATA_SCHEMA_TSID_COLUMN_NAME,
                        )))
                        .into_iter(),
                )
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(project_exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // Re-wrap through the builder. No nodes are added here.
        let result = LogicalPlanBuilder::from(scan_plan)
            .build()
            .context(DataFusionPlanningSnafu)?;
        Ok(result)
    }
1895
1896    fn collect_row_key_tag_columns_from_plan(
1897        &self,
1898        plan: &LogicalPlan,
1899    ) -> Result<BTreeSet<String>> {
1900        fn walk(
1901            planner: &PromPlanner,
1902            plan: &LogicalPlan,
1903            out: &mut BTreeSet<String>,
1904        ) -> Result<()> {
1905            if let LogicalPlan::TableScan(scan) = plan {
1906                let table = planner.table_from_source(&scan.source)?;
1907                for col in table.table_info().meta.row_key_column_names() {
1908                    if col != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
1909                        && col != DATA_SCHEMA_TSID_COLUMN_NAME
1910                        && !is_metric_engine_internal_column(col)
1911                    {
1912                        out.insert(col.clone());
1913                    }
1914                }
1915            }
1916
1917            for input in plan.inputs() {
1918                walk(planner, input, out)?;
1919            }
1920            Ok(())
1921        }
1922
1923        let mut out = BTreeSet::new();
1924        walk(self, plan, &mut out)?;
1925        Ok(out)
1926    }
1927
    /// Rewrites `plan` bottom-up so that the tags in `required_tags` are exposed by its
    /// `TableScan` and `Projection` nodes, wherever the underlying schema has them.
    ///
    /// - `TableScan` with an explicit projection: for each required tag found in the source
    ///   schema but missing from the projection, its index is appended. The projection is then
    ///   sorted and deduplicated. Scans without a projection already expose every column and
    ///   are left untouched.
    /// - `Projection`: each required tag absent from its output schema but present in its input
    ///   schema is appended as a (qualified) column expression.
    ///
    /// Returns `plan` unchanged when `required_tags` is empty.
    ///
    /// NOTE(review): all other node kinds are passed through as-is (`Transformed::no`). Confirm
    /// that nodes between a rewritten scan and the next projection (e.g. ones holding a cached
    /// schema) still expose the newly added columns.
    fn ensure_tag_columns_available(
        &self,
        plan: LogicalPlan,
        required_tags: &BTreeSet<String>,
    ) -> Result<LogicalPlan> {
        if required_tags.is_empty() {
            return Ok(plan);
        }

        // Tree rewriter carrying the set of tag names that must be visible.
        struct Rewriter {
            required_tags: BTreeSet<String>,
        }

        impl TreeNodeRewriter for Rewriter {
            type Node = LogicalPlan;

            // Post-order hook: children are already rewritten when a node is visited here.
            fn f_up(
                &mut self,
                node: Self::Node,
            ) -> datafusion_common::Result<Transformed<Self::Node>> {
                match node {
                    LogicalPlan::TableScan(scan) => {
                        let schema = scan.source.schema();
                        let mut projection = match scan.projection.clone() {
                            Some(p) => p,
                            None => {
                                // Scanning all columns already covers required tags.
                                return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
                            }
                        };

                        // Append the source-schema index of each required tag not yet projected.
                        let mut changed = false;
                        for tag in &self.required_tags {
                            if let Some((idx, _)) = schema
                                .fields()
                                .iter()
                                .enumerate()
                                .find(|(_, field)| field.name() == tag)
                                && !projection.contains(&idx)
                            {
                                projection.push(idx);
                                changed = true;
                            }
                        }

                        if !changed {
                            return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
                        }

                        // Note: this also reorders any previously projected indices.
                        projection.sort_unstable();
                        projection.dedup();

                        let new_scan = TableScan::try_new(
                            scan.table_name.clone(),
                            scan.source.clone(),
                            Some(projection),
                            scan.filters,
                            scan.fetch,
                        )?;
                        Ok(Transformed::yes(LogicalPlan::TableScan(new_scan)))
                    }
                    LogicalPlan::Projection(proj) => {
                        let input_schema = proj.input.schema();

                        let existing = proj
                            .schema
                            .fields()
                            .iter()
                            .map(|f| f.name().as_str())
                            .collect::<HashSet<_>>();

                        // Forward required tags the input provides but this projection drops.
                        let mut expr = proj.expr.clone();
                        let mut has_changed = false;
                        for tag in &self.required_tags {
                            if existing.contains(tag.as_str()) {
                                continue;
                            }

                            if let Some(idx) = input_schema.index_of_column_by_name(None, tag) {
                                expr.push(DfExpr::Column(Column::from(
                                    input_schema.qualified_field(idx),
                                )));
                                has_changed = true;
                            }
                        }

                        if !has_changed {
                            return Ok(Transformed::no(LogicalPlan::Projection(proj)));
                        }

                        let new_proj = Projection::try_new(expr, proj.input)?;
                        Ok(Transformed::yes(LogicalPlan::Projection(new_proj)))
                    }
                    other => Ok(Transformed::no(other)),
                }
            }
        }

        let mut rewriter = Rewriter {
            required_tags: required_tags.clone(),
        };
        let rewritten = plan
            .rewrite(&mut rewriter)
            .context(DataFusionPlanningSnafu)?;
        Ok(rewritten.data)
    }
2034
2035    fn refresh_tag_columns_from_schema(&mut self, schema: &DFSchemaRef) {
2036        let time_index = self.ctx.time_index_column.as_deref();
2037        let field_columns = self.ctx.field_columns.iter().collect::<HashSet<_>>();
2038
2039        let mut tags = schema
2040            .fields()
2041            .iter()
2042            .map(|f| f.name())
2043            .filter(|name| Some(name.as_str()) != time_index)
2044            .filter(|name| !field_columns.contains(name))
2045            .filter(|name| !is_metric_engine_internal_column(name))
2046            .cloned()
2047            .collect::<Vec<_>>();
2048        tags.sort_unstable();
2049        tags.dedup();
2050        self.ctx.tag_columns = tags;
2051    }
2052
2053    /// Setup [PromPlannerContext]'s state fields.
2054    ///
2055    /// Returns a logical plan for an empty metric.
2056    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
2057        let table_ref = self.table_ref()?;
2058        let source = match self.table_provider.resolve_table(table_ref.clone()).await {
2059            Err(e) if e.status_code() == StatusCode::TableNotFound => {
2060                let plan = self.setup_context_for_empty_metric()?;
2061                return Ok(Some(plan));
2062            }
2063            res => res.context(CatalogSnafu)?,
2064        };
2065        let table = self.table_from_source(&source)?;
2066
2067        // set time index column name
2068        let time_index = table
2069            .schema()
2070            .timestamp_column()
2071            .with_context(|| TimeIndexNotFoundSnafu {
2072                table: table_ref.to_quoted_string(),
2073            })?
2074            .name
2075            .clone();
2076        self.ctx.time_index_column = Some(time_index);
2077
2078        // set values columns
2079        let values = table
2080            .table_info()
2081            .meta
2082            .field_column_names()
2083            .cloned()
2084            .collect();
2085        self.ctx.field_columns = values;
2086
2087        // set primary key (tag) columns
2088        let tags = table
2089            .table_info()
2090            .meta
2091            .row_key_column_names()
2092            .filter(|col| {
2093                // remove metric engine's internal columns
2094                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
2095            })
2096            .cloned()
2097            .collect();
2098        self.ctx.tag_columns = tags;
2099
2100        self.ctx.use_tsid = false;
2101
2102        Ok(None)
2103    }
2104
2105    /// Setup [PromPlannerContext]'s state fields for a non existent table
2106    /// without any rows.
2107    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
2108        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2109        self.ctx.reset_table_name_and_schema();
2110        self.ctx.tag_columns = vec![];
2111        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
2112        self.ctx.use_tsid = false;
2113
2114        // The table doesn't have any data, so we set start to 0 and end to -1.
2115        let plan = LogicalPlan::Extension(Extension {
2116            node: Arc::new(
2117                EmptyMetric::new(
2118                    0,
2119                    -1,
2120                    self.ctx.interval,
2121                    SPECIAL_TIME_FUNCTION.to_string(),
2122                    DEFAULT_FIELD_COLUMN.to_string(),
2123                    Some(lit(0.0f64)),
2124                )
2125                .context(DataFusionPlanningSnafu)?,
2126            ),
2127        });
2128        Ok(plan)
2129    }
2130
2131    // TODO(ruihang): insert column expr
2132    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
2133        let mut result = FunctionArgs::default();
2134
2135        for arg in args {
2136            // First try to parse as literal expression (including binary expressions like 100.0 + 3.0)
2137            if let Some(expr) = Self::try_build_literal_expr(arg) {
2138                result.literals.push(expr);
2139            } else {
2140                // If not a literal, treat as vector input
2141                match arg.as_ref() {
2142                    PromExpr::Subquery(_)
2143                    | PromExpr::VectorSelector(_)
2144                    | PromExpr::MatrixSelector(_)
2145                    | PromExpr::Extension(_)
2146                    | PromExpr::Aggregate(_)
2147                    | PromExpr::Paren(_)
2148                    | PromExpr::Call(_)
2149                    | PromExpr::Binary(_)
2150                    | PromExpr::Unary(_) => {
2151                        if result.input.replace(*arg.clone()).is_some() {
2152                            MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
2153                        }
2154                    }
2155
2156                    _ => {
2157                        let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
2158                        result.literals.push(expr);
2159                    }
2160                }
2161            }
2162        }
2163
2164        Ok(result)
2165    }
2166
2167    /// Creates function expressions for projection and returns the expressions and new tags.
2168    ///
2169    /// # Side Effects
2170    ///
2171    /// This method will update [PromPlannerContext]'s fields and tags if needed.
2172    fn create_function_expr(
2173        &mut self,
2174        func: &Function,
2175        other_input_exprs: Vec<DfExpr>,
2176        query_engine_state: &QueryEngineState,
2177    ) -> Result<(Vec<DfExpr>, Vec<String>)> {
2178        // TODO(ruihang): check function args list
2179        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
2180
2181        // TODO(ruihang): set this according to in-param list
2182        let field_column_pos = 0;
2183        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2184        // New labels after executing the function, e.g. `label_replace` etc.
2185        let mut new_tags = vec![];
2186        let scalar_func = match func.name {
2187            "increase" => ScalarFunc::ExtrapolateUdf(
2188                Arc::new(Increase::scalar_udf()),
2189                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2190            ),
2191            "rate" => ScalarFunc::ExtrapolateUdf(
2192                Arc::new(Rate::scalar_udf()),
2193                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2194            ),
2195            "delta" => ScalarFunc::ExtrapolateUdf(
2196                Arc::new(Delta::scalar_udf()),
2197                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2198            ),
2199            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
2200            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
2201            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
2202            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
2203            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
2204            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
2205            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
2206            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
2207            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
2208            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
2209            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
2210            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
2211            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
2212            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
2213            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
2214            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
2215            "predict_linear" => {
2216                other_input_exprs[0] = DfExpr::Cast(Cast {
2217                    expr: Box::new(other_input_exprs[0].clone()),
2218                    data_type: ArrowDataType::Int64,
2219                });
2220                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
2221            }
2222            "double_exponential_smoothing" | "holt_winters" => {
2223                ScalarFunc::Udf(Arc::new(DoubleExponentialSmoothing::scalar_udf()))
2224            }
2225            "time" => {
2226                exprs.push(build_special_time_expr(
2227                    self.ctx.time_index_column.as_ref().unwrap(),
2228                ));
2229                ScalarFunc::GeneratedExpr
2230            }
2231            "minute" => {
2232                // date_part('minute', time_index)
2233                let expr = self.date_part_on_time_index("minute")?;
2234                exprs.push(expr);
2235                ScalarFunc::GeneratedExpr
2236            }
2237            "hour" => {
2238                // date_part('hour', time_index)
2239                let expr = self.date_part_on_time_index("hour")?;
2240                exprs.push(expr);
2241                ScalarFunc::GeneratedExpr
2242            }
2243            "month" => {
2244                // date_part('month', time_index)
2245                let expr = self.date_part_on_time_index("month")?;
2246                exprs.push(expr);
2247                ScalarFunc::GeneratedExpr
2248            }
2249            "year" => {
2250                // date_part('year', time_index)
2251                let expr = self.date_part_on_time_index("year")?;
2252                exprs.push(expr);
2253                ScalarFunc::GeneratedExpr
2254            }
2255            "day_of_month" => {
2256                // date_part('day', time_index)
2257                let expr = self.date_part_on_time_index("day")?;
2258                exprs.push(expr);
2259                ScalarFunc::GeneratedExpr
2260            }
2261            "day_of_week" => {
2262                // date_part('dow', time_index)
2263                let expr = self.date_part_on_time_index("dow")?;
2264                exprs.push(expr);
2265                ScalarFunc::GeneratedExpr
2266            }
2267            "day_of_year" => {
2268                // date_part('doy', time_index)
2269                let expr = self.date_part_on_time_index("doy")?;
2270                exprs.push(expr);
2271                ScalarFunc::GeneratedExpr
2272            }
2273            "days_in_month" => {
2274                // date_part(
2275                //     'days',
2276                //     (date_trunc('month', <TIME INDEX>::date) + interval '1 month - 1 day')
2277                // );
2278                let day_lit_expr = "day".lit();
2279                let month_lit_expr = "month".lit();
2280                let interval_1month_lit_expr =
2281                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
2282                let interval_1day_lit_expr = DfExpr::Literal(
2283                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
2284                    None,
2285                );
2286                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
2287                    left: Box::new(interval_1month_lit_expr),
2288                    op: Operator::Minus,
2289                    right: Box::new(interval_1day_lit_expr),
2290                });
2291                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
2292                    func: datafusion_functions::datetime::date_trunc(),
2293                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
2294                });
2295                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
2296                    left: Box::new(date_trunc_expr),
2297                    op: Operator::Plus,
2298                    right: Box::new(the_1month_minus_1day_expr),
2299                });
2300                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
2301                    func: datafusion_functions::datetime::date_part(),
2302                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
2303                });
2304
2305                exprs.push(date_part_expr);
2306                ScalarFunc::GeneratedExpr
2307            }
2308
2309            "label_join" => {
2310                let (concat_expr, dst_label) = Self::build_concat_labels_expr(
2311                    &mut other_input_exprs,
2312                    &self.ctx,
2313                    query_engine_state,
2314                )?;
2315
2316                // Reserve the current field columns except the `dst_label`.
2317                for value in &self.ctx.field_columns {
2318                    if *value != dst_label {
2319                        let expr = DfExpr::Column(Column::from_name(value));
2320                        exprs.push(expr);
2321                    }
2322                }
2323
2324                // Remove it from tag columns if exists to avoid duplicated column names
2325                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
2326                new_tags.push(dst_label);
2327                // Add the new label expr to evaluate
2328                exprs.push(concat_expr);
2329
2330                ScalarFunc::GeneratedExpr
2331            }
2332            "label_replace" => {
2333                if let Some((replace_expr, dst_label)) = self
2334                    .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
2335                {
2336                    // Reserve the current field columns except the `dst_label`.
2337                    for value in &self.ctx.field_columns {
2338                        if *value != dst_label {
2339                            let expr = DfExpr::Column(Column::from_name(value));
2340                            exprs.push(expr);
2341                        }
2342                    }
2343
2344                    ensure!(
2345                        !self.ctx.tag_columns.contains(&dst_label),
2346                        SameLabelSetSnafu
2347                    );
2348                    new_tags.push(dst_label);
2349                    // Add the new label expr to evaluate
2350                    exprs.push(replace_expr);
2351                } else {
2352                    // Keep the current field columns
2353                    for value in &self.ctx.field_columns {
2354                        let expr = DfExpr::Column(Column::from_name(value));
2355                        exprs.push(expr);
2356                    }
2357                }
2358
2359                ScalarFunc::GeneratedExpr
2360            }
2361            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
2362                // These functions are not expression but a part of plan,
2363                // they are processed by `prom_call_expr_to_plan`.
2364                for value in &self.ctx.field_columns {
2365                    let expr = DfExpr::Column(Column::from_name(value));
2366                    exprs.push(expr);
2367                }
2368
2369                ScalarFunc::GeneratedExpr
2370            }
2371            "round" => {
2372                if other_input_exprs.is_empty() {
2373                    other_input_exprs.push_front(0.0f64.lit());
2374                }
2375                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
2376            }
2377            "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
2378            "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
2379            "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
2380            "pi" => {
2381                // pi functions doesn't accepts any arguments, needs special processing
2382                let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2383                    func: datafusion::functions::math::pi(),
2384                    args: vec![],
2385                });
2386                exprs.push(fn_expr);
2387
2388                ScalarFunc::GeneratedExpr
2389            }
2390            _ => {
2391                if let Some(f) = query_engine_state
2392                    .session_state()
2393                    .scalar_functions()
2394                    .get(func.name)
2395                {
2396                    ScalarFunc::DataFusionBuiltin(f.clone())
2397                } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
2398                    let func_state = query_engine_state.function_state();
2399                    let query_ctx = self.table_provider.query_ctx();
2400
2401                    ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
2402                        state: func_state,
2403                        query_ctx: query_ctx.clone(),
2404                    })))
2405                } else if let Some(f) = datafusion_functions::math::functions()
2406                    .iter()
2407                    .find(|f| f.name() == func.name)
2408                {
2409                    ScalarFunc::DataFusionUdf(f.clone())
2410                } else {
2411                    return UnsupportedExprSnafu {
2412                        name: func.name.to_string(),
2413                    }
2414                    .fail();
2415                }
2416            }
2417        };
2418
2419        for value in &self.ctx.field_columns {
2420            let col_expr = DfExpr::Column(Column::from_name(value));
2421
2422            match scalar_func.clone() {
2423                ScalarFunc::DataFusionBuiltin(func) => {
2424                    other_input_exprs.insert(field_column_pos, col_expr);
2425                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2426                        func,
2427                        args: other_input_exprs.clone().into(),
2428                    });
2429                    exprs.push(fn_expr);
2430                    let _ = other_input_exprs.remove(field_column_pos);
2431                }
2432                ScalarFunc::DataFusionUdf(func) => {
2433                    let args = itertools::chain!(
2434                        other_input_exprs.iter().take(field_column_pos).cloned(),
2435                        std::iter::once(col_expr),
2436                        other_input_exprs.iter().skip(field_column_pos).cloned()
2437                    )
2438                    .collect_vec();
2439                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
2440                }
2441                ScalarFunc::Udf(func) => {
2442                    let ts_range_expr = DfExpr::Column(Column::from_name(
2443                        RangeManipulate::build_timestamp_range_name(
2444                            self.ctx.time_index_column.as_ref().unwrap(),
2445                        ),
2446                    ));
2447                    other_input_exprs.insert(field_column_pos, ts_range_expr);
2448                    other_input_exprs.insert(field_column_pos + 1, col_expr);
2449                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2450                        func,
2451                        args: other_input_exprs.clone().into(),
2452                    });
2453                    exprs.push(fn_expr);
2454                    let _ = other_input_exprs.remove(field_column_pos + 1);
2455                    let _ = other_input_exprs.remove(field_column_pos);
2456                }
2457                ScalarFunc::ExtrapolateUdf(func, range_length) => {
2458                    let ts_range_expr = DfExpr::Column(Column::from_name(
2459                        RangeManipulate::build_timestamp_range_name(
2460                            self.ctx.time_index_column.as_ref().unwrap(),
2461                        ),
2462                    ));
2463                    other_input_exprs.insert(field_column_pos, ts_range_expr);
2464                    other_input_exprs.insert(field_column_pos + 1, col_expr);
2465                    other_input_exprs
2466                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
2467                    other_input_exprs.push_back(lit(range_length));
2468                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2469                        func,
2470                        args: other_input_exprs.clone().into(),
2471                    });
2472                    exprs.push(fn_expr);
2473                    let _ = other_input_exprs.pop_back();
2474                    let _ = other_input_exprs.remove(field_column_pos + 2);
2475                    let _ = other_input_exprs.remove(field_column_pos + 1);
2476                    let _ = other_input_exprs.remove(field_column_pos);
2477                }
2478                ScalarFunc::GeneratedExpr => {}
2479            }
2480        }
2481
2482        // Update value columns' name, and alias them to remove qualifiers
2483        // For label functions such as `label_join`, `label_replace`, etc.,
2484        // we keep the fields unchanged.
2485        if !matches!(func.name, "label_join" | "label_replace") {
2486            let mut new_field_columns = Vec::with_capacity(exprs.len());
2487
2488            exprs = exprs
2489                .into_iter()
2490                .map(|expr| {
2491                    let display_name = expr.schema_name().to_string();
2492                    new_field_columns.push(display_name.clone());
2493                    Ok(expr.alias(display_name))
2494                })
2495                .collect::<std::result::Result<Vec<_>, _>>()
2496                .context(DataFusionPlanningSnafu)?;
2497
2498            self.ctx.field_columns = new_field_columns;
2499        }
2500
2501        Ok((exprs, new_tags))
2502    }
2503
2504    /// Validate label name according to Prometheus specification.
2505    /// Label names must match the regex: [a-zA-Z_][a-zA-Z0-9_]*
2506    /// Additionally, label names starting with double underscores are reserved for internal use.
2507    fn validate_label_name(label_name: &str) -> Result<()> {
2508        // Check if label name starts with double underscores (reserved)
2509        if label_name.starts_with("__") {
2510            return InvalidDestinationLabelNameSnafu { label_name }.fail();
2511        }
2512        // Check if label name matches the required pattern
2513        if !LABEL_NAME_REGEX.is_match(label_name) {
2514            return InvalidDestinationLabelNameSnafu { label_name }.fail();
2515        }
2516
2517        Ok(())
2518    }
2519
2520    /// Build expr for `label_replace` function
2521    fn build_regexp_replace_label_expr(
2522        &self,
2523        other_input_exprs: &mut VecDeque<DfExpr>,
2524        query_engine_state: &QueryEngineState,
2525    ) -> Result<Option<(DfExpr, String)>> {
2526        // label_replace(vector, dst_label, replacement, src_label, regex)
2527        let dst_label = match other_input_exprs.pop_front() {
2528            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2529            other => UnexpectedPlanExprSnafu {
2530                desc: format!("expected dst_label string literal, but found {:?}", other),
2531            }
2532            .fail()?,
2533        };
2534
2535        // Validate the destination label name
2536        Self::validate_label_name(&dst_label)?;
2537        let replacement = match other_input_exprs.pop_front() {
2538            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
2539            other => UnexpectedPlanExprSnafu {
2540                desc: format!("expected replacement string literal, but found {:?}", other),
2541            }
2542            .fail()?,
2543        };
2544        let src_label = match other_input_exprs.pop_front() {
2545            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
2546            other => UnexpectedPlanExprSnafu {
2547                desc: format!("expected src_label string literal, but found {:?}", other),
2548            }
2549            .fail()?,
2550        };
2551
2552        let regex = match other_input_exprs.pop_front() {
2553            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2554            other => UnexpectedPlanExprSnafu {
2555                desc: format!("expected regex string literal, but found {:?}", other),
2556            }
2557            .fail()?,
2558        };
2559
2560        // Validate the regex before using it
2561        // doc: https://prometheus.io/docs/prometheus/latest/querying/functions/#label_replace
2562        regex::Regex::new(&regex).map_err(|_| {
2563            InvalidRegularExpressionSnafu {
2564                regex: regex.clone(),
2565            }
2566            .build()
2567        })?;
2568
2569        // If the src_label exists and regex is empty, keep everything unchanged.
2570        if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2571            return Ok(None);
2572        }
2573
2574        // If the src_label doesn't exists, and
2575        if !self.ctx.tag_columns.contains(&src_label) {
2576            if replacement.is_empty() {
2577                // the replacement is empty, keep everything unchanged.
2578                return Ok(None);
2579            } else {
2580                // the replacement is not empty, always adds dst_label with replacement value.
2581                return Ok(Some((
2582                    // alias literal `replacement` as dst_label
2583                    lit(replacement).alias(&dst_label),
2584                    dst_label,
2585                )));
2586            }
2587        }
2588
2589        // Preprocess the regex:
2590        // https://github.com/prometheus/prometheus/blob/d902abc50d6652ba8fe9a81ff8e5cce936114eba/promql/functions.go#L1575C32-L1575C37
2591        let regex = format!("^(?s:{regex})$");
2592
2593        let session_state = query_engine_state.session_state();
2594        let func = session_state
2595            .scalar_functions()
2596            .get("regexp_replace")
2597            .context(UnsupportedExprSnafu {
2598                name: "regexp_replace",
2599            })?;
2600
2601        // regexp_replace(src_label, regex, replacement)
2602        let args = vec![
2603            if src_label.is_empty() {
2604                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2605            } else {
2606                DfExpr::Column(Column::from_name(src_label))
2607            },
2608            DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2609            DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2610        ];
2611
2612        Ok(Some((
2613            DfExpr::ScalarFunction(ScalarFunction {
2614                func: func.clone(),
2615                args,
2616            })
2617            .alias(&dst_label),
2618            dst_label,
2619        )))
2620    }
2621
2622    /// Build expr for `label_join` function
2623    fn build_concat_labels_expr(
2624        other_input_exprs: &mut VecDeque<DfExpr>,
2625        ctx: &PromPlannerContext,
2626        query_engine_state: &QueryEngineState,
2627    ) -> Result<(DfExpr, String)> {
2628        // label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)
2629
2630        let dst_label = match other_input_exprs.pop_front() {
2631            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2632            other => UnexpectedPlanExprSnafu {
2633                desc: format!("expected dst_label string literal, but found {:?}", other),
2634            }
2635            .fail()?,
2636        };
2637        let separator = match other_input_exprs.pop_front() {
2638            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2639            other => UnexpectedPlanExprSnafu {
2640                desc: format!("expected separator string literal, but found {:?}", other),
2641            }
2642            .fail()?,
2643        };
2644
2645        // Create a set of available columns (tag columns + field columns + time index column)
2646        let available_columns: HashSet<&str> = ctx
2647            .tag_columns
2648            .iter()
2649            .chain(ctx.field_columns.iter())
2650            .chain(ctx.time_index_column.as_ref())
2651            .map(|s| s.as_str())
2652            .collect();
2653
2654        let src_labels = other_input_exprs
2655            .iter()
2656            .map(|expr| {
2657                // Cast source label into column or null literal
2658                match expr {
2659                    DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2660                        if label.is_empty() {
2661                            Ok(DfExpr::Literal(ScalarValue::Null, None))
2662                        } else if available_columns.contains(label.as_str()) {
2663                            // Label exists in the table schema
2664                            Ok(DfExpr::Column(Column::from_name(label)))
2665                        } else {
2666                            // Label doesn't exist, treat as empty string (null)
2667                            Ok(DfExpr::Literal(ScalarValue::Null, None))
2668                        }
2669                    }
2670                    other => UnexpectedPlanExprSnafu {
2671                        desc: format!(
2672                            "expected source label string literal, but found {:?}",
2673                            other
2674                        ),
2675                    }
2676                    .fail(),
2677                }
2678            })
2679            .collect::<Result<Vec<_>>>()?;
2680        ensure!(
2681            !src_labels.is_empty(),
2682            FunctionInvalidArgumentSnafu {
2683                fn_name: "label_join"
2684            }
2685        );
2686
2687        let session_state = query_engine_state.session_state();
2688        let func = session_state
2689            .scalar_functions()
2690            .get("concat_ws")
2691            .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2692
2693        // concat_ws(separator, src_label_1, src_label_2, ...) as dst_label
2694        let mut args = Vec::with_capacity(1 + src_labels.len());
2695        args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2696        args.extend(src_labels);
2697
2698        Ok((
2699            DfExpr::ScalarFunction(ScalarFunction {
2700                func: func.clone(),
2701                args,
2702            })
2703            .alias(&dst_label),
2704            dst_label,
2705        ))
2706    }
2707
2708    fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2709        Ok(DfExpr::Column(Column::from_name(
2710            self.ctx
2711                .time_index_column
2712                .clone()
2713                .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2714        )))
2715    }
2716
2717    fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2718        let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2719        for tag in &self.ctx.tag_columns {
2720            let expr = DfExpr::Column(Column::from_name(tag));
2721            result.push(expr);
2722        }
2723        Ok(result)
2724    }
2725
2726    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2727        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2728        for field in &self.ctx.field_columns {
2729            let expr = DfExpr::Column(Column::from_name(field));
2730            result.push(expr);
2731        }
2732        Ok(result)
2733    }
2734
2735    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2736        let mut result = self
2737            .ctx
2738            .tag_columns
2739            .iter()
2740            .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2741            .collect::<Vec<_>>();
2742        result.push(self.create_time_index_column_expr()?.sort(true, true));
2743        Ok(result)
2744    }
2745
2746    fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2747        self.ctx
2748            .field_columns
2749            .iter()
2750            .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2751            .collect::<Vec<_>>()
2752    }
2753
2754    fn create_sort_exprs_by_tags(
2755        func: &str,
2756        tags: Vec<DfExpr>,
2757        asc: bool,
2758    ) -> Result<Vec<SortExpr>> {
2759        ensure!(
2760            !tags.is_empty(),
2761            FunctionInvalidArgumentSnafu { fn_name: func }
2762        );
2763
2764        tags.iter()
2765            .map(|col| match col {
2766                DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2767                    Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2768                }
2769                other => UnexpectedPlanExprSnafu {
2770                    desc: format!("expected label string literal, but found {:?}", other),
2771                }
2772                .fail(),
2773            })
2774            .collect::<Result<Vec<_>>>()
2775    }
2776
2777    fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2778        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2779        for value in &self.ctx.field_columns {
2780            let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2781            exprs.push(expr);
2782        }
2783
2784        // This error context should be computed lazily: the planner may set `ctx.table_name` to
2785        // `None` for derived expressions (e.g. after projecting the LHS of a vector-vector
2786        // comparison filter). Eagerly calling `table_ref()?` here can turn a valid plan into
2787        // a `TableNameNotFound` error even when `conjunction(exprs)` succeeds.
2788        conjunction(exprs).with_context(|| ValueNotFoundSnafu {
2789            table: self
2790                .table_ref()
2791                .map(|t| t.to_quoted_string())
2792                .unwrap_or_else(|_| "unknown".to_string()),
2793        })
2794    }
2795
2796    /// Creates a set of DataFusion `DfExpr::AggregateFunction` expressions for each value column using the specified aggregate function.
2797    ///
2798    /// # Side Effects
2799    ///
2800    /// This method modifies the value columns in the context by replacing them with the new columns
2801    /// created by the aggregate function application.
2802    ///
2803    /// # Returns
2804    ///
2805    /// Returns a tuple of `(aggregate_expressions, previous_field_expressions)` where:
2806    /// - `aggregate_expressions`: Expressions that apply the aggregate function to the original fields
2807    /// - `previous_field_expressions`: Original field expressions before aggregation. This is non-empty
2808    ///   only when the operation is `count_values`, as this operation requires preserving the original
2809    ///   values for grouping.
2810    ///
2811    fn create_aggregate_exprs(
2812        &mut self,
2813        op: TokenType,
2814        param: &Option<Box<PromExpr>>,
2815        input_plan: &LogicalPlan,
2816    ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
2817        let mut non_col_args = Vec::new();
2818        let is_group_agg = op.id() == token::T_GROUP;
2819        if is_group_agg {
2820            ensure!(
2821                self.ctx.field_columns.len() == 1,
2822                MultiFieldsNotSupportedSnafu {
2823                    operator: "group()"
2824                }
2825            );
2826        }
2827        let aggr = match op.id() {
2828            token::T_SUM => sum_udaf(),
2829            token::T_QUANTILE => {
2830                let q =
2831                    Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
2832                non_col_args.push(q);
2833                quantile_udaf()
2834            }
2835            token::T_AVG => avg_udaf(),
2836            token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
2837            token::T_MIN => min_udaf(),
2838            token::T_MAX => max_udaf(),
2839            // PromQL's `group()` aggregator produces 1 for each group.
2840            // Use `max(1.0)` (per-group) to match semantics and output type (Float64).
2841            token::T_GROUP => max_udaf(),
2842            token::T_STDDEV => stddev_pop_udaf(),
2843            token::T_STDVAR => var_pop_udaf(),
2844            token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2845                name: format!("{op:?}"),
2846            }
2847            .fail()?,
2848            _ => UnexpectedTokenSnafu { token: op }.fail()?,
2849        };
2850
2851        // perform aggregate operation to each value column
2852        let exprs: Vec<DfExpr> = self
2853            .ctx
2854            .field_columns
2855            .iter()
2856            .map(|col| {
2857                if is_group_agg {
2858                    aggr.call(vec![lit(1_f64)])
2859                } else {
2860                    non_col_args.push(DfExpr::Column(Column::from_name(col)));
2861                    let expr = aggr.call(non_col_args.clone());
2862                    non_col_args.pop();
2863                    expr
2864                }
2865            })
2866            .collect::<Vec<_>>();
2867
2868        // if the aggregator is `count_values`, it must be grouped by current fields.
2869        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2870            let prev_field_exprs: Vec<_> = self
2871                .ctx
2872                .field_columns
2873                .iter()
2874                .map(|col| DfExpr::Column(Column::from_name(col)))
2875                .collect();
2876
2877            ensure!(
2878                self.ctx.field_columns.len() == 1,
2879                UnsupportedExprSnafu {
2880                    name: "count_values on multi-value input"
2881                }
2882            );
2883
2884            prev_field_exprs
2885        } else {
2886            vec![]
2887        };
2888
2889        // update value column name according to the aggregators,
2890        let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2891
2892        let normalized_exprs =
2893            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2894        for expr in normalized_exprs {
2895            new_field_columns.push(expr.schema_name().to_string());
2896        }
2897        self.ctx.field_columns = new_field_columns;
2898
2899        Ok((exprs, prev_field_exprs))
2900    }
2901
2902    fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2903        let param = param
2904            .as_deref()
2905            .with_context(|| FunctionInvalidArgumentSnafu {
2906                fn_name: op.to_string(),
2907            })?;
2908        let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2909            return FunctionInvalidArgumentSnafu {
2910                fn_name: op.to_string(),
2911            }
2912            .fail();
2913        };
2914
2915        Ok(val)
2916    }
2917
2918    fn get_param_as_literal_expr(
2919        param: &Option<Box<PromExpr>>,
2920        op: Option<TokenType>,
2921        expected_type: Option<ArrowDataType>,
2922    ) -> Result<DfExpr> {
2923        let prom_param = param.as_deref().with_context(|| {
2924            if let Some(op) = op {
2925                FunctionInvalidArgumentSnafu {
2926                    fn_name: op.to_string(),
2927                }
2928            } else {
2929                FunctionInvalidArgumentSnafu {
2930                    fn_name: "unknown".to_string(),
2931                }
2932            }
2933        })?;
2934
2935        let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2936            if let Some(op) = op {
2937                FunctionInvalidArgumentSnafu {
2938                    fn_name: op.to_string(),
2939                }
2940            } else {
2941                FunctionInvalidArgumentSnafu {
2942                    fn_name: "unknown".to_string(),
2943                }
2944            }
2945        })?;
2946
2947        // check if the type is expected
2948        if let Some(expected_type) = expected_type {
2949            // literal should not have reference to column
2950            let expr_type = expr
2951                .get_type(&DFSchema::empty())
2952                .context(DataFusionPlanningSnafu)?;
2953            if expected_type != expr_type {
2954                return FunctionInvalidArgumentSnafu {
2955                    fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2956                }
2957                .fail();
2958            }
2959        }
2960
2961        Ok(expr)
2962    }
2963
2964    /// Create [DfExpr::WindowFunction] expr for each value column with given window function.
2965    ///
2966    fn create_window_exprs(
2967        &mut self,
2968        op: TokenType,
2969        group_exprs: Vec<DfExpr>,
2970        input_plan: &LogicalPlan,
2971    ) -> Result<Vec<DfExpr>> {
2972        ensure!(
2973            self.ctx.field_columns.len() == 1,
2974            UnsupportedExprSnafu {
2975                name: "topk or bottomk on multi-value input"
2976            }
2977        );
2978
2979        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2980
2981        let asc = matches!(op.id(), token::T_BOTTOMK);
2982
2983        let tag_sort_exprs = self
2984            .create_tag_column_exprs()?
2985            .into_iter()
2986            .map(|expr| expr.sort(asc, true));
2987
2988        // perform window operation to each value column
2989        let exprs: Vec<DfExpr> = self
2990            .ctx
2991            .field_columns
2992            .iter()
2993            .map(|col| {
2994                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2995                // Order by value in the specific order
2996                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
2997                // Then tags if the values are equal,
2998                // Try to ensure the relative stability of the output results.
2999                sort_exprs.extend(tag_sort_exprs.clone());
3000
3001                DfExpr::WindowFunction(Box::new(WindowFunction {
3002                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
3003                    params: WindowFunctionParams {
3004                        args: vec![],
3005                        partition_by: group_exprs.clone(),
3006                        order_by: sort_exprs,
3007                        window_frame: WindowFrame::new(Some(true)),
3008                        null_treatment: None,
3009                        distinct: false,
3010                        filter: None,
3011                    },
3012                }))
3013            })
3014            .collect();
3015
3016        let normalized_exprs =
3017            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
3018        Ok(normalized_exprs)
3019    }
3020
3021    /// Try to build a [f64] from [PromExpr].
3022    #[deprecated(
3023        note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
3024    )]
3025    fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
3026        match expr {
3027            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
3028            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
3029            PromExpr::Unary(UnaryExpr { expr, .. }) => {
3030                Self::try_build_float_literal(expr).map(|f| -f)
3031            }
3032            PromExpr::StringLiteral(_)
3033            | PromExpr::Binary(_)
3034            | PromExpr::VectorSelector(_)
3035            | PromExpr::MatrixSelector(_)
3036            | PromExpr::Call(_)
3037            | PromExpr::Extension(_)
3038            | PromExpr::Aggregate(_)
3039            | PromExpr::Subquery(_) => None,
3040        }
3041    }
3042
3043    /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
3044    async fn create_histogram_plan(
3045        &mut self,
3046        args: &PromFunctionArgs,
3047        query_engine_state: &QueryEngineState,
3048    ) -> Result<LogicalPlan> {
3049        if args.args.len() != 2 {
3050            return FunctionInvalidArgumentSnafu {
3051                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
3052            }
3053            .fail();
3054        }
3055        #[allow(deprecated)]
3056        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
3057            FunctionInvalidArgumentSnafu {
3058                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
3059            }
3060        })?;
3061
3062        let input = args.args[1].as_ref().clone();
3063        let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
3064        // `histogram_quantile` folds buckets across `le`, so `__tsid` (which includes `le`) is not
3065        // a stable series identifier anymore. Also, HistogramFold infers label columns from the
3066        // input schema and must not treat `__tsid` as a label column.
3067        let input_plan = self.strip_tsid_column(input_plan)?;
3068        self.ctx.use_tsid = false;
3069
3070        if !self.ctx.has_le_tag() {
3071            // Return empty result instead of error when 'le' column is not found
3072            // This handles the case when histogram metrics don't exist
3073            return Ok(LogicalPlan::EmptyRelation(
3074                datafusion::logical_expr::EmptyRelation {
3075                    produce_one_row: false,
3076                    schema: Arc::new(DFSchema::empty()),
3077                },
3078            ));
3079        }
3080        let time_index_column =
3081            self.ctx
3082                .time_index_column
3083                .clone()
3084                .with_context(|| TimeIndexNotFoundSnafu {
3085                    table: self.ctx.table_name.clone().unwrap_or_default(),
3086                })?;
3087        // FIXME(ruihang): support multi fields
3088        let field_column = self
3089            .ctx
3090            .field_columns
3091            .first()
3092            .with_context(|| FunctionInvalidArgumentSnafu {
3093                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
3094            })?
3095            .clone();
3096        // remove le column from tag columns
3097        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
3098
3099        Ok(LogicalPlan::Extension(Extension {
3100            node: Arc::new(
3101                HistogramFold::new(
3102                    LE_COLUMN_NAME.to_string(),
3103                    field_column,
3104                    time_index_column,
3105                    phi,
3106                    input_plan,
3107                )
3108                .context(DataFusionPlanningSnafu)?,
3109            ),
3110        }))
3111    }
3112
3113    /// Create a [SPECIAL_VECTOR_FUNCTION] plan
3114    async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
3115        if args.args.len() != 1 {
3116            return FunctionInvalidArgumentSnafu {
3117                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
3118            }
3119            .fail();
3120        }
3121        let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
3122
3123        // reuse `SPECIAL_TIME_FUNCTION` as name of time index column
3124        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
3125        self.ctx.reset_table_name_and_schema();
3126        self.ctx.tag_columns = vec![];
3127        self.ctx.field_columns = vec![greptime_value().to_string()];
3128        Ok(LogicalPlan::Extension(Extension {
3129            node: Arc::new(
3130                EmptyMetric::new(
3131                    self.ctx.start,
3132                    self.ctx.end,
3133                    self.ctx.interval,
3134                    SPECIAL_TIME_FUNCTION.to_string(),
3135                    greptime_value().to_string(),
3136                    Some(lit),
3137                )
3138                .context(DataFusionPlanningSnafu)?,
3139            ),
3140        }))
3141    }
3142
3143    /// Create a [SCALAR_FUNCTION] plan
3144    async fn create_scalar_plan(
3145        &mut self,
3146        args: &PromFunctionArgs,
3147        query_engine_state: &QueryEngineState,
3148    ) -> Result<LogicalPlan> {
3149        ensure!(
3150            args.len() == 1,
3151            FunctionInvalidArgumentSnafu {
3152                fn_name: SCALAR_FUNCTION
3153            }
3154        );
3155        let input = self
3156            .prom_expr_to_plan(&args.args[0], query_engine_state)
3157            .await?;
3158        ensure!(
3159            self.ctx.field_columns.len() == 1,
3160            MultiFieldsNotSupportedSnafu {
3161                operator: SCALAR_FUNCTION
3162            },
3163        );
3164        let scalar_plan = LogicalPlan::Extension(Extension {
3165            node: Arc::new(
3166                ScalarCalculate::new(
3167                    self.ctx.start,
3168                    self.ctx.end,
3169                    self.ctx.interval,
3170                    input,
3171                    self.ctx.time_index_column.as_ref().unwrap(),
3172                    &self.ctx.tag_columns,
3173                    &self.ctx.field_columns[0],
3174                    self.ctx.table_name.as_deref(),
3175                )
3176                .context(PromqlPlanNodeSnafu)?,
3177            ),
3178        });
3179        // scalar plan have no tag columns
3180        self.ctx.tag_columns.clear();
3181        self.ctx.field_columns.clear();
3182        self.ctx
3183            .field_columns
3184            .push(scalar_plan.schema().field(1).name().clone());
3185        Ok(scalar_plan)
3186    }
3187
3188    /// Create a [SPECIAL_ABSENT_FUNCTION] plan
3189    async fn create_absent_plan(
3190        &mut self,
3191        args: &PromFunctionArgs,
3192        query_engine_state: &QueryEngineState,
3193    ) -> Result<LogicalPlan> {
3194        if args.args.len() != 1 {
3195            return FunctionInvalidArgumentSnafu {
3196                fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
3197            }
3198            .fail();
3199        }
3200        let input = self
3201            .prom_expr_to_plan(&args.args[0], query_engine_state)
3202            .await?;
3203
3204        let time_index_expr = self.create_time_index_column_expr()?;
3205        let first_field_expr =
3206            self.create_field_column_exprs()?
3207                .pop()
3208                .with_context(|| ValueNotFoundSnafu {
3209                    table: self.ctx.table_name.clone().unwrap_or_default(),
3210                })?;
3211        let first_value_expr = first_value(first_field_expr, vec![]);
3212
3213        let ordered_aggregated_input = LogicalPlanBuilder::from(input)
3214            .aggregate(
3215                vec![time_index_expr.clone()],
3216                vec![first_value_expr.clone()],
3217            )
3218            .context(DataFusionPlanningSnafu)?
3219            .sort(vec![time_index_expr.sort(true, false)])
3220            .context(DataFusionPlanningSnafu)?
3221            .build()
3222            .context(DataFusionPlanningSnafu)?;
3223
3224        let fake_labels = self
3225            .ctx
3226            .selector_matcher
3227            .iter()
3228            .filter_map(|matcher| match matcher.op {
3229                MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
3230                _ => None,
3231            })
3232            .collect::<Vec<_>>();
3233
3234        // Create the absent plan
3235        let absent_plan = LogicalPlan::Extension(Extension {
3236            node: Arc::new(
3237                Absent::try_new(
3238                    self.ctx.start,
3239                    self.ctx.end,
3240                    self.ctx.interval,
3241                    self.ctx.time_index_column.as_ref().unwrap().clone(),
3242                    self.ctx.field_columns[0].clone(),
3243                    fake_labels,
3244                    ordered_aggregated_input,
3245                )
3246                .context(DataFusionPlanningSnafu)?,
3247            ),
3248        });
3249
3250        Ok(absent_plan)
3251    }
3252
3253    /// Try to build a DataFusion Literal Expression from PromQL Expr, return
3254    /// `None` if the input is not a literal expression.
3255    fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
3256        match expr {
3257            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
3258            PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
3259            PromExpr::VectorSelector(_)
3260            | PromExpr::MatrixSelector(_)
3261            | PromExpr::Extension(_)
3262            | PromExpr::Aggregate(_)
3263            | PromExpr::Subquery(_) => None,
3264            PromExpr::Call(Call { func, .. }) => {
3265                if func.name == SPECIAL_TIME_FUNCTION {
3266                    // For time() function, don't treat it as a literal
3267                    // Let it be handled as a regular function call
3268                    None
3269                } else {
3270                    None
3271                }
3272            }
3273            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
3274            // TODO(ruihang): support Unary operator
3275            PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
3276            PromExpr::Binary(PromBinaryExpr {
3277                lhs,
3278                rhs,
3279                op,
3280                modifier,
3281            }) => {
3282                let lhs = Self::try_build_literal_expr(lhs)?;
3283                let rhs = Self::try_build_literal_expr(rhs)?;
3284                let is_comparison_op = Self::is_token_a_comparison_op(*op);
3285                let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
3286                let expr = expr_builder(lhs, rhs).ok()?;
3287
3288                let should_return_bool = if let Some(m) = modifier {
3289                    m.return_bool
3290                } else {
3291                    false
3292                };
3293                if is_comparison_op && should_return_bool {
3294                    Some(DfExpr::Cast(Cast {
3295                        expr: Box::new(expr),
3296                        data_type: ArrowDataType::Float64,
3297                    }))
3298                } else {
3299                    Some(expr)
3300                }
3301            }
3302        }
3303    }
3304
3305    fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
3306        match expr {
3307            PromExpr::Call(Call { func, .. }) => {
3308                if func.name == SPECIAL_TIME_FUNCTION
3309                    && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
3310                {
3311                    Some(build_special_time_expr(time_index_col))
3312                } else {
3313                    None
3314                }
3315            }
3316            _ => None,
3317        }
3318    }
3319
3320    /// Return a lambda to build binary expression from token.
3321    /// Because some binary operator are function in DataFusion like `atan2` or `^`.
3322    #[allow(clippy::type_complexity)]
3323    fn prom_token_to_binary_expr_builder(
3324        token: TokenType,
3325    ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
3326        match token.id() {
3327            token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
3328            token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
3329            token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
3330            token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
3331            token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
3332            token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
3333            token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
3334            token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
3335            token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
3336            token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
3337            token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
3338            token::T_POW => Ok(Box::new(|lhs, rhs| {
3339                Ok(DfExpr::ScalarFunction(ScalarFunction {
3340                    func: datafusion_functions::math::power(),
3341                    args: vec![lhs, rhs],
3342                }))
3343            })),
3344            token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
3345                Ok(DfExpr::ScalarFunction(ScalarFunction {
3346                    func: datafusion_functions::math::atan2(),
3347                    args: vec![lhs, rhs],
3348                }))
3349            })),
3350            _ => UnexpectedTokenSnafu { token }.fail(),
3351        }
3352    }
3353
3354    /// Check if the given op is a [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators).
3355    fn is_token_a_comparison_op(token: TokenType) -> bool {
3356        matches!(
3357            token.id(),
3358            token::T_EQLC
3359                | token::T_NEQ
3360                | token::T_GTR
3361                | token::T_LSS
3362                | token::T_GTE
3363                | token::T_LTE
3364        )
3365    }
3366
3367    /// Check if the given op is a set operator (UNION, INTERSECT and EXCEPT in SQL).
3368    fn is_token_a_set_op(token: TokenType) -> bool {
3369        matches!(
3370            token.id(),
3371            token::T_LAND // INTERSECT
3372                | token::T_LOR // UNION
3373                | token::T_LUNLESS // EXCEPT
3374        )
3375    }
3376
3377    /// Build a inner join on time index column and tag columns to concat two logical plans.
3378    /// When `only_join_time_index == true` we only join on the time index, because these two plan may not have the same tag columns
3379    #[allow(clippy::too_many_arguments)]
3380    fn join_on_non_field_columns(
3381        &self,
3382        left: LogicalPlan,
3383        right: LogicalPlan,
3384        left_table_ref: TableReference,
3385        right_table_ref: TableReference,
3386        left_time_index_column: Option<String>,
3387        right_time_index_column: Option<String>,
3388        only_join_time_index: bool,
3389        modifier: &Option<BinModifier>,
3390    ) -> Result<LogicalPlan> {
3391        let mut left_tag_columns = if only_join_time_index {
3392            BTreeSet::new()
3393        } else {
3394            self.ctx
3395                .tag_columns
3396                .iter()
3397                .cloned()
3398                .collect::<BTreeSet<_>>()
3399        };
3400        let mut right_tag_columns = left_tag_columns.clone();
3401
3402        // apply modifier
3403        if let Some(modifier) = modifier {
3404            // apply label modifier
3405            if let Some(matching) = &modifier.matching {
3406                match matching {
3407                    // keeps columns mentioned in `on`
3408                    LabelModifier::Include(on) => {
3409                        let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
3410                        left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
3411                        right_tag_columns =
3412                            right_tag_columns.intersection(&mask).cloned().collect();
3413                    }
3414                    // removes columns memtioned in `ignoring`
3415                    LabelModifier::Exclude(ignoring) => {
3416                        // doesn't check existence of label
3417                        for label in &ignoring.labels {
3418                            let _ = left_tag_columns.remove(label);
3419                            let _ = right_tag_columns.remove(label);
3420                        }
3421                    }
3422                }
3423            }
3424        }
3425
3426        // push time index column if it exists
3427        if let (Some(left_time_index_column), Some(right_time_index_column)) =
3428            (left_time_index_column, right_time_index_column)
3429        {
3430            left_tag_columns.insert(left_time_index_column);
3431            right_tag_columns.insert(right_time_index_column);
3432        }
3433
3434        let right = LogicalPlanBuilder::from(right)
3435            .alias(right_table_ref)
3436            .context(DataFusionPlanningSnafu)?
3437            .build()
3438            .context(DataFusionPlanningSnafu)?;
3439
3440        // Inner Join on time index column to concat two operator
3441        LogicalPlanBuilder::from(left)
3442            .alias(left_table_ref)
3443            .context(DataFusionPlanningSnafu)?
3444            .join_detailed(
3445                right,
3446                JoinType::Inner,
3447                (
3448                    left_tag_columns
3449                        .into_iter()
3450                        .map(Column::from_name)
3451                        .collect::<Vec<_>>(),
3452                    right_tag_columns
3453                        .into_iter()
3454                        .map(Column::from_name)
3455                        .collect::<Vec<_>>(),
3456                ),
3457                None,
3458                NullEquality::NullEqualsNull,
3459            )
3460            .context(DataFusionPlanningSnafu)?
3461            .build()
3462            .context(DataFusionPlanningSnafu)
3463    }
3464
3465    /// Build a set operator (AND/OR/UNLESS)
3466    fn set_op_on_non_field_columns(
3467        &mut self,
3468        left: LogicalPlan,
3469        mut right: LogicalPlan,
3470        left_context: PromPlannerContext,
3471        right_context: PromPlannerContext,
3472        op: TokenType,
3473        modifier: &Option<BinModifier>,
3474    ) -> Result<LogicalPlan> {
3475        let mut left_tag_col_set = left_context
3476            .tag_columns
3477            .iter()
3478            .cloned()
3479            .collect::<HashSet<_>>();
3480        let mut right_tag_col_set = right_context
3481            .tag_columns
3482            .iter()
3483            .cloned()
3484            .collect::<HashSet<_>>();
3485
3486        if matches!(op.id(), token::T_LOR) {
3487            return self.or_operator(
3488                left,
3489                right,
3490                left_tag_col_set,
3491                right_tag_col_set,
3492                left_context,
3493                right_context,
3494                modifier,
3495            );
3496        }
3497
3498        // apply modifier
3499        if let Some(modifier) = modifier {
3500            // one-to-many and many-to-one are not supported
3501            ensure!(
3502                matches!(
3503                    modifier.card,
3504                    VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
3505                ),
3506                UnsupportedVectorMatchSnafu {
3507                    name: modifier.card.clone(),
3508                },
3509            );
3510            // apply label modifier
3511            if let Some(matching) = &modifier.matching {
3512                match matching {
3513                    // keeps columns mentioned in `on`
3514                    LabelModifier::Include(on) => {
3515                        let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
3516                        left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
3517                        right_tag_col_set =
3518                            right_tag_col_set.intersection(&mask).cloned().collect();
3519                    }
3520                    // removes columns memtioned in `ignoring`
3521                    LabelModifier::Exclude(ignoring) => {
3522                        // doesn't check existence of label
3523                        for label in &ignoring.labels {
3524                            let _ = left_tag_col_set.remove(label);
3525                            let _ = right_tag_col_set.remove(label);
3526                        }
3527                    }
3528                }
3529            }
3530        }
3531        // ensure two sides have the same tag columns
3532        if !matches!(op.id(), token::T_LOR) {
3533            ensure!(
3534                left_tag_col_set == right_tag_col_set,
3535                CombineTableColumnMismatchSnafu {
3536                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
3537                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
3538                }
3539            )
3540        };
3541        let left_time_index = left_context.time_index_column.clone().unwrap();
3542        let right_time_index = right_context.time_index_column.clone().unwrap();
3543        let join_keys = left_tag_col_set
3544            .iter()
3545            .cloned()
3546            .chain([left_time_index.clone()])
3547            .collect::<Vec<_>>();
3548        self.ctx.time_index_column = Some(left_time_index.clone());
3549        self.ctx.use_tsid = left_context.use_tsid;
3550
3551        // alias right time index column if necessary
3552        if left_context.time_index_column != right_context.time_index_column {
3553            let right_project_exprs = right
3554                .schema()
3555                .fields()
3556                .iter()
3557                .map(|field| {
3558                    if field.name() == &right_time_index {
3559                        DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
3560                    } else {
3561                        DfExpr::Column(Column::from_name(field.name()))
3562                    }
3563                })
3564                .collect::<Vec<_>>();
3565
3566            right = LogicalPlanBuilder::from(right)
3567                .project(right_project_exprs)
3568                .context(DataFusionPlanningSnafu)?
3569                .build()
3570                .context(DataFusionPlanningSnafu)?;
3571        }
3572
3573        ensure!(
3574            left_context.field_columns.len() == 1,
3575            MultiFieldsNotSupportedSnafu {
3576                operator: "AND operator"
3577            }
3578        );
3579        // Update the field column in context.
3580        // The AND/UNLESS operator only keep the field column in left input.
3581        let left_field_col = left_context.field_columns.first().unwrap();
3582        self.ctx.field_columns = vec![left_field_col.clone()];
3583
3584        // Generate join plan.
3585        // All set operations in PromQL are "distinct"
3586        match op.id() {
3587            token::T_LAND => LogicalPlanBuilder::from(left)
3588                .distinct()
3589                .context(DataFusionPlanningSnafu)?
3590                .join_detailed(
3591                    right,
3592                    JoinType::LeftSemi,
3593                    (join_keys.clone(), join_keys),
3594                    None,
3595                    NullEquality::NullEqualsNull,
3596                )
3597                .context(DataFusionPlanningSnafu)?
3598                .build()
3599                .context(DataFusionPlanningSnafu),
3600            token::T_LUNLESS => LogicalPlanBuilder::from(left)
3601                .distinct()
3602                .context(DataFusionPlanningSnafu)?
3603                .join_detailed(
3604                    right,
3605                    JoinType::LeftAnti,
3606                    (join_keys.clone(), join_keys),
3607                    None,
3608                    NullEquality::NullEqualsNull,
3609                )
3610                .context(DataFusionPlanningSnafu)?
3611                .build()
3612                .context(DataFusionPlanningSnafu),
3613            token::T_LOR => {
3614                // OR is handled at the beginning of this function, as it cannot
3615                // be expressed using JOIN like AND and UNLESS.
3616                unreachable!()
3617            }
3618            _ => UnexpectedTokenSnafu { token: op }.fail(),
3619        }
3620    }
3621
3622    // TODO(ruihang): change function name
3623    #[allow(clippy::too_many_arguments)]
3624    fn or_operator(
3625        &mut self,
3626        left: LogicalPlan,
3627        right: LogicalPlan,
3628        left_tag_cols_set: HashSet<String>,
3629        right_tag_cols_set: HashSet<String>,
3630        left_context: PromPlannerContext,
3631        right_context: PromPlannerContext,
3632        modifier: &Option<BinModifier>,
3633    ) -> Result<LogicalPlan> {
3634        // checks
3635        ensure!(
3636            left_context.field_columns.len() == right_context.field_columns.len(),
3637            CombineTableColumnMismatchSnafu {
3638                left: left_context.field_columns.clone(),
3639                right: right_context.field_columns.clone()
3640            }
3641        );
3642        ensure!(
3643            left_context.field_columns.len() == 1,
3644            MultiFieldsNotSupportedSnafu {
3645                operator: "OR operator"
3646            }
3647        );
3648
3649        // prepare hash sets
3650        let all_tags = left_tag_cols_set
3651            .union(&right_tag_cols_set)
3652            .cloned()
3653            .collect::<HashSet<_>>();
3654        let tags_not_in_left = all_tags
3655            .difference(&left_tag_cols_set)
3656            .cloned()
3657            .collect::<Vec<_>>();
3658        let tags_not_in_right = all_tags
3659            .difference(&right_tag_cols_set)
3660            .cloned()
3661            .collect::<Vec<_>>();
3662        let left_qualifier = left.schema().qualified_field(0).0.cloned();
3663        let right_qualifier = right.schema().qualified_field(0).0.cloned();
3664        let left_qualifier_string = left_qualifier
3665            .as_ref()
3666            .map(|l| l.to_string())
3667            .unwrap_or_default();
3668        let right_qualifier_string = right_qualifier
3669            .as_ref()
3670            .map(|r| r.to_string())
3671            .unwrap_or_default();
3672        let left_time_index_column =
3673            left_context
3674                .time_index_column
3675                .clone()
3676                .with_context(|| TimeIndexNotFoundSnafu {
3677                    table: left_qualifier_string.clone(),
3678                })?;
3679        let right_time_index_column =
3680            right_context
3681                .time_index_column
3682                .clone()
3683                .with_context(|| TimeIndexNotFoundSnafu {
3684                    table: right_qualifier_string.clone(),
3685                })?;
3686        // Take the name of first field column. The length is checked above.
3687        let left_field_col = left_context.field_columns.first().unwrap();
3688        let right_field_col = right_context.field_columns.first().unwrap();
3689        let left_has_tsid = left
3690            .schema()
3691            .fields()
3692            .iter()
3693            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
3694        let right_has_tsid = right
3695            .schema()
3696            .fields()
3697            .iter()
3698            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
3699
3700        // step 0: fill all columns in output schema
3701        let mut all_columns_set = left
3702            .schema()
3703            .fields()
3704            .iter()
3705            .chain(right.schema().fields().iter())
3706            .map(|field| field.name().clone())
3707            .collect::<HashSet<_>>();
3708        // Keep `__tsid` only when both sides contain it, otherwise it may break schema alignment
3709        // (e.g. `unknown_metric or some_metric`).
3710        if !(left_has_tsid && right_has_tsid) {
3711            all_columns_set.remove(DATA_SCHEMA_TSID_COLUMN_NAME);
3712        }
3713        // remove time index column
3714        all_columns_set.remove(&left_time_index_column);
3715        all_columns_set.remove(&right_time_index_column);
3716        // remove field column in the right
3717        if left_field_col != right_field_col {
3718            all_columns_set.remove(right_field_col);
3719        }
3720        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
3721        // sort to ensure the generated schema is not volatile
3722        all_columns.sort_unstable();
3723        // use left time index column name as the result time index column name
3724        all_columns.insert(0, left_time_index_column.clone());
3725
3726        // step 1: align schema using project, fill non-exist columns with null
3727        let left_proj_exprs = all_columns.iter().map(|col| {
3728            if tags_not_in_left.contains(col) {
3729                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3730            } else {
3731                DfExpr::Column(Column::new(None::<String>, col))
3732            }
3733        });
3734        let right_time_index_expr = DfExpr::Column(Column::new(
3735            right_qualifier.clone(),
3736            right_time_index_column,
3737        ))
3738        .alias(left_time_index_column.clone());
3739        // The field column in right side may not have qualifier (it may be removed by join operation),
3740        // so we need to find it from the schema.
3741        let right_qualifier_for_field = right
3742            .schema()
3743            .iter()
3744            .find(|(_, f)| f.name() == right_field_col)
3745            .map(|(q, _)| q)
3746            .with_context(|| ColumnNotFoundSnafu {
3747                col: right_field_col.clone(),
3748            })?
3749            .cloned();
3750
3751        // `skip(1)` to skip the time index column
3752        let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
3753            // expr
3754            if col == left_field_col && left_field_col != right_field_col {
3755                // qualify field in right side if necessary to handle different field name
3756                DfExpr::Column(Column::new(
3757                    right_qualifier_for_field.clone(),
3758                    right_field_col,
3759                ))
3760            } else if tags_not_in_right.contains(col) {
3761                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3762            } else {
3763                DfExpr::Column(Column::new(None::<String>, col))
3764            }
3765        });
3766        let right_proj_exprs = [right_time_index_expr]
3767            .into_iter()
3768            .chain(right_proj_exprs_without_time_index);
3769
3770        let left_projected = LogicalPlanBuilder::from(left)
3771            .project(left_proj_exprs)
3772            .context(DataFusionPlanningSnafu)?
3773            .alias(left_qualifier_string.clone())
3774            .context(DataFusionPlanningSnafu)?
3775            .build()
3776            .context(DataFusionPlanningSnafu)?;
3777        let right_projected = LogicalPlanBuilder::from(right)
3778            .project(right_proj_exprs)
3779            .context(DataFusionPlanningSnafu)?
3780            .alias(right_qualifier_string.clone())
3781            .context(DataFusionPlanningSnafu)?
3782            .build()
3783            .context(DataFusionPlanningSnafu)?;
3784
3785        // step 2: compute match columns
3786        let mut match_columns = if let Some(modifier) = modifier
3787            && let Some(matching) = &modifier.matching
3788        {
3789            match matching {
3790                // keeps columns mentioned in `on`
3791                LabelModifier::Include(on) => on.labels.clone(),
3792                // removes columns memtioned in `ignoring`
3793                LabelModifier::Exclude(ignoring) => {
3794                    let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
3795                    all_tags.difference(&ignoring).cloned().collect()
3796                }
3797            }
3798        } else {
3799            all_tags.iter().cloned().collect()
3800        };
3801        // sort to ensure the generated plan is not volatile
3802        match_columns.sort_unstable();
3803        // step 3: build `UnionDistinctOn` plan
3804        let schema = left_projected.schema().clone();
3805        let union_distinct_on = UnionDistinctOn::new(
3806            left_projected,
3807            right_projected,
3808            match_columns,
3809            left_time_index_column.clone(),
3810            schema,
3811        );
3812        let result = LogicalPlan::Extension(Extension {
3813            node: Arc::new(union_distinct_on),
3814        });
3815
3816        // step 4: update context
3817        self.ctx.time_index_column = Some(left_time_index_column);
3818        self.ctx.tag_columns = all_tags.into_iter().collect();
3819        self.ctx.field_columns = vec![left_field_col.clone()];
3820        self.ctx.use_tsid = left_has_tsid && right_has_tsid;
3821
3822        Ok(result)
3823    }
3824
3825    /// Build a projection that project and perform operation expr for every value columns.
3826    /// Non-value columns (tag and timestamp) will be preserved in the projection.
3827    ///
3828    /// # Side effect
3829    ///
3830    /// This function will update the value columns in the context. Those new column names
3831    /// don't contains qualifier.
3832    fn projection_for_each_field_column<F>(
3833        &mut self,
3834        input: LogicalPlan,
3835        name_to_expr: F,
3836    ) -> Result<LogicalPlan>
3837    where
3838        F: FnMut(&String) -> Result<DfExpr>,
3839    {
3840        let non_field_columns_iter = self
3841            .ctx
3842            .tag_columns
3843            .iter()
3844            .chain(self.ctx.time_index_column.iter())
3845            .map(|col| {
3846                Ok(DfExpr::Column(Column::new(
3847                    self.ctx.table_name.clone().map(TableReference::bare),
3848                    col,
3849                )))
3850            });
3851
3852        // build computation exprs
3853        let result_field_columns = self
3854            .ctx
3855            .field_columns
3856            .iter()
3857            .map(name_to_expr)
3858            .collect::<Result<Vec<_>>>()?;
3859
3860        // alias the computation exprs to remove qualifier
3861        self.ctx.field_columns = result_field_columns
3862            .iter()
3863            .map(|expr| expr.schema_name().to_string())
3864            .collect();
3865        let field_columns_iter = result_field_columns
3866            .into_iter()
3867            .zip(self.ctx.field_columns.iter())
3868            .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3869
3870        // chain non-field columns (unchanged) and field columns (applied computation then alias)
3871        let project_fields = non_field_columns_iter
3872            .chain(field_columns_iter)
3873            .collect::<Result<Vec<_>>>()?;
3874
3875        LogicalPlanBuilder::from(input)
3876            .project(project_fields)
3877            .context(DataFusionPlanningSnafu)?
3878            .build()
3879            .context(DataFusionPlanningSnafu)
3880    }
3881
3882    /// Build a filter plan that filter on value column. Notice that only one value column
3883    /// is expected.
3884    fn filter_on_field_column<F>(
3885        &self,
3886        input: LogicalPlan,
3887        mut name_to_expr: F,
3888    ) -> Result<LogicalPlan>
3889    where
3890        F: FnMut(&String) -> Result<DfExpr>,
3891    {
3892        ensure!(
3893            self.ctx.field_columns.len() == 1,
3894            UnsupportedExprSnafu {
3895                name: "filter on multi-value input"
3896            }
3897        );
3898
3899        let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3900
3901        LogicalPlanBuilder::from(input)
3902            .filter(field_column_filter)
3903            .context(DataFusionPlanningSnafu)?
3904            .build()
3905            .context(DataFusionPlanningSnafu)
3906    }
3907
3908    /// Generate an expr like `date_part("hour", <TIME_INDEX>)`. Caller should ensure the
3909    /// time index column in context is set
3910    fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3911        let input_expr = datafusion::logical_expr::col(
3912            self.ctx
3913                .time_index_column
3914                .as_ref()
3915                // table name doesn't matters here
3916                .with_context(|| TimeIndexNotFoundSnafu {
3917                    table: "<doesn't matter>",
3918                })?,
3919        );
3920        let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3921            func: datafusion_functions::datetime::date_part(),
3922            args: vec![date_part.lit(), input_expr],
3923        });
3924        Ok(fn_expr)
3925    }
3926
3927    fn strip_tsid_column(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
3928        let schema = plan.schema();
3929        if !schema
3930            .fields()
3931            .iter()
3932            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
3933        {
3934            return Ok(plan);
3935        }
3936
3937        let project_exprs = schema
3938            .fields()
3939            .iter()
3940            .filter(|field| field.name() != DATA_SCHEMA_TSID_COLUMN_NAME)
3941            .map(|field| Ok(DfExpr::Column(Column::from_name(field.name().clone()))))
3942            .collect::<Result<Vec<_>>>()?;
3943
3944        LogicalPlanBuilder::from(plan)
3945            .project(project_exprs)
3946            .context(DataFusionPlanningSnafu)?
3947            .build()
3948            .context(DataFusionPlanningSnafu)
3949    }
3950
3951    /// Apply an alias to the query result by adding a projection with the alias name
3952    fn apply_alias(&mut self, plan: LogicalPlan, alias_name: String) -> Result<LogicalPlan> {
3953        let fields_expr = self.create_field_column_exprs()?;
3954
3955        // TODO(dennis): how to support multi-value aliasing?
3956        ensure!(
3957            fields_expr.len() == 1,
3958            UnsupportedExprSnafu {
3959                name: "alias on multi-value result"
3960            }
3961        );
3962
3963        let project_fields = fields_expr
3964            .into_iter()
3965            .map(|expr| expr.alias(&alias_name))
3966            .chain(self.create_tag_column_exprs()?)
3967            .chain(Some(self.create_time_index_column_expr()?));
3968
3969        LogicalPlanBuilder::from(plan)
3970            .project(project_fields)
3971            .context(DataFusionPlanningSnafu)?
3972            .build()
3973            .context(DataFusionPlanningSnafu)
3974    }
3975}
3976
3977#[derive(Default, Debug)]
3978struct FunctionArgs {
3979    input: Option<PromExpr>,
3980    literals: Vec<DfExpr>,
3981}
3982
3983/// Represents different types of scalar functions supported in PromQL expressions.
3984/// Each variant defines how the function should be processed and what arguments it expects.
3985#[derive(Debug, Clone)]
3986enum ScalarFunc {
3987    /// DataFusion's registered(including built-in) scalar functions (e.g., abs, sqrt, round, clamp).
3988    /// These are passed through directly to DataFusion's execution engine.
3989    /// Processing: Simple argument insertion at the specified position.
3990    DataFusionBuiltin(Arc<ScalarUdfDef>),
3991    /// User-defined functions registered in DataFusion's function registry.
3992    /// Similar to DataFusionBuiltin but for custom functions not built into DataFusion.
3993    /// Processing: Direct pass-through with argument positioning.
3994    DataFusionUdf(Arc<ScalarUdfDef>),
3995    /// PromQL-specific functions that operate on time series data with temporal context.
3996    /// These functions require both timestamp ranges and values to perform calculations.
3997    /// Processing: Automatically injects timestamp_range and value columns as first arguments.
3998    /// Examples: idelta, irate, resets, changes, deriv, *_over_time function
3999    Udf(Arc<ScalarUdfDef>),
4000    /// PromQL functions requiring extrapolation calculations with explicit range information.
4001    /// These functions need to know the time range length to perform rate calculations.
4002    /// The second field contains the range length in milliseconds.
4003    /// Processing: Injects timestamp_range, value, time_index columns and appends range_length.
4004    /// Examples: increase, rate, delta
4005    // TODO(ruihang): maybe merge with Udf later
4006    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
4007    /// Functions that generate expressions directly without external UDF calls.
4008    /// The expression is constructed during function matching and requires no additional processing.
4009    /// Examples: time(), minute(), hour(), month(), year() and other date/time extractors
4010    GeneratedExpr,
4011}
4012
4013#[cfg(test)]
4014mod test {
4015    use std::time::{Duration, UNIX_EPOCH};
4016
4017    use catalog::RegisterTableRequest;
4018    use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
4019    use common_base::Plugins;
4020    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
4021    use common_query::prelude::greptime_timestamp;
4022    use common_query::test_util::DummyDecoder;
4023    use datatypes::prelude::ConcreteDataType;
4024    use datatypes::schema::{ColumnSchema, Schema};
4025    use promql_parser::label::Labels;
4026    use promql_parser::parser;
4027    use session::context::QueryContext;
4028    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
4029    use table::test_util::EmptyTable;
4030
4031    use super::*;
4032    use crate::options::QueryOptions;
4033    use crate::parser::QueryLanguageParser;
4034
4035    fn build_query_engine_state() -> QueryEngineState {
4036        QueryEngineState::new(
4037            new_memory_catalog_manager().unwrap(),
4038            None,
4039            None,
4040            None,
4041            None,
4042            None,
4043            false,
4044            Plugins::default(),
4045            QueryOptions::default(),
4046        )
4047    }
4048
4049    async fn build_test_table_provider(
4050        table_name_tuples: &[(String, String)],
4051        num_tag: usize,
4052        num_field: usize,
4053    ) -> DfTableSourceProvider {
4054        let catalog_list = MemoryCatalogManager::with_default_setup();
4055        for (schema_name, table_name) in table_name_tuples {
4056            let mut columns = vec![];
4057            for i in 0..num_tag {
4058                columns.push(ColumnSchema::new(
4059                    format!("tag_{i}"),
4060                    ConcreteDataType::string_datatype(),
4061                    false,
4062                ));
4063            }
4064            columns.push(
4065                ColumnSchema::new(
4066                    "timestamp".to_string(),
4067                    ConcreteDataType::timestamp_millisecond_datatype(),
4068                    false,
4069                )
4070                .with_time_index(true),
4071            );
4072            for i in 0..num_field {
4073                columns.push(ColumnSchema::new(
4074                    format!("field_{i}"),
4075                    ConcreteDataType::float64_datatype(),
4076                    true,
4077                ));
4078            }
4079            let schema = Arc::new(Schema::new(columns));
4080            let table_meta = TableMetaBuilder::empty()
4081                .schema(schema)
4082                .primary_key_indices((0..num_tag).collect())
4083                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
4084                .next_column_id(1024)
4085                .build()
4086                .unwrap();
4087            let table_info = TableInfoBuilder::default()
4088                .name(table_name.clone())
4089                .meta(table_meta)
4090                .build()
4091                .unwrap();
4092            let table = EmptyTable::from_table_info(&table_info);
4093
4094            assert!(
4095                catalog_list
4096                    .register_table_sync(RegisterTableRequest {
4097                        catalog: DEFAULT_CATALOG_NAME.to_string(),
4098                        schema: schema_name.clone(),
4099                        table_name: table_name.clone(),
4100                        table_id: 1024,
4101                        table,
4102                    })
4103                    .is_ok()
4104            );
4105        }
4106
4107        DfTableSourceProvider::new(
4108            catalog_list,
4109            false,
4110            QueryContext::arc(),
4111            DummyDecoder::arc(),
4112            false,
4113        )
4114    }
4115
4116    async fn build_test_table_provider_with_tsid(
4117        table_name_tuples: &[(String, String)],
4118        num_tag: usize,
4119        num_field: usize,
4120    ) -> DfTableSourceProvider {
4121        let catalog_list = MemoryCatalogManager::with_default_setup();
4122
4123        let physical_table_name = "phy";
4124        let physical_table_id = 999u32;
4125
4126        // Register a metric engine physical table with internal columns.
4127        {
4128            let mut columns = vec![
4129                ColumnSchema::new(
4130                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
4131                    ConcreteDataType::uint32_datatype(),
4132                    false,
4133                ),
4134                ColumnSchema::new(
4135                    DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
4136                    ConcreteDataType::uint64_datatype(),
4137                    false,
4138                ),
4139            ];
4140            for i in 0..num_tag {
4141                columns.push(ColumnSchema::new(
4142                    format!("tag_{i}"),
4143                    ConcreteDataType::string_datatype(),
4144                    false,
4145                ));
4146            }
4147            columns.push(
4148                ColumnSchema::new(
4149                    "timestamp".to_string(),
4150                    ConcreteDataType::timestamp_millisecond_datatype(),
4151                    false,
4152                )
4153                .with_time_index(true),
4154            );
4155            for i in 0..num_field {
4156                columns.push(ColumnSchema::new(
4157                    format!("field_{i}"),
4158                    ConcreteDataType::float64_datatype(),
4159                    true,
4160                ));
4161            }
4162
4163            let schema = Arc::new(Schema::new(columns));
4164            let primary_key_indices = (0..(2 + num_tag)).collect::<Vec<_>>();
4165            let table_meta = TableMetaBuilder::empty()
4166                .schema(schema)
4167                .primary_key_indices(primary_key_indices)
4168                .value_indices((2 + num_tag..2 + num_tag + 1 + num_field).collect())
4169                .engine(METRIC_ENGINE_NAME.to_string())
4170                .next_column_id(1024)
4171                .build()
4172                .unwrap();
4173            let table_info = TableInfoBuilder::default()
4174                .table_id(physical_table_id)
4175                .name(physical_table_name)
4176                .meta(table_meta)
4177                .build()
4178                .unwrap();
4179            let table = EmptyTable::from_table_info(&table_info);
4180
4181            assert!(
4182                catalog_list
4183                    .register_table_sync(RegisterTableRequest {
4184                        catalog: DEFAULT_CATALOG_NAME.to_string(),
4185                        schema: DEFAULT_SCHEMA_NAME.to_string(),
4186                        table_name: physical_table_name.to_string(),
4187                        table_id: physical_table_id,
4188                        table,
4189                    })
4190                    .is_ok()
4191            );
4192        }
4193
4194        // Register metric engine logical tables without `__tsid`, referencing the physical table.
4195        for (idx, (schema_name, table_name)) in table_name_tuples.iter().enumerate() {
4196            let mut columns = vec![];
4197            for i in 0..num_tag {
4198                columns.push(ColumnSchema::new(
4199                    format!("tag_{i}"),
4200                    ConcreteDataType::string_datatype(),
4201                    false,
4202                ));
4203            }
4204            columns.push(
4205                ColumnSchema::new(
4206                    "timestamp".to_string(),
4207                    ConcreteDataType::timestamp_millisecond_datatype(),
4208                    false,
4209                )
4210                .with_time_index(true),
4211            );
4212            for i in 0..num_field {
4213                columns.push(ColumnSchema::new(
4214                    format!("field_{i}"),
4215                    ConcreteDataType::float64_datatype(),
4216                    true,
4217                ));
4218            }
4219
4220            let schema = Arc::new(Schema::new(columns));
4221            let mut options = table::requests::TableOptions::default();
4222            options.extra_options.insert(
4223                LOGICAL_TABLE_METADATA_KEY.to_string(),
4224                physical_table_name.to_string(),
4225            );
4226            let table_id = 1024u32 + idx as u32;
4227            let table_meta = TableMetaBuilder::empty()
4228                .schema(schema)
4229                .primary_key_indices((0..num_tag).collect())
4230                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
4231                .engine(METRIC_ENGINE_NAME.to_string())
4232                .options(options)
4233                .next_column_id(1024)
4234                .build()
4235                .unwrap();
4236            let table_info = TableInfoBuilder::default()
4237                .table_id(table_id)
4238                .name(table_name.clone())
4239                .meta(table_meta)
4240                .build()
4241                .unwrap();
4242            let table = EmptyTable::from_table_info(&table_info);
4243
4244            assert!(
4245                catalog_list
4246                    .register_table_sync(RegisterTableRequest {
4247                        catalog: DEFAULT_CATALOG_NAME.to_string(),
4248                        schema: schema_name.clone(),
4249                        table_name: table_name.clone(),
4250                        table_id,
4251                        table,
4252                    })
4253                    .is_ok()
4254            );
4255        }
4256
4257        DfTableSourceProvider::new(
4258            catalog_list,
4259            false,
4260            QueryContext::arc(),
4261            DummyDecoder::arc(),
4262            false,
4263        )
4264    }
4265
4266    async fn build_test_table_provider_with_fields(
4267        table_name_tuples: &[(String, String)],
4268        tags: &[&str],
4269    ) -> DfTableSourceProvider {
4270        let catalog_list = MemoryCatalogManager::with_default_setup();
4271        for (schema_name, table_name) in table_name_tuples {
4272            let mut columns = vec![];
4273            let num_tag = tags.len();
4274            for tag in tags {
4275                columns.push(ColumnSchema::new(
4276                    tag.to_string(),
4277                    ConcreteDataType::string_datatype(),
4278                    false,
4279                ));
4280            }
4281            columns.push(
4282                ColumnSchema::new(
4283                    greptime_timestamp().to_string(),
4284                    ConcreteDataType::timestamp_millisecond_datatype(),
4285                    false,
4286                )
4287                .with_time_index(true),
4288            );
4289            columns.push(ColumnSchema::new(
4290                greptime_value().to_string(),
4291                ConcreteDataType::float64_datatype(),
4292                true,
4293            ));
4294            let schema = Arc::new(Schema::new(columns));
4295            let table_meta = TableMetaBuilder::empty()
4296                .schema(schema)
4297                .primary_key_indices((0..num_tag).collect())
4298                .next_column_id(1024)
4299                .build()
4300                .unwrap();
4301            let table_info = TableInfoBuilder::default()
4302                .name(table_name.clone())
4303                .meta(table_meta)
4304                .build()
4305                .unwrap();
4306            let table = EmptyTable::from_table_info(&table_info);
4307
4308            assert!(
4309                catalog_list
4310                    .register_table_sync(RegisterTableRequest {
4311                        catalog: DEFAULT_CATALOG_NAME.to_string(),
4312                        schema: schema_name.clone(),
4313                        table_name: table_name.clone(),
4314                        table_id: 1024,
4315                        table,
4316                    })
4317                    .is_ok()
4318            );
4319        }
4320
4321        DfTableSourceProvider::new(
4322            catalog_list,
4323            false,
4324            QueryContext::arc(),
4325            DummyDecoder::arc(),
4326            false,
4327        )
4328    }
4329
4330    // {
4331    //     input: `abs(some_metric{foo!="bar"})`,
4332    //     expected: &Call{
4333    //         Func: MustGetFunction("abs"),
4334    //         Args: Expressions{
4335    //             &VectorSelector{
4336    //                 Name: "some_metric",
4337    //                 LabelMatchers: []*labels.Matcher{
4338    //                     MustLabelMatcher(labels.MatchNotEqual, "foo", "bar"),
4339    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
4340    //                 },
4341    //             },
4342    //         },
4343    //     },
4344    // },
4345    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
4346        let prom_expr =
4347            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
4348        let eval_stmt = EvalStmt {
4349            expr: prom_expr,
4350            start: UNIX_EPOCH,
4351            end: UNIX_EPOCH
4352                .checked_add(Duration::from_secs(100_000))
4353                .unwrap(),
4354            interval: Duration::from_secs(5),
4355            lookback_delta: Duration::from_secs(1),
4356        };
4357
4358        let table_provider = build_test_table_provider(
4359            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4360            1,
4361            1,
4362        )
4363        .await;
4364        let plan =
4365            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4366                .await
4367                .unwrap();
4368
4369        let expected = String::from(
4370            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(ms), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
4371            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(ms), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
4372            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
4373            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
4374            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
4375	            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
4376            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"
4377        ).replace("TEMPLATE", plan_name);
4378
4379        assert_eq!(plan.display_indent_schema().to_string(), expected);
4380    }
4381
4382    #[tokio::test]
4383    async fn single_abs() {
4384        do_single_instant_function_call("abs", "abs").await;
4385    }
4386
4387    #[tokio::test]
4388    #[should_panic]
4389    async fn single_absent() {
4390        do_single_instant_function_call("absent", "").await;
4391    }
4392
4393    #[tokio::test]
4394    async fn single_ceil() {
4395        do_single_instant_function_call("ceil", "ceil").await;
4396    }
4397
4398    #[tokio::test]
4399    async fn single_exp() {
4400        do_single_instant_function_call("exp", "exp").await;
4401    }
4402
4403    #[tokio::test]
4404    async fn single_ln() {
4405        do_single_instant_function_call("ln", "ln").await;
4406    }
4407
4408    #[tokio::test]
4409    async fn single_log2() {
4410        do_single_instant_function_call("log2", "log2").await;
4411    }
4412
4413    #[tokio::test]
4414    async fn single_log10() {
4415        do_single_instant_function_call("log10", "log10").await;
4416    }
4417
4418    #[tokio::test]
4419    #[should_panic]
4420    async fn single_scalar() {
4421        do_single_instant_function_call("scalar", "").await;
4422    }
4423
4424    #[tokio::test]
4425    #[should_panic]
4426    async fn single_sgn() {
4427        do_single_instant_function_call("sgn", "").await;
4428    }
4429
4430    #[tokio::test]
4431    #[should_panic]
4432    async fn single_sort() {
4433        do_single_instant_function_call("sort", "").await;
4434    }
4435
4436    #[tokio::test]
4437    #[should_panic]
4438    async fn single_sort_desc() {
4439        do_single_instant_function_call("sort_desc", "").await;
4440    }
4441
4442    #[tokio::test]
4443    async fn single_sqrt() {
4444        do_single_instant_function_call("sqrt", "sqrt").await;
4445    }
4446
4447    #[tokio::test]
4448    #[should_panic]
4449    async fn single_timestamp() {
4450        do_single_instant_function_call("timestamp", "").await;
4451    }
4452
4453    #[tokio::test]
4454    async fn single_acos() {
4455        do_single_instant_function_call("acos", "acos").await;
4456    }
4457
4458    #[tokio::test]
4459    #[should_panic]
4460    async fn single_acosh() {
4461        do_single_instant_function_call("acosh", "").await;
4462    }
4463
4464    #[tokio::test]
4465    async fn single_asin() {
4466        do_single_instant_function_call("asin", "asin").await;
4467    }
4468
4469    #[tokio::test]
4470    #[should_panic]
4471    async fn single_asinh() {
4472        do_single_instant_function_call("asinh", "").await;
4473    }
4474
4475    #[tokio::test]
4476    async fn single_atan() {
4477        do_single_instant_function_call("atan", "atan").await;
4478    }
4479
4480    #[tokio::test]
4481    #[should_panic]
4482    async fn single_atanh() {
4483        do_single_instant_function_call("atanh", "").await;
4484    }
4485
4486    #[tokio::test]
4487    async fn single_cos() {
4488        do_single_instant_function_call("cos", "cos").await;
4489    }
4490
4491    #[tokio::test]
4492    #[should_panic]
4493    async fn single_cosh() {
4494        do_single_instant_function_call("cosh", "").await;
4495    }
4496
4497    #[tokio::test]
4498    async fn single_sin() {
4499        do_single_instant_function_call("sin", "sin").await;
4500    }
4501
4502    #[tokio::test]
4503    #[should_panic]
4504    async fn single_sinh() {
4505        do_single_instant_function_call("sinh", "").await;
4506    }
4507
4508    #[tokio::test]
4509    async fn single_tan() {
4510        do_single_instant_function_call("tan", "tan").await;
4511    }
4512
4513    #[tokio::test]
4514    #[should_panic]
4515    async fn single_tanh() {
4516        do_single_instant_function_call("tanh", "").await;
4517    }
4518
4519    #[tokio::test]
4520    #[should_panic]
4521    async fn single_deg() {
4522        do_single_instant_function_call("deg", "").await;
4523    }
4524
4525    #[tokio::test]
4526    #[should_panic]
4527    async fn single_rad() {
4528        do_single_instant_function_call("rad", "").await;
4529    }
4530
4531    // {
4532    //     input: "avg by (foo)(some_metric)",
4533    //     expected: &AggregateExpr{
4534    //         Op: AVG,
4535    //         Expr: &VectorSelector{
4536    //             Name: "some_metric",
4537    //             LabelMatchers: []*labels.Matcher{
4538    //                 MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
4539    //             },
4540    //             PosRange: PositionRange{
4541    //                 Start: 13,
4542    //                 End:   24,
4543    //             },
4544    //         },
4545    //         Grouping: []string{"foo"},
4546    //         PosRange: PositionRange{
4547    //             Start: 0,
4548    //             End:   25,
4549    //         },
4550    //     },
4551    // },
4552    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
4553        let prom_expr = parser::parse(&format!(
4554            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
4555        ))
4556        .unwrap();
4557        let mut eval_stmt = EvalStmt {
4558            expr: prom_expr,
4559            start: UNIX_EPOCH,
4560            end: UNIX_EPOCH
4561                .checked_add(Duration::from_secs(100_000))
4562                .unwrap(),
4563            interval: Duration::from_secs(5),
4564            lookback_delta: Duration::from_secs(1),
4565        };
4566
4567        // test group by
4568        let table_provider = build_test_table_provider(
4569            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4570            2,
4571            2,
4572        )
4573        .await;
4574        let plan =
4575            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4576                .await
4577                .unwrap();
4578        let expected_no_without = String::from(
4579            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4580            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4581            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4582            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4583            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4584            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4585            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]"
4586        ).replace("TEMPLATE", plan_name);
4587        assert_eq!(
4588            plan.display_indent_schema().to_string(),
4589            expected_no_without
4590        );
4591
4592        // test group without
4593        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
4594            *modifier = Some(LabelModifier::Exclude(Labels {
4595                labels: vec![String::from("tag_1")].into_iter().collect(),
4596            }));
4597        }
4598        let table_provider = build_test_table_provider(
4599            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4600            2,
4601            2,
4602        )
4603        .await;
4604        let plan =
4605            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4606                .await
4607                .unwrap();
4608        let expected_without = String::from(
4609            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4610            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(ms), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
4611            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4612            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4613            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4614            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]\
4615            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N]"
4616        ).replace("TEMPLATE", plan_name);
4617        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
4618    }
4619
4620    #[tokio::test]
4621    async fn aggregate_sum() {
4622        do_aggregate_expr_plan("sum", "sum").await;
4623    }
4624
4625    #[tokio::test]
4626    async fn tsid_is_used_for_series_divide_when_available() {
4627        let prom_expr = parser::parse("some_metric").unwrap();
4628        let eval_stmt = EvalStmt {
4629            expr: prom_expr,
4630            start: UNIX_EPOCH,
4631            end: UNIX_EPOCH
4632                .checked_add(Duration::from_secs(100_000))
4633                .unwrap(),
4634            interval: Duration::from_secs(5),
4635            lookback_delta: Duration::from_secs(1),
4636        };
4637
4638        let table_provider = build_test_table_provider_with_tsid(
4639            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4640            1,
4641            1,
4642        )
4643        .await;
4644        let plan =
4645            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4646                .await
4647                .unwrap();
4648
4649        let plan_str = plan.display_indent_schema().to_string();
4650        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4651        assert!(plan_str.contains("__tsid ASC NULLS FIRST"));
4652        assert!(
4653            !plan
4654                .schema()
4655                .fields()
4656                .iter()
4657                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
4658        );
4659    }
4660
4661    #[tokio::test]
4662    async fn physical_table_name_is_not_leaked_in_plan() {
4663        let prom_expr = parser::parse("some_metric").unwrap();
4664        let eval_stmt = EvalStmt {
4665            expr: prom_expr,
4666            start: UNIX_EPOCH,
4667            end: UNIX_EPOCH
4668                .checked_add(Duration::from_secs(100_000))
4669                .unwrap(),
4670            interval: Duration::from_secs(5),
4671            lookback_delta: Duration::from_secs(1),
4672        };
4673
4674        let table_provider = build_test_table_provider_with_tsid(
4675            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4676            1,
4677            1,
4678        )
4679        .await;
4680        let plan =
4681            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4682                .await
4683                .unwrap();
4684
4685        let plan_str = plan.display_indent_schema().to_string();
4686        assert!(plan_str.contains("TableScan: phy"), "{plan}");
4687        assert!(plan_str.contains("SubqueryAlias: some_metric"));
4688        assert!(plan_str.contains("Filter: phy.__table_id = UInt32(1024)"));
4689        assert!(!plan_str.contains("TableScan: some_metric"));
4690    }
4691
4692    #[tokio::test]
4693    async fn sum_without_does_not_group_by_tsid() {
4694        let prom_expr = parser::parse("sum without (tag_0) (some_metric)").unwrap();
4695        let eval_stmt = EvalStmt {
4696            expr: prom_expr,
4697            start: UNIX_EPOCH,
4698            end: UNIX_EPOCH
4699                .checked_add(Duration::from_secs(100_000))
4700                .unwrap(),
4701            interval: Duration::from_secs(5),
4702            lookback_delta: Duration::from_secs(1),
4703        };
4704
4705        let table_provider = build_test_table_provider_with_tsid(
4706            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4707            1,
4708            1,
4709        )
4710        .await;
4711        let plan =
4712            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4713                .await
4714                .unwrap();
4715
4716        let plan_str = plan.display_indent_schema().to_string();
4717        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4718
4719        let aggr_line = plan_str
4720            .lines()
4721            .find(|line| line.contains("Aggregate: groupBy="))
4722            .unwrap();
4723        assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
4724    }
4725
4726    #[tokio::test]
4727    async fn topk_without_does_not_partition_by_tsid() {
4728        let prom_expr = parser::parse("topk without (tag_0) (1, some_metric)").unwrap();
4729        let eval_stmt = EvalStmt {
4730            expr: prom_expr,
4731            start: UNIX_EPOCH,
4732            end: UNIX_EPOCH
4733                .checked_add(Duration::from_secs(100_000))
4734                .unwrap(),
4735            interval: Duration::from_secs(5),
4736            lookback_delta: Duration::from_secs(1),
4737        };
4738
4739        let table_provider = build_test_table_provider_with_tsid(
4740            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4741            1,
4742            1,
4743        )
4744        .await;
4745        let plan =
4746            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4747                .await
4748                .unwrap();
4749
4750        let plan_str = plan.display_indent_schema().to_string();
4751        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4752
4753        let window_line = plan_str
4754            .lines()
4755            .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
4756            .unwrap();
4757        let partition_by = window_line
4758            .split("PARTITION BY [")
4759            .nth(1)
4760            .and_then(|s| s.split("] ORDER BY").next())
4761            .unwrap();
4762        assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
4763    }
4764
4765    #[tokio::test]
4766    async fn sum_by_does_not_group_by_tsid() {
4767        let prom_expr = parser::parse("sum by (__tsid) (some_metric)").unwrap();
4768        let eval_stmt = EvalStmt {
4769            expr: prom_expr,
4770            start: UNIX_EPOCH,
4771            end: UNIX_EPOCH
4772                .checked_add(Duration::from_secs(100_000))
4773                .unwrap(),
4774            interval: Duration::from_secs(5),
4775            lookback_delta: Duration::from_secs(1),
4776        };
4777
4778        let table_provider = build_test_table_provider_with_tsid(
4779            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4780            1,
4781            1,
4782        )
4783        .await;
4784        let plan =
4785            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4786                .await
4787                .unwrap();
4788
4789        let plan_str = plan.display_indent_schema().to_string();
4790        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4791
4792        let aggr_line = plan_str
4793            .lines()
4794            .find(|line| line.contains("Aggregate: groupBy="))
4795            .unwrap();
4796        assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
4797    }
4798
4799    #[tokio::test]
4800    async fn topk_by_does_not_partition_by_tsid() {
4801        let prom_expr = parser::parse("topk by (__tsid) (1, some_metric)").unwrap();
4802        let eval_stmt = EvalStmt {
4803            expr: prom_expr,
4804            start: UNIX_EPOCH,
4805            end: UNIX_EPOCH
4806                .checked_add(Duration::from_secs(100_000))
4807                .unwrap(),
4808            interval: Duration::from_secs(5),
4809            lookback_delta: Duration::from_secs(1),
4810        };
4811
4812        let table_provider = build_test_table_provider_with_tsid(
4813            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4814            1,
4815            1,
4816        )
4817        .await;
4818        let plan =
4819            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4820                .await
4821                .unwrap();
4822
4823        let plan_str = plan.display_indent_schema().to_string();
4824        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4825
4826        let window_line = plan_str
4827            .lines()
4828            .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
4829            .unwrap();
4830        let partition_by = window_line
4831            .split("PARTITION BY [")
4832            .nth(1)
4833            .and_then(|s| s.split("] ORDER BY").next())
4834            .unwrap();
4835        assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
4836    }
4837
4838    #[tokio::test]
4839    async fn selector_matcher_on_tsid_does_not_use_internal_column() {
4840        let prom_expr = parser::parse(r#"some_metric{__tsid="123"}"#).unwrap();
4841        let eval_stmt = EvalStmt {
4842            expr: prom_expr,
4843            start: UNIX_EPOCH,
4844            end: UNIX_EPOCH
4845                .checked_add(Duration::from_secs(100_000))
4846                .unwrap(),
4847            interval: Duration::from_secs(5),
4848            lookback_delta: Duration::from_secs(1),
4849        };
4850
4851        let table_provider = build_test_table_provider_with_tsid(
4852            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4853            1,
4854            1,
4855        )
4856        .await;
4857        let plan =
4858            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4859                .await
4860                .unwrap();
4861
4862        fn collect_filter_cols(plan: &LogicalPlan, out: &mut HashSet<Column>) {
4863            if let LogicalPlan::Filter(filter) = plan {
4864                datafusion_expr::utils::expr_to_columns(&filter.predicate, out).unwrap();
4865            }
4866            for input in plan.inputs() {
4867                collect_filter_cols(input, out);
4868            }
4869        }
4870
4871        let mut filter_cols = HashSet::new();
4872        collect_filter_cols(&plan, &mut filter_cols);
4873        assert!(
4874            !filter_cols
4875                .iter()
4876                .any(|c| c.name == DATA_SCHEMA_TSID_COLUMN_NAME)
4877        );
4878    }
4879
4880    #[tokio::test]
4881    async fn tsid_is_not_used_when_physical_table_is_missing() {
4882        let prom_expr = parser::parse("some_metric").unwrap();
4883        let eval_stmt = EvalStmt {
4884            expr: prom_expr,
4885            start: UNIX_EPOCH,
4886            end: UNIX_EPOCH
4887                .checked_add(Duration::from_secs(100_000))
4888                .unwrap(),
4889            interval: Duration::from_secs(5),
4890            lookback_delta: Duration::from_secs(1),
4891        };
4892
4893        let catalog_list = MemoryCatalogManager::with_default_setup();
4894
4895        // Register a metric engine logical table referencing a missing physical table.
4896        let mut columns = vec![ColumnSchema::new(
4897            "tag_0".to_string(),
4898            ConcreteDataType::string_datatype(),
4899            false,
4900        )];
4901        columns.push(
4902            ColumnSchema::new(
4903                "timestamp".to_string(),
4904                ConcreteDataType::timestamp_millisecond_datatype(),
4905                false,
4906            )
4907            .with_time_index(true),
4908        );
4909        columns.push(ColumnSchema::new(
4910            "field_0".to_string(),
4911            ConcreteDataType::float64_datatype(),
4912            true,
4913        ));
4914        let schema = Arc::new(Schema::new(columns));
4915        let mut options = table::requests::TableOptions::default();
4916        options
4917            .extra_options
4918            .insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "phy".to_string());
4919        let table_meta = TableMetaBuilder::empty()
4920            .schema(schema)
4921            .primary_key_indices(vec![0])
4922            .value_indices(vec![2])
4923            .engine(METRIC_ENGINE_NAME.to_string())
4924            .options(options)
4925            .next_column_id(1024)
4926            .build()
4927            .unwrap();
4928        let table_info = TableInfoBuilder::default()
4929            .table_id(1024)
4930            .name("some_metric")
4931            .meta(table_meta)
4932            .build()
4933            .unwrap();
4934        let table = EmptyTable::from_table_info(&table_info);
4935        catalog_list
4936            .register_table_sync(RegisterTableRequest {
4937                catalog: DEFAULT_CATALOG_NAME.to_string(),
4938                schema: DEFAULT_SCHEMA_NAME.to_string(),
4939                table_name: "some_metric".to_string(),
4940                table_id: 1024,
4941                table,
4942            })
4943            .unwrap();
4944
4945        let table_provider = DfTableSourceProvider::new(
4946            catalog_list,
4947            false,
4948            QueryContext::arc(),
4949            DummyDecoder::arc(),
4950            false,
4951        );
4952
4953        let plan =
4954            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4955                .await
4956                .unwrap();
4957
4958        let plan_str = plan.display_indent_schema().to_string();
4959        assert!(plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]"));
4960        assert!(!plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
4961    }
4962
4963    #[tokio::test]
4964    async fn tsid_is_carried_only_when_aggregate_preserves_label_set() {
4965        let prom_expr = parser::parse("sum by (tag_0) (some_metric)").unwrap();
4966        let eval_stmt = EvalStmt {
4967            expr: prom_expr,
4968            start: UNIX_EPOCH,
4969            end: UNIX_EPOCH
4970                .checked_add(Duration::from_secs(100_000))
4971                .unwrap(),
4972            interval: Duration::from_secs(5),
4973            lookback_delta: Duration::from_secs(1),
4974        };
4975
4976        let table_provider = build_test_table_provider_with_tsid(
4977            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
4978            1,
4979            1,
4980        )
4981        .await;
4982        let plan =
4983            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4984                .await
4985                .unwrap();
4986
4987        let plan_str = plan.display_indent_schema().to_string();
4988        assert!(plan_str.contains("first_value") && plan_str.contains("__tsid"));
4989        assert!(
4990            !plan
4991                .schema()
4992                .fields()
4993                .iter()
4994                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
4995        );
4996
4997        // Merging aggregate: label set is reduced, tsid should not be carried.
4998        let prom_expr = parser::parse("sum(some_metric)").unwrap();
4999        let eval_stmt = EvalStmt {
5000            expr: prom_expr,
5001            start: UNIX_EPOCH,
5002            end: UNIX_EPOCH
5003                .checked_add(Duration::from_secs(100_000))
5004                .unwrap(),
5005            interval: Duration::from_secs(5),
5006            lookback_delta: Duration::from_secs(1),
5007        };
5008        let table_provider = build_test_table_provider_with_tsid(
5009            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5010            1,
5011            1,
5012        )
5013        .await;
5014        let plan =
5015            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5016                .await
5017                .unwrap();
5018        let plan_str = plan.display_indent_schema().to_string();
5019        assert!(!plan_str.contains("first_value"));
5020    }
5021
5022    #[tokio::test]
5023    async fn or_operator_with_unknown_metric_does_not_require_tsid() {
5024        let prom_expr = parser::parse("unknown_metric or some_metric").unwrap();
5025        let eval_stmt = EvalStmt {
5026            expr: prom_expr,
5027            start: UNIX_EPOCH,
5028            end: UNIX_EPOCH
5029                .checked_add(Duration::from_secs(100_000))
5030                .unwrap(),
5031            interval: Duration::from_secs(5),
5032            lookback_delta: Duration::from_secs(1),
5033        };
5034
5035        let table_provider = build_test_table_provider_with_tsid(
5036            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5037            1,
5038            1,
5039        )
5040        .await;
5041
5042        let plan =
5043            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5044                .await
5045                .unwrap();
5046
5047        assert!(
5048            !plan
5049                .schema()
5050                .fields()
5051                .iter()
5052                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
5053        );
5054    }
5055
5056    #[tokio::test]
5057    async fn aggregate_avg() {
5058        do_aggregate_expr_plan("avg", "avg").await;
5059    }
5060
5061    #[tokio::test]
5062    #[should_panic] // output type doesn't match
5063    async fn aggregate_count() {
5064        do_aggregate_expr_plan("count", "count").await;
5065    }
5066
5067    #[tokio::test]
5068    async fn aggregate_min() {
5069        do_aggregate_expr_plan("min", "min").await;
5070    }
5071
5072    #[tokio::test]
5073    async fn aggregate_max() {
5074        do_aggregate_expr_plan("max", "max").await;
5075    }
5076
5077    #[tokio::test]
5078    async fn aggregate_group() {
5079        // Regression test for `group()` aggregator.
5080        // PromQL: sum(group by (cluster)(kubernetes_build_info{service="kubernetes",job="apiserver"}))
5081        // should be plannable, and `group()` should produce constant 1 for each group.
5082        let prom_expr = parser::parse(
5083            "sum(group by (cluster)(kubernetes_build_info{service=\"kubernetes\",job=\"apiserver\"}))",
5084        )
5085        .unwrap();
5086        let eval_stmt = EvalStmt {
5087            expr: prom_expr,
5088            start: UNIX_EPOCH,
5089            end: UNIX_EPOCH
5090                .checked_add(Duration::from_secs(100_000))
5091                .unwrap(),
5092            interval: Duration::from_secs(5),
5093            lookback_delta: Duration::from_secs(1),
5094        };
5095
5096        let table_provider = build_test_table_provider_with_fields(
5097            &[(
5098                DEFAULT_SCHEMA_NAME.to_string(),
5099                "kubernetes_build_info".to_string(),
5100            )],
5101            &["cluster", "service", "job"],
5102        )
5103        .await;
5104        let plan =
5105            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5106                .await
5107                .unwrap();
5108
5109        let plan_str = plan.display_indent_schema().to_string();
5110        assert!(plan_str.contains("max(Float64(1"));
5111    }
5112
5113    #[tokio::test]
5114    async fn aggregate_stddev() {
5115        do_aggregate_expr_plan("stddev", "stddev_pop").await;
5116    }
5117
5118    #[tokio::test]
5119    async fn aggregate_stdvar() {
5120        do_aggregate_expr_plan("stdvar", "var_pop").await;
5121    }
5122
5123    // TODO(ruihang): add range fn tests once exprs are ready.
5124
5125    // {
5126    //     input: "some_metric{tag_0="foo"} + some_metric{tag_0="bar"}",
5127    //     expected: &BinaryExpr{
5128    //         Op: ADD,
5129    //         LHS: &VectorSelector{
5130    //             Name: "a",
5131    //             LabelMatchers: []*labels.Matcher{
5132    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "foo"),
5133    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
5134    //             },
5135    //         },
5136    //         RHS: &VectorSelector{
5137    //             Name: "sum",
5138    //             LabelMatchers: []*labels.Matcher{
5139    //                     MustLabelMatcher(labels.MatchxEqual, "tag_0", "bar"),
5140    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
5141    //             },
5142    //         },
5143    //         VectorMatching: &VectorMatching{},
5144    //     },
5145    // },
5146    #[tokio::test]
5147    async fn binary_op_column_column() {
5148        let prom_expr =
5149            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
5150        let eval_stmt = EvalStmt {
5151            expr: prom_expr,
5152            start: UNIX_EPOCH,
5153            end: UNIX_EPOCH
5154                .checked_add(Duration::from_secs(100_000))
5155                .unwrap(),
5156            interval: Duration::from_secs(5),
5157            lookback_delta: Duration::from_secs(1),
5158        };
5159
5160        let table_provider = build_test_table_provider(
5161            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5162            1,
5163            1,
5164        )
5165        .await;
5166        let plan =
5167            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5168                .await
5169                .unwrap();
5170
5171        let expected = String::from(
5172            "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(ms), lhs.field_0 + rhs.field_0:Float64;N]\
5173            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5174            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5175            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5176            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5177            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5178            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5179            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5180            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5181            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5182            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5183            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5184            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5185            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5186        );
5187
5188        assert_eq!(plan.display_indent_schema().to_string(), expected);
5189    }
5190
5191    async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
5192        let prom_expr = parser::parse(query).unwrap();
5193        let eval_stmt = EvalStmt {
5194            expr: prom_expr,
5195            start: UNIX_EPOCH,
5196            end: UNIX_EPOCH
5197                .checked_add(Duration::from_secs(100_000))
5198                .unwrap(),
5199            interval: Duration::from_secs(5),
5200            lookback_delta: Duration::from_secs(1),
5201        };
5202
5203        let table_provider = build_test_table_provider(
5204            &[
5205                (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5206                (
5207                    "greptime_private".to_string(),
5208                    "some_alt_metric".to_string(),
5209                ),
5210            ],
5211            1,
5212            1,
5213        )
5214        .await;
5215        let plan =
5216            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5217                .await
5218                .unwrap();
5219
5220        assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
5221    }
5222
5223    #[tokio::test]
5224    async fn binary_op_literal_column() {
5225        let query = r#"1 + some_metric{tag_0="bar"}"#;
5226        let expected = String::from(
5227            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(ms), Float64(1) + field_0:Float64;N]\
5228            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5229            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5230            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5231            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5232            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5233        );
5234
5235        indie_query_plan_compare(query, expected).await;
5236    }
5237
5238    #[tokio::test]
5239    async fn binary_op_literal_literal() {
5240        let query = r#"1 + 1"#;
5241        let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(ms), value:Float64;N]
5242  TableScan: dummy [time:Timestamp(ms), value:Float64;N]"#;
5243        indie_query_plan_compare(query, expected).await;
5244    }
5245
5246    #[tokio::test]
5247    async fn simple_bool_grammar() {
5248        let query = "some_metric != bool 1.2345";
5249        let expected = String::from(
5250            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(ms), field_0 != Float64(1.2345):Float64;N]\
5251            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5252            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5253            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5254            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5255            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5256        );
5257
5258        indie_query_plan_compare(query, expected).await;
5259    }
5260
5261    #[tokio::test]
5262    async fn bool_with_additional_arithmetic() {
5263        let query = "some_metric + (1 == bool 2)";
5264        let expected = String::from(
5265            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(ms), field_0 + Float64(1) = Float64(2):Float64;N]\
5266            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5267            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5268            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5269            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5270            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5271        );
5272
5273        indie_query_plan_compare(query, expected).await;
5274    }
5275
5276    #[tokio::test]
5277    async fn simple_unary() {
5278        let query = "-some_metric";
5279        let expected = String::from(
5280            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(ms), (- field_0):Float64;N]\
5281            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5282            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5283            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5284            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5285            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5286        );
5287
5288        indie_query_plan_compare(query, expected).await;
5289    }
5290
5291    #[tokio::test]
5292    async fn increase_aggr() {
5293        let query = "increase(some_metric[5m])";
5294        let expected = String::from(
5295            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(ms), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
5296            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(ms), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
5297            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(ms))]\
5298            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5299            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5300            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5301            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-299999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5302            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5303        );
5304
5305        indie_query_plan_compare(query, expected).await;
5306    }
5307
5308    #[tokio::test]
5309    async fn less_filter_on_value() {
5310        let query = "some_metric < 1.2345";
5311        let expected = String::from(
5312            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5313            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5314            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5315            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5316            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5317            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5318        );
5319
5320        indie_query_plan_compare(query, expected).await;
5321    }
5322
5323    #[tokio::test]
5324    async fn count_over_time() {
5325        let query = "count_over_time(some_metric[5m])";
5326        let expected = String::from(
5327            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(ms), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
5328            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(ms), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
5329            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(ms))]\
5330            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5331            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5332            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5333            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-299999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5334            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5335        );
5336
5337        indie_query_plan_compare(query, expected).await;
5338    }
5339
5340    #[tokio::test]
5341    async fn test_hash_join() {
5342        let mut eval_stmt = EvalStmt {
5343            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5344            start: UNIX_EPOCH,
5345            end: UNIX_EPOCH
5346                .checked_add(Duration::from_secs(100_000))
5347                .unwrap(),
5348            interval: Duration::from_secs(5),
5349            lookback_delta: Duration::from_secs(1),
5350        };
5351
5352        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
5353
5354        let prom_expr = parser::parse(case).unwrap();
5355        eval_stmt.expr = prom_expr;
5356        let table_provider = build_test_table_provider_with_fields(
5357            &[
5358                (
5359                    DEFAULT_SCHEMA_NAME.to_string(),
5360                    "http_server_requests_seconds_sum".to_string(),
5361                ),
5362                (
5363                    DEFAULT_SCHEMA_NAME.to_string(),
5364                    "http_server_requests_seconds_count".to_string(),
5365                ),
5366            ],
5367            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
5368        )
5369        .await;
5370        // Should be ok
5371        let plan =
5372            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5373                .await
5374                .unwrap();
5375        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
5376            \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
5377            \n    SubqueryAlias: http_server_requests_seconds_sum\
5378            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
5379            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
5380            \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
5381            \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-999, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100000000, None)\
5382            \n              TableScan: http_server_requests_seconds_sum\
5383            \n    SubqueryAlias: http_server_requests_seconds_count\
5384            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
5385            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
5386            \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
5387            \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-999, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100000000, None)\
5388            \n              TableScan: http_server_requests_seconds_count";
5389        assert_eq!(plan.to_string(), expected);
5390    }
5391
5392    #[tokio::test]
5393    async fn test_nested_histogram_quantile() {
5394        let mut eval_stmt = EvalStmt {
5395            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5396            start: UNIX_EPOCH,
5397            end: UNIX_EPOCH
5398                .checked_add(Duration::from_secs(100_000))
5399                .unwrap(),
5400            interval: Duration::from_secs(5),
5401            lookback_delta: Duration::from_secs(1),
5402        };
5403
5404        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;
5405
5406        let prom_expr = parser::parse(case).unwrap();
5407        eval_stmt.expr = prom_expr;
5408        let table_provider = build_test_table_provider_with_fields(
5409            &[(
5410                DEFAULT_SCHEMA_NAME.to_string(),
5411                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
5412            )],
5413            &["pod", "le", "path", "code", "container"],
5414        )
5415        .await;
5416        // Should be ok
5417        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5418            .await
5419            .unwrap();
5420    }
5421
5422    #[tokio::test]
5423    async fn test_parse_and_operator() {
5424        let mut eval_stmt = EvalStmt {
5425            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5426            start: UNIX_EPOCH,
5427            end: UNIX_EPOCH
5428                .checked_add(Duration::from_secs(100_000))
5429                .unwrap(),
5430            interval: Duration::from_secs(5),
5431            lookback_delta: Duration::from_secs(1),
5432        };
5433
5434        let cases = [
5435            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
5436            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
5437        ];
5438
5439        for case in cases {
5440            let prom_expr = parser::parse(case).unwrap();
5441            eval_stmt.expr = prom_expr;
5442            let table_provider = build_test_table_provider_with_fields(
5443                &[
5444                    (
5445                        DEFAULT_SCHEMA_NAME.to_string(),
5446                        "kubelet_volume_stats_used_bytes".to_string(),
5447                    ),
5448                    (
5449                        DEFAULT_SCHEMA_NAME.to_string(),
5450                        "kubelet_volume_stats_capacity_bytes".to_string(),
5451                    ),
5452                ],
5453                &["namespace", "persistentvolumeclaim"],
5454            )
5455            .await;
5456            // Should be ok
5457            let _ =
5458                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5459                    .await
5460                    .unwrap();
5461        }
5462    }
5463
5464    #[tokio::test]
5465    async fn test_nested_binary_op() {
5466        let mut eval_stmt = EvalStmt {
5467            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5468            start: UNIX_EPOCH,
5469            end: UNIX_EPOCH
5470                .checked_add(Duration::from_secs(100_000))
5471                .unwrap(),
5472            interval: Duration::from_secs(5),
5473            lookback_delta: Duration::from_secs(1),
5474        };
5475
5476        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
5477        (
5478            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
5479            or
5480            vector(0)
5481        )"#;
5482
5483        let prom_expr = parser::parse(case).unwrap();
5484        eval_stmt.expr = prom_expr;
5485        let table_provider = build_test_table_provider_with_fields(
5486            &[(
5487                DEFAULT_SCHEMA_NAME.to_string(),
5488                "nginx_ingress_controller_requests".to_string(),
5489            )],
5490            &["namespace", "job"],
5491        )
5492        .await;
5493        // Should be ok
5494        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5495            .await
5496            .unwrap();
5497    }
5498
5499    #[tokio::test]
5500    async fn test_parse_or_operator() {
5501        let mut eval_stmt = EvalStmt {
5502            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5503            start: UNIX_EPOCH,
5504            end: UNIX_EPOCH
5505                .checked_add(Duration::from_secs(100_000))
5506                .unwrap(),
5507            interval: Duration::from_secs(5),
5508            lookback_delta: Duration::from_secs(1),
5509        };
5510
5511        let case = r#"
5512        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
5513        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
5514            or
5515        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
5516        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;
5517
5518        let table_provider = build_test_table_provider_with_fields(
5519            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
5520            &["tenant_name", "cluster_name"],
5521        )
5522        .await;
5523        eval_stmt.expr = parser::parse(case).unwrap();
5524        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5525            .await
5526            .unwrap();
5527
5528        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
5529            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
5530            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
5531            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
5532            or
5533            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
5534            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
5535            or
5536            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
5537            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
5538        let table_provider = build_test_table_provider_with_fields(
5539            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
5540            &["tenant_name", "cluster_name"],
5541        )
5542        .await;
5543        eval_stmt.expr = parser::parse(case).unwrap();
5544        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5545            .await
5546            .unwrap();
5547
5548        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
5549            sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
5550            (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
5551            (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
5552        let table_provider = build_test_table_provider_with_fields(
5553            &[
5554                (
5555                    DEFAULT_SCHEMA_NAME.to_string(),
5556                    "background_waitevent_cnt".to_string(),
5557                ),
5558                (
5559                    DEFAULT_SCHEMA_NAME.to_string(),
5560                    "foreground_waitevent_cnt".to_string(),
5561                ),
5562            ],
5563            &["tenant_name", "cluster_name"],
5564        )
5565        .await;
5566        eval_stmt.expr = parser::parse(case).unwrap();
5567        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5568            .await
5569            .unwrap();
5570
5571        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
5572        let table_provider = build_test_table_provider_with_fields(
5573            &[
5574                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
5575                (
5576                    DEFAULT_SCHEMA_NAME.to_string(),
5577                    "container_cpu_load_average_10s".to_string(),
5578                ),
5579                (
5580                    DEFAULT_SCHEMA_NAME.to_string(),
5581                    "container_spec_cpu_quota".to_string(),
5582                ),
5583            ],
5584            &["cluster_name", "host_name"],
5585        )
5586        .await;
5587        eval_stmt.expr = parser::parse(case).unwrap();
5588        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5589            .await
5590            .unwrap();
5591    }
5592
5593    #[tokio::test]
5594    async fn value_matcher() {
5595        // template
5596        let mut eval_stmt = EvalStmt {
5597            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5598            start: UNIX_EPOCH,
5599            end: UNIX_EPOCH
5600                .checked_add(Duration::from_secs(100_000))
5601                .unwrap(),
5602            interval: Duration::from_secs(5),
5603            lookback_delta: Duration::from_secs(1),
5604        };
5605
5606        let cases = [
5607            // single equal matcher
5608            (
5609                r#"some_metric{__field__="field_1"}"#,
5610                vec![
5611                    "some_metric.field_1",
5612                    "some_metric.tag_0",
5613                    "some_metric.tag_1",
5614                    "some_metric.tag_2",
5615                    "some_metric.timestamp",
5616                ],
5617            ),
5618            // two equal matchers
5619            (
5620                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
5621                vec![
5622                    "some_metric.field_0",
5623                    "some_metric.field_1",
5624                    "some_metric.tag_0",
5625                    "some_metric.tag_1",
5626                    "some_metric.tag_2",
5627                    "some_metric.timestamp",
5628                ],
5629            ),
5630            // single not_eq matcher
5631            (
5632                r#"some_metric{__field__!="field_1"}"#,
5633                vec![
5634                    "some_metric.field_0",
5635                    "some_metric.field_2",
5636                    "some_metric.tag_0",
5637                    "some_metric.tag_1",
5638                    "some_metric.tag_2",
5639                    "some_metric.timestamp",
5640                ],
5641            ),
5642            // two not_eq matchers
5643            (
5644                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
5645                vec![
5646                    "some_metric.field_0",
5647                    "some_metric.tag_0",
5648                    "some_metric.tag_1",
5649                    "some_metric.tag_2",
5650                    "some_metric.timestamp",
5651                ],
5652            ),
5653            // equal and not_eq matchers (no conflict)
5654            (
5655                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
5656                vec![
5657                    "some_metric.field_1",
5658                    "some_metric.tag_0",
5659                    "some_metric.tag_1",
5660                    "some_metric.tag_2",
5661                    "some_metric.timestamp",
5662                ],
5663            ),
5664            // equal and not_eq matchers (conflict)
5665            (
5666                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
5667                vec![
5668                    "some_metric.tag_0",
5669                    "some_metric.tag_1",
5670                    "some_metric.tag_2",
5671                    "some_metric.timestamp",
5672                ],
5673            ),
5674            // single regex eq matcher
5675            (
5676                r#"some_metric{__field__=~"field_1|field_2"}"#,
5677                vec![
5678                    "some_metric.field_1",
5679                    "some_metric.field_2",
5680                    "some_metric.tag_0",
5681                    "some_metric.tag_1",
5682                    "some_metric.tag_2",
5683                    "some_metric.timestamp",
5684                ],
5685            ),
5686            // single regex not_eq matcher
5687            (
5688                r#"some_metric{__field__!~"field_1|field_2"}"#,
5689                vec![
5690                    "some_metric.field_0",
5691                    "some_metric.tag_0",
5692                    "some_metric.tag_1",
5693                    "some_metric.tag_2",
5694                    "some_metric.timestamp",
5695                ],
5696            ),
5697        ];
5698
5699        for case in cases {
5700            let prom_expr = parser::parse(case.0).unwrap();
5701            eval_stmt.expr = prom_expr;
5702            let table_provider = build_test_table_provider(
5703                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5704                3,
5705                3,
5706            )
5707            .await;
5708            let plan =
5709                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5710                    .await
5711                    .unwrap();
5712            let mut fields = plan.schema().field_names();
5713            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
5714            fields.sort();
5715            expected.sort();
5716            assert_eq!(fields, expected, "case: {:?}", case.0);
5717        }
5718
5719        let bad_cases = [
5720            r#"some_metric{__field__="nonexistent"}"#,
5721            r#"some_metric{__field__!="nonexistent"}"#,
5722        ];
5723
5724        for case in bad_cases {
5725            let prom_expr = parser::parse(case).unwrap();
5726            eval_stmt.expr = prom_expr;
5727            let table_provider = build_test_table_provider(
5728                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5729                3,
5730                3,
5731            )
5732            .await;
5733            let plan =
5734                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5735                    .await;
5736            assert!(plan.is_err(), "case: {:?}", case);
5737        }
5738    }
5739
5740    #[tokio::test]
5741    async fn custom_schema() {
5742        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
5743        let expected = String::from(
5744            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5745            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5746            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5747            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5748            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5749        );
5750
5751        indie_query_plan_compare(query, expected).await;
5752
5753        let query = "some_alt_metric{__database__=\"greptime_private\"}";
5754        let expected = String::from(
5755            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5756            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5757            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5758            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5759            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5760        );
5761
5762        indie_query_plan_compare(query, expected).await;
5763
5764        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
5765        let expected = String::from(
5766            "Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(ms), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
5767            \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5768            \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5769            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5770            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5771            \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5772            \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-999, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5773            \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5774            \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5775            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5776            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5777            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5778            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-999, None) AND some_metric.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]\
5779            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]",
5780        );
5781
5782        indie_query_plan_compare(query, expected).await;
5783    }
5784
5785    #[tokio::test]
5786    async fn only_equals_is_supported_for_special_matcher() {
5787        let queries = &[
5788            "some_alt_metric{__schema__!=\"greptime_private\"}",
5789            "some_alt_metric{__schema__=~\"lalala\"}",
5790            "some_alt_metric{__database__!=\"greptime_private\"}",
5791            "some_alt_metric{__database__=~\"lalala\"}",
5792        ];
5793
5794        for query in queries {
5795            let prom_expr = parser::parse(query).unwrap();
5796            let eval_stmt = EvalStmt {
5797                expr: prom_expr,
5798                start: UNIX_EPOCH,
5799                end: UNIX_EPOCH
5800                    .checked_add(Duration::from_secs(100_000))
5801                    .unwrap(),
5802                interval: Duration::from_secs(5),
5803                lookback_delta: Duration::from_secs(1),
5804            };
5805
5806            let table_provider = build_test_table_provider(
5807                &[
5808                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5809                    (
5810                        "greptime_private".to_string(),
5811                        "some_alt_metric".to_string(),
5812                    ),
5813                ],
5814                1,
5815                1,
5816            )
5817            .await;
5818
5819            let plan =
5820                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5821                    .await;
5822            assert!(plan.is_err(), "query: {:?}", query);
5823        }
5824    }
5825
5826    #[tokio::test]
5827    async fn test_non_ms_precision() {
5828        let catalog_list = MemoryCatalogManager::with_default_setup();
5829        let columns = vec![
5830            ColumnSchema::new(
5831                "tag".to_string(),
5832                ConcreteDataType::string_datatype(),
5833                false,
5834            ),
5835            ColumnSchema::new(
5836                "timestamp".to_string(),
5837                ConcreteDataType::timestamp_nanosecond_datatype(),
5838                false,
5839            )
5840            .with_time_index(true),
5841            ColumnSchema::new(
5842                "field".to_string(),
5843                ConcreteDataType::float64_datatype(),
5844                true,
5845            ),
5846        ];
5847        let schema = Arc::new(Schema::new(columns));
5848        let table_meta = TableMetaBuilder::empty()
5849            .schema(schema)
5850            .primary_key_indices(vec![0])
5851            .value_indices(vec![2])
5852            .next_column_id(1024)
5853            .build()
5854            .unwrap();
5855        let table_info = TableInfoBuilder::default()
5856            .name("metrics".to_string())
5857            .meta(table_meta)
5858            .build()
5859            .unwrap();
5860        let table = EmptyTable::from_table_info(&table_info);
5861        assert!(
5862            catalog_list
5863                .register_table_sync(RegisterTableRequest {
5864                    catalog: DEFAULT_CATALOG_NAME.to_string(),
5865                    schema: DEFAULT_SCHEMA_NAME.to_string(),
5866                    table_name: "metrics".to_string(),
5867                    table_id: 1024,
5868                    table,
5869                })
5870                .is_ok()
5871        );
5872
5873        let plan = PromPlanner::stmt_to_plan(
5874            DfTableSourceProvider::new(
5875                catalog_list.clone(),
5876                false,
5877                QueryContext::arc(),
5878                DummyDecoder::arc(),
5879                true,
5880            ),
5881            &EvalStmt {
5882                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
5883                start: UNIX_EPOCH,
5884                end: UNIX_EPOCH
5885                    .checked_add(Duration::from_secs(100_000))
5886                    .unwrap(),
5887                interval: Duration::from_secs(5),
5888                lookback_delta: Duration::from_secs(1),
5889            },
5890            &build_query_engine_state(),
5891        )
5892        .await
5893        .unwrap();
5894        assert_eq!(
5895            plan.display_indent_schema().to_string(),
5896            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
5897            \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
5898            \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
5899            \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
5900            \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
5901            \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]"
5902        );
5903        let plan = PromPlanner::stmt_to_plan(
5904            DfTableSourceProvider::new(
5905                catalog_list.clone(),
5906                false,
5907                QueryContext::arc(),
5908                DummyDecoder::arc(),
5909                true,
5910            ),
5911            &EvalStmt {
5912                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
5913                start: UNIX_EPOCH,
5914                end: UNIX_EPOCH
5915                    .checked_add(Duration::from_secs(100_000))
5916                    .unwrap(),
5917                interval: Duration::from_secs(5),
5918                lookback_delta: Duration::from_secs(1),
5919            },
5920            &build_query_engine_state(),
5921        )
5922        .await
5923        .unwrap();
5924        assert_eq!(
5925            plan.display_indent_schema().to_string(),
5926            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(ms), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
5927            \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(ms), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
5928            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(ms), timestamp_range:Dictionary(Int64, Timestamp(ms))]\
5929            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
5930            \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
5931            \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
5932            \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-4999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
5933            \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
5934            \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]"
5935        );
5936    }
5937
5938    #[tokio::test]
5939    async fn test_nonexistent_label() {
5940        // template
5941        let mut eval_stmt = EvalStmt {
5942            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5943            start: UNIX_EPOCH,
5944            end: UNIX_EPOCH
5945                .checked_add(Duration::from_secs(100_000))
5946                .unwrap(),
5947            interval: Duration::from_secs(5),
5948            lookback_delta: Duration::from_secs(1),
5949        };
5950
5951        let case = r#"some_metric{nonexistent="hi"}"#;
5952        let prom_expr = parser::parse(case).unwrap();
5953        eval_stmt.expr = prom_expr;
5954        let table_provider = build_test_table_provider(
5955            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
5956            3,
5957            3,
5958        )
5959        .await;
5960        // Should be ok
5961        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5962            .await
5963            .unwrap();
5964    }
5965
5966    #[tokio::test]
5967    async fn test_label_join() {
5968        let prom_expr = parser::parse(
5969            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
5970        )
5971        .unwrap();
5972        let eval_stmt = EvalStmt {
5973            expr: prom_expr,
5974            start: UNIX_EPOCH,
5975            end: UNIX_EPOCH
5976                .checked_add(Duration::from_secs(100_000))
5977                .unwrap(),
5978            interval: Duration::from_secs(5),
5979            lookback_delta: Duration::from_secs(1),
5980        };
5981
5982        let table_provider =
5983            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
5984                .await;
5985        let plan =
5986            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5987                .await
5988                .unwrap();
5989
5990        let expected = r#"
5991Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
5992  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
5993    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
5994      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
5995        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
5996          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-999, None) AND up.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
5997            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"#;
5998
5999        let ret = plan.display_indent_schema().to_string();
6000        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
6001    }
6002
6003    #[tokio::test]
6004    async fn test_label_replace() {
6005        let prom_expr = parser::parse(
6006            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
6007        )
6008        .unwrap();
6009        let eval_stmt = EvalStmt {
6010            expr: prom_expr,
6011            start: UNIX_EPOCH,
6012            end: UNIX_EPOCH
6013                .checked_add(Duration::from_secs(100_000))
6014                .unwrap(),
6015            interval: Duration::from_secs(5),
6016            lookback_delta: Duration::from_secs(1),
6017        };
6018
6019        let table_provider =
6020            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
6021                .await;
6022        let plan =
6023            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6024                .await
6025                .unwrap();
6026
6027        let expected = r#"
6028Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
6029  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(ms), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
6030    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6031      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6032        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6033          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-999, None) AND up.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]
6034            TableScan: up [tag_0:Utf8, timestamp:Timestamp(ms), field_0:Float64;N]"#;
6035
6036        let ret = plan.display_indent_schema().to_string();
6037        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
6038    }
6039
6040    #[tokio::test]
6041    async fn test_matchers_to_expr() {
6042        let mut eval_stmt = EvalStmt {
6043            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6044            start: UNIX_EPOCH,
6045            end: UNIX_EPOCH
6046                .checked_add(Duration::from_secs(100_000))
6047                .unwrap(),
6048            interval: Duration::from_secs(5),
6049            lookback_delta: Duration::from_secs(1),
6050        };
6051        let case =
6052            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;
6053
6054        let prom_expr = parser::parse(case).unwrap();
6055        eval_stmt.expr = prom_expr;
6056        let table_provider = build_test_table_provider(
6057            &[(
6058                DEFAULT_SCHEMA_NAME.to_string(),
6059                "prometheus_tsdb_head_series".to_string(),
6060            )],
6061            3,
6062            3,
6063        )
6064        .await;
6065        let plan =
6066            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6067                .await
6068                .unwrap();
6069        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
6070        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
6071        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
6072        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
6073        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
6074        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
6075        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(ms), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
6076        assert_eq!(plan.display_indent_schema().to_string(), expected);
6077    }
6078
6079    #[tokio::test]
6080    async fn test_topk_expr() {
6081        let mut eval_stmt = EvalStmt {
6082            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6083            start: UNIX_EPOCH,
6084            end: UNIX_EPOCH
6085                .checked_add(Duration::from_secs(100_000))
6086                .unwrap(),
6087            interval: Duration::from_secs(5),
6088            lookback_delta: Duration::from_secs(1),
6089        };
6090        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;
6091
6092        let prom_expr = parser::parse(case).unwrap();
6093        eval_stmt.expr = prom_expr;
6094        let table_provider = build_test_table_provider_with_fields(
6095            &[
6096                (
6097                    DEFAULT_SCHEMA_NAME.to_string(),
6098                    "prometheus_tsdb_head_series".to_string(),
6099                ),
6100                (
6101                    DEFAULT_SCHEMA_NAME.to_string(),
6102                    "http_server_requests_seconds_count".to_string(),
6103                ),
6104            ],
6105            &["ip"],
6106        )
6107        .await;
6108
6109        let plan =
6110            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6111                .await
6112                .unwrap();
6113        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(ms)]\
6114        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
6115        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
6116        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
6117        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6118        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6119        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6120        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6121        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6122        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6123        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";
6124
6125        assert_eq!(plan.display_indent_schema().to_string(), expected);
6126    }
6127
6128    #[tokio::test]
6129    async fn test_count_values_expr() {
6130        let mut eval_stmt = EvalStmt {
6131            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6132            start: UNIX_EPOCH,
6133            end: UNIX_EPOCH
6134                .checked_add(Duration::from_secs(100_000))
6135                .unwrap(),
6136            interval: Duration::from_secs(5),
6137            lookback_delta: Duration::from_secs(1),
6138        };
6139        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;
6140
6141        let prom_expr = parser::parse(case).unwrap();
6142        eval_stmt.expr = prom_expr;
6143        let table_provider = build_test_table_provider_with_fields(
6144            &[
6145                (
6146                    DEFAULT_SCHEMA_NAME.to_string(),
6147                    "prometheus_tsdb_head_series".to_string(),
6148                ),
6149                (
6150                    DEFAULT_SCHEMA_NAME.to_string(),
6151                    "http_server_requests_seconds_count".to_string(),
6152                ),
6153            ],
6154            &["ip"],
6155        )
6156        .await;
6157
6158        let plan =
6159            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6160                .await
6161                .unwrap();
6162        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N]\
6163        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]\
6164        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]\
6165        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
6166        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6167        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6168        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6169        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6170        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";
6171
6172        assert_eq!(plan.display_indent_schema().to_string(), expected);
6173    }
6174
6175    #[tokio::test]
6176    async fn test_value_alias() {
6177        let mut eval_stmt = EvalStmt {
6178            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6179            start: UNIX_EPOCH,
6180            end: UNIX_EPOCH
6181                .checked_add(Duration::from_secs(100_000))
6182                .unwrap(),
6183            interval: Duration::from_secs(5),
6184            lookback_delta: Duration::from_secs(1),
6185        };
6186        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;
6187
6188        let prom_expr = parser::parse(case).unwrap();
6189        eval_stmt.expr = prom_expr;
6190        eval_stmt = QueryLanguageParser::apply_alias_extension(eval_stmt, "my_series");
6191        let table_provider = build_test_table_provider_with_fields(
6192            &[
6193                (
6194                    DEFAULT_SCHEMA_NAME.to_string(),
6195                    "prometheus_tsdb_head_series".to_string(),
6196                ),
6197                (
6198                    DEFAULT_SCHEMA_NAME.to_string(),
6199                    "http_server_requests_seconds_count".to_string(),
6200                ),
6201            ],
6202            &["ip"],
6203        )
6204        .await;
6205
6206        let plan =
6207            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6208                .await
6209                .unwrap();
6210        let expected = r#"
6211Projection: count(prometheus_tsdb_head_series.greptime_value) AS my_series, prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [my_series:Int64, ip:Utf8, greptime_timestamp:Timestamp(ms)]
6212  Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N]
6213    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]
6214      Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(ms), series:Float64;N, greptime_value:Float64;N]
6215        Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
6216          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6217            PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6218              Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6219                Filter: prometheus_tsdb_head_series.ip ~ Utf8("^(?:(10.0.160.237:8080|10.0.160.237:9090))$") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6220                  TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]"#;
6221        assert_eq!(format!("\n{}", plan.display_indent_schema()), expected);
6222    }
6223
6224    #[tokio::test]
6225    async fn test_quantile_expr() {
6226        let mut eval_stmt = EvalStmt {
6227            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6228            start: UNIX_EPOCH,
6229            end: UNIX_EPOCH
6230                .checked_add(Duration::from_secs(100_000))
6231                .unwrap(),
6232            interval: Duration::from_secs(5),
6233            lookback_delta: Duration::from_secs(1),
6234        };
6235        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;
6236
6237        let prom_expr = parser::parse(case).unwrap();
6238        eval_stmt.expr = prom_expr;
6239        let table_provider = build_test_table_provider_with_fields(
6240            &[
6241                (
6242                    DEFAULT_SCHEMA_NAME.to_string(),
6243                    "prometheus_tsdb_head_series".to_string(),
6244                ),
6245                (
6246                    DEFAULT_SCHEMA_NAME.to_string(),
6247                    "http_server_requests_seconds_count".to_string(),
6248                ),
6249            ],
6250            &["ip"],
6251        )
6252        .await;
6253
6254        let plan =
6255            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6256                .await
6257                .unwrap();
6258        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(ms), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
6259        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(ms), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
6260        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6261        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(ms), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
6262        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6263        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6264        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6265        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-999, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100000000, None) [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]\
6266        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]";
6267
6268        assert_eq!(plan.display_indent_schema().to_string(), expected);
6269    }
6270
6271    #[tokio::test]
6272    async fn test_or_not_exists_table_label() {
6273        let mut eval_stmt = EvalStmt {
6274            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6275            start: UNIX_EPOCH,
6276            end: UNIX_EPOCH
6277                .checked_add(Duration::from_secs(100_000))
6278                .unwrap(),
6279            interval: Duration::from_secs(5),
6280            lookback_delta: Duration::from_secs(1),
6281        };
6282        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;
6283
6284        let prom_expr = parser::parse(case).unwrap();
6285        eval_stmt.expr = prom_expr;
6286        let table_provider = build_test_table_provider_with_fields(
6287            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
6288            &["job"],
6289        )
6290        .await;
6291
6292        let plan =
6293            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6294                .await
6295                .unwrap();
6296        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
6297  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
6298    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(ms), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
6299      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(ms), sum(metric_exists.greptime_value):Float64;N]
6300        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(ms), sum(metric_exists.greptime_value):Float64;N]
6301          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6302            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6303              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6304                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-999, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100000000, None) [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6305                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(ms), greptime_value:Float64;N]
6306  SubqueryAlias:  [greptime_timestamp:Timestamp(ms), job:Utf8;N, sum(.value):Float64;N]
6307    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(ms), job:Utf8;N, sum(.value):Float64;N]
6308      Sort: .time ASC NULLS LAST [time:Timestamp(ms), sum(.value):Float64;N]
6309        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(ms), sum(.value):Float64;N]
6310          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(ms), value:Float64;N]
6311            TableScan: dummy [time:Timestamp(ms), value:Float64;N]"#;
6312
6313        assert_eq!(plan.display_indent_schema().to_string(), expected);
6314    }
6315
6316    #[tokio::test]
6317    async fn test_histogram_quantile_missing_le_column() {
6318        let mut eval_stmt = EvalStmt {
6319            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6320            start: UNIX_EPOCH,
6321            end: UNIX_EPOCH
6322                .checked_add(Duration::from_secs(100_000))
6323                .unwrap(),
6324            interval: Duration::from_secs(5),
6325            lookback_delta: Duration::from_secs(1),
6326        };
6327
6328        // Test case: histogram_quantile with a table that doesn't have 'le' column
6329        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;
6330
6331        let prom_expr = parser::parse(case).unwrap();
6332        eval_stmt.expr = prom_expr;
6333
6334        // Create a table provider with a table that doesn't have 'le' column
6335        let table_provider = build_test_table_provider_with_fields(
6336            &[(
6337                DEFAULT_SCHEMA_NAME.to_string(),
6338                "non_existent_histogram_bucket".to_string(),
6339            )],
6340            &["pod", "instance"], // Note: no 'le' column
6341        )
6342        .await;
6343
6344        // Should return empty result instead of error
6345        let result =
6346            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
6347                .await;
6348
6349        // This should succeed now (returning empty result) instead of failing with "Cannot find column le"
6350        assert!(
6351            result.is_ok(),
6352            "Expected successful plan creation with empty result, but got error: {:?}",
6353            result.err()
6354        );
6355
6356        // Verify that the result is an EmptyRelation
6357        let plan = result.unwrap();
6358        match plan {
6359            LogicalPlan::EmptyRelation(_) => {
6360                // This is what we expect
6361            }
6362            _ => panic!("Expected EmptyRelation, but got: {:?}", plan),
6363        }
6364    }
6365}