query/promql/planner.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::function::FunctionContext;
use common_query::prelude::greptime_value;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{DFSchema, NullEquality};
use datafusion_expr::expr::WindowFunctionParams;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{
    ExprSchemable, Literal, Projection, SortExpr, TableScan, TableSource, col, lit,
};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use once_cell::sync::Lazy;
use promql::extension_plan::{
    Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
    ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
};
use promql::functions::{
    AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
    Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
    quantile_udaf,
};
use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
    Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
    Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
    VectorSelector, token,
};
use regex::{self, Regex};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
    METRIC_ENGINE_NAME, is_metric_engine_internal_column,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
    InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
    MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
    Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
    UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
    UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
    ZeroRangeSelectorSnafu,
};
use crate::query_engine::QueryEngineState;
/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `absent()` function in PromQL.
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// `histogram_quantile()` function in PromQL.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector()` function in PromQL.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// `le` column for conventional histogram.
const LE_COLUMN_NAME: &str = "le";

/// Static regex for validating label names according to the Prometheus specification.
/// Label names must match the regex: [a-zA-Z_][a-zA-Z0-9_]*
static LABEL_NAME_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());

const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Default value column name for the empty metric.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special matcher to project field columns under multi-field mode.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special matchers for cross-schema queries.
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

/// Threshold for the scatter scan mode.
const MAX_SCATTER_POINTS: i64 = 400;

/// One hour interval, in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;

#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // query parameters
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // planner states
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    /// Use the metric engine's internal series identifier column (`__tsid`) as the series key.
    ///
    /// This is enabled only when the underlying scan can provide `__tsid` (`UInt64`). The planner
    /// uses it internally (e.g. as the series key for [`SeriesDivide`]) and strips it from the
    /// final output.
    use_tsid: bool,
    /// The matcher for field columns (`__field__`).
    field_column_matcher: Option<Vec<Matcher>>,
    /// The matchers for selectors (normal matchers).
    selector_matcher: Vec<Matcher>,
    schema_name: Option<String>,
    /// The range of the range selector, in milliseconds. `None` if there is no range selector.
    range: Option<Millisecond>,
}

impl PromPlannerContext {
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }

    /// Reset all planner states.
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.use_tsid = false;
        self.field_column_matcher = None;
        self.selector_matcher.clear();
        self.schema_name = None;
        self.range = None;
    }

    /// Reset the table name and schema to empty.
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
        self.use_tsid = false;
    }

    /// Check if `le` is present in the tag columns.
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}

pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}

impl PromPlanner {
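    /// Plan an [`EvalStmt`] into a [`LogicalPlan`], optionally aliasing the value column.
    ///
    /// A minimal usage sketch (the provider, statement, and engine state names are
    /// illustrative placeholders, not defined in this module):
    ///
    /// ```ignore
    /// let plan = PromPlanner::stmt_to_plan_with_alias(
    ///     table_provider,
    ///     &eval_stmt,
    ///     Some("renamed_value".to_string()),
    ///     &query_engine_state,
    /// )
    /// .await?;
    /// ```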
    pub async fn stmt_to_plan_with_alias(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        alias: Option<String>,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        let plan = planner
            .prom_expr_to_plan(&stmt.expr, query_engine_state)
            .await?;

        // Apply the alias if provided.
        let plan = if let Some(alias_name) = alias {
            planner.apply_alias_projection(plan, alias_name)?
        } else {
            plan
        };

        // Never leak the internal series identifier to the output.
        planner.strip_tsid_column(plan)
    }

    #[cfg(test)]
    async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        Self::stmt_to_plan_with_alias(table_provider, stmt, None, query_engine_state).await
    }

    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
            .await
    }

    /**
    Converts a PromQL expression to a logical plan.

    NOTE:
        The `timestamp_fn` flag indicates whether the PromQL `timestamp()` function is being evaluated in the current context.
        If `true`, the planner generates a logical plan that projects the timestamp (time index) column
        as the value column for each input row, implementing the PromQL `timestamp()` function semantics.
        If `false`, the planner generates the standard logical plan for the given PromQL expression.
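
        Example (illustrative): for `timestamp(up)`, the inner vector selector is
        planned with `timestamp_fn == true`, so each output row carries its own
        sample timestamp as the value instead of the value of `up`.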
    */
    #[async_recursion]
    async fn prom_expr_to_plan_inner(
        &mut self,
        prom_expr: &PromExpr,
        timestamp_fn: bool,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => {
                self.prom_aggr_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Unary(expr) => {
                self.prom_unary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Binary(expr) => {
                self.prom_binary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
                    .await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector, timestamp_fn)
                    .await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => {
                self.prom_call_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Extension(expr) => {
                self.prom_ext_expr_to_plan(query_engine_state, expr).await?
            }
        };

        Ok(res)
    }

    async fn prom_subquery_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
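        // Widen the evaluation start so the inner plan yields enough history: the
        // first output step of the subquery still needs `range` worth of points,
        // hence `start - range + interval` below.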
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_aggr_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let mut input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        let input_has_tsid = input.schema().fields().iter().any(|field| {
            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
                && field.data_type() == &ArrowDataType::UInt64
        });

        // `__tsid` based scan projection may prune tag columns. Ensure tags referenced in
        // aggregation modifiers (`by`/`without`) are available before planning group keys.
        let required_group_tags = match modifier {
            None => BTreeSet::new(),
            Some(LabelModifier::Include(labels)) => labels
                .labels
                .iter()
                .filter(|label| !is_metric_engine_internal_column(label.as_str()))
                .cloned()
                .collect(),
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_tags = self.collect_row_key_tag_columns_from_plan(&input)?;
                for label in &labels.labels {
                    let _ = all_tags.remove(label);
                }
                all_tags
            }
        };
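        // E.g. (illustrative) `sum by (host) (...)` requires {"host"}, while
        // `sum without (job) (...)` requires every row-key tag except "job".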

        if !required_group_tags.is_empty()
            && required_group_tags
                .iter()
                .any(|tag| Self::find_case_sensitive_column(input.schema(), tag.as_str()).is_none())
        {
            input = self.ensure_tag_columns_available(input, &required_group_tags)?;
            self.refresh_tag_columns_from_schema(input.schema());
        }

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // When `__tsid` is available, tag columns may have been pruned from the input plan.
                // For the `keep_tsid` decision we should compare against the full row-key label set,
                // otherwise we may incorrectly treat label-reducing aggregates as preserving labels.
                let input_tag_columns = if input_has_tsid {
                    self.collect_row_key_tag_columns_from_plan(&input)?
                        .into_iter()
                        .collect::<Vec<_>>()
                } else {
                    self.ctx.tag_columns.clone()
                };
                // Calculate the columns to group by. The time index column must be
                // appended to the group-by columns.
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                // Convert the op and value columns to aggregate exprs.
                let (mut aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                let keep_tsid = op.id() != token::T_COUNT_VALUES
                    && input_has_tsid
                    && input_tag_columns.iter().collect::<HashSet<_>>()
                        == self.ctx.tag_columns.iter().collect::<HashSet<_>>();

                if keep_tsid {
                    aggr_exprs.push(
                        first_value(
                            DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)),
                            vec![],
                        )
                        .alias(DATA_SCHEMA_TSID_COLUMN_NAME),
                    );
                }
                self.ctx.use_tsid = keep_tsid;

                // Create the plan.
                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` must be grouped by the fields,
                    // and the fields are projected to the new label.
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }

    /// Create a logical plan for the PromQL `topk` and `bottomk` exprs.
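    ///
    /// A sketch of the plan shape this produces (assuming `topk(2, metric)`):
    /// a per-group ranking window over the input, then a filter keeping rows
    /// with rank `<= 2`, then a sort and a final projection.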
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let input_has_tsid = input.schema().fields().iter().any(|field| {
            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
                && field.data_type() == &ArrowDataType::UInt64
        });
        self.ctx.use_tsid = input_has_tsid;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // Convert the op and value columns to window exprs.
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Create the ranks filter with `Operator::Or`.
        // Safety: there is at least one rank column.
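        // E.g. with two field columns this folds into `rank_0 <= k OR rank_1 <= k`
        // (column names illustrative).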
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        // Order by the ranks.
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(
                self.ctx
                    .use_tsid
                    .then_some(DfExpr::Column(Column::from_name(
                        DATA_SCHEMA_TSID_COLUMN_NAME,
                    )))
                    .into_iter(),
            )
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    async fn prom_unary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        // A unary expr in PromQL implies the `-` operator.
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }

    async fn prom_binary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // If set to true, a comparison operator returns 0/1 (for false/true) instead of
        // filtering on the result column.
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        // Build a filter plan if the op is a comparison operator and doesn't need
        // to return 0/1. Otherwise, build a projection plan.
        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            (Some(lhs), Some(rhs)) => {
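                // Both sides are literals (e.g. `1 + 2`): evaluate the expression
                // over a synthetic time series via `EmptyMetric`.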
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            // lhs is a literal, rhs is a column
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                // Check if the literal is a special time expr.
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // lhs is a column, rhs is a literal
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                // Check if the literal is a special time expr.
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // Both sides are columns. Join them on the time index.
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // TODO(ruihang): avoid the join if left and right are the same table

                // Set operations have "special" join semantics.
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                // normal join
                if left_table_ref == right_table_ref {
                    // Rename the table references to avoid ambiguity.
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    // `self.ctx` currently holds the right plan's context. If the right
                    // plan has no tags, use the left plan's context for subsequent
                    // calculations instead. This handles cases like `host + scalar(...)`:
                    // the tag columns of the `host` table must be preserved in later
                    // projections, and they only appear in the left plan's context.
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    // If either plan's tags are empty (cases like `scalar(...) + host` or
                    // `host + scalar(...)`), only join on the time index.
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }

    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.clone());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
        timestamp_fn: bool,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;

        let normalize = if timestamp_fn {
            // If evaluating the PromQL `timestamp()` function, project the time index column as the value column
            // before wrapping with [`InstantManipulate`], so the output matches PromQL's `timestamp()` semantics.
            self.create_timestamp_func_plan(normalize)?
        } else {
            normalize
        };

        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    /// Builds a projection plan for the PromQL `timestamp()` function.
    /// Projects the time index column as the value column for each row.
    ///
    /// # Arguments
    /// * `normalize` - Input [`LogicalPlan`] for the normalized series.
    ///
    /// # Returns
    /// Returns a [`Result<LogicalPlan>`] where the resulting logical plan projects the timestamp
    /// column as the value column, along with the original tag and time index columns.
    ///
    /// # Timestamp vs. Time Function
    ///
    /// - **Timestamp Function (`timestamp()`)**: In PromQL, the `timestamp()` function returns the
    ///   timestamp (time index) of each sample as the value column.
    ///
    /// - **Time Function (`time()`)**: The `time()` function returns the evaluation time of the query
    ///   as a scalar value.
    ///
    /// # Side Effects
    /// Updates the planner context's field columns to the timestamp column name.
    fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
        let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
            .alias(DEFAULT_FIELD_COLUMN);
        self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
        let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
        project_exprs.push(self.create_time_index_column_expr()?);
        project_exprs.push(time_expr);
        project_exprs.extend(self.create_tag_column_exprs()?);

        LogicalPlanBuilder::from(normalize)
            .project(project_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);
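        // E.g. (illustrative) `metric[5m]` records a 300_000 ms range in the
        // context for later range-aware planning.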

        // Some functions like `rate` may require special fields in the `RangeManipulate` plan,
        // so we can't skip `RangeManipulate`.
        let normalize = match self.setup_context().await? {
            Some(empty_plan) => empty_plan,
            None => {
                self.selector_to_series_normalize_plan(offset, matchers, true)
                    .await?
            }
        };
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            // TODO(ruihang): convert via Timestamp datatypes to support different time units
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_call_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // Some special functions are not expressions but whole plans.
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, query_engine_state).await;
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
            SPECIAL_ABSENT_FUNCTION => {
                return self.create_absent_plan(args, query_engine_state).await;
            }
            _ => {}
        }
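
        // E.g. `histogram_quantile(0.9, ...)` was fully handled above; what follows
        // plans ordinary value-mapping functions such as `abs` or `rate`.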

        // Transform the function arguments.
        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
                .await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let (mut func_exprs, new_tags) =
            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        // Update the context tags after building the plan.
        // They can't be pushed before planning, because they won't exist until the projection.
        for tag in new_tags {
            self.ctx.tag_columns.push(tag);
        }

        let plan = builder.build().context(DataFusionPlanningSnafu)?;
        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");

        Ok(plan)
    }

    async fn prom_ext_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self
            .prom_expr_to_plan(&children[0], query_engine_state)
            .await?;
        // Wrap the existing plan for explanation/analysis:
        // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
        // If `analyze` is true, runs the actual plan and produces
        // information about metrics gathered during the run.
        // If `verbose` is true, prints out additional details when the VERBOSE keyword is specified.
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }

    /// Extract the metric name from the `__name__` matcher and set it into the [PromPlannerContext].
    /// Returns a new [Matchers] that doesn't contain the metric name matcher.
    ///
    /// Each call to this function starts a new selector, so the context is reset first.
    ///
    /// Name rule:
    /// - if `name` is some, the matchers MUST NOT contain a `__name__` matcher.
    /// - if `name` is none, the matchers MUST contain exactly one equality `__name__` matcher.
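    ///
    /// E.g. (illustrative) for `http_requests{job="api", __field__="count"}`, the
    /// table name becomes `http_requests`, `job="api"` is returned as a normal
    /// matcher, and the `__field__` matcher is stashed in the context instead.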
    #[allow(clippy::mutable_key_type)]
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            // TODO(ruihang): support other metric match ops
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.clone(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                self.ctx.selector_matcher.push(matcher.clone());
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(matchers.into_iter().collect()))
    }

    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        // Make the table scan plan.
        let table_ref = self.table_ref()?;
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        // Make the filter exprs.
        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
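        // E.g. (illustrative) `metric offset 5m` yields +300_000 ms, while a
        // negative offset flips the sign.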
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            .filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty.
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // Make a projection plan if there is any `__field__` matcher.
        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            // opt-in set
            let mut result_set = HashSet::new();
            // opt-out set
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            // Merge the two sets.
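            // E.g. (illustrative) `{__field__!="b"}` leaves the opt-in set empty,
            // so it starts from all field columns and then "b" is removed below.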
1218            if result_set.is_empty() {
1219                result_set = col_set.into_iter().cloned().collect();
1220            }
1221            for col in reverse_set {
1222                let _ = result_set.remove(&col);
1223            }
1224
1225            // mask the field columns in context using computed result set
1226            self.ctx.field_columns = self
1227                .ctx
1228                .field_columns
1229                .drain(..)
1230                .filter(|col| result_set.contains(col))
1231                .collect();
1232
1233            let exprs = result_set
1234                .into_iter()
1235                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
1236                .chain(self.create_tag_column_exprs()?)
1237                .chain(
1238                    self.ctx
1239                        .use_tsid
1240                        .then_some(DfExpr::Column(Column::new_unqualified(
1241                            DATA_SCHEMA_TSID_COLUMN_NAME,
1242                        )))
1243                        .into_iter(),
1244                )
1245                .chain(Some(self.create_time_index_column_expr()?))
1246                .collect::<Vec<_>>();
1247
1248            // reuse this variable for simplicity
1249            table_scan = LogicalPlanBuilder::from(table_scan)
1250                .project(exprs)
1251                .context(DataFusionPlanningSnafu)?
1252                .build()
1253                .context(DataFusionPlanningSnafu)?;
1254        }
1255
1256        // make sort plan
1257        let series_key_columns = if self.ctx.use_tsid {
1258            vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
1259        } else {
1260            self.ctx.tag_columns.clone()
1261        };
1262
1263        let sort_exprs = if self.ctx.use_tsid {
1264            vec![
1265                DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)).sort(true, true),
1266                self.create_time_index_column_expr()?.sort(true, true),
1267            ]
1268        } else {
1269            self.create_tag_and_time_index_column_sort_exprs()?
1270        };
1271
1272        let sort_plan = LogicalPlanBuilder::from(table_scan)
1273            .sort(sort_exprs)
1274            .context(DataFusionPlanningSnafu)?
1275            .build()
1276            .context(DataFusionPlanningSnafu)?;
1277
1278        // make divide plan
1279        let time_index_column =
1280            self.ctx
1281                .time_index_column
1282                .clone()
1283                .with_context(|| TimeIndexNotFoundSnafu {
1284                    table: table_ref.to_string(),
1285                })?;
1286        let divide_plan = LogicalPlan::Extension(Extension {
1287            node: Arc::new(SeriesDivide::new(
1288                series_key_columns.clone(),
1289                time_index_column,
1290                sort_plan,
1291            )),
1292        });
1293
1294        // make series_normalize plan
1295        if !is_range_selector && offset_duration == 0 {
1296            return Ok(divide_plan);
1297        }
1298        let series_normalize = SeriesNormalize::new(
1299            offset_duration,
1300            self.ctx
1301                .time_index_column
1302                .clone()
1303                .with_context(|| TimeIndexNotFoundSnafu {
1304                    table: table_ref.to_quoted_string(),
1305                })?,
1306            is_range_selector,
1307            series_key_columns,
1308            divide_plan,
1309        );
1310        let logical_plan = LogicalPlan::Extension(Extension {
1311            node: Arc::new(series_normalize),
1312        });
1313
1314        Ok(logical_plan)
1315    }
1316
1317    /// Convert [LabelModifier] to [Column] exprs for aggregation.
1318    /// The timestamp column and tag columns will be included.
1319    ///
1320    /// # Side effect
1321    ///
1322    /// This method will also change the tag columns in ctx if `update_ctx` is true.
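    ///
    /// For example (a sketch): `sum by (host) (metric)` yields the exprs
    /// `[host, <time index>]`, while `sum without (host) (metric)` yields
    /// every remaining non-internal column except the time index and value
    /// columns, with the time index expr appended at the end.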
1323    fn agg_modifier_to_col(
1324        &mut self,
1325        input_schema: &DFSchemaRef,
1326        modifier: &Option<LabelModifier>,
1327        update_ctx: bool,
1328    ) -> Result<Vec<DfExpr>> {
1329        match modifier {
1330            None => {
1331                if update_ctx {
1332                    self.ctx.tag_columns.clear();
1333                }
1334                Ok(vec![self.create_time_index_column_expr()?])
1335            }
1336            Some(LabelModifier::Include(labels)) => {
1337                if update_ctx {
1338                    self.ctx.tag_columns.clear();
1339                }
1340                let mut exprs = Vec::with_capacity(labels.labels.len());
1341                for label in &labels.labels {
1342                    if is_metric_engine_internal_column(label) {
1343                        continue;
1344                    }
1345                    // nonexistent labels will be ignored
1346                    if let Some(column_name) = Self::find_case_sensitive_column(input_schema, label)
1347                    {
1348                        exprs.push(DfExpr::Column(Column::from_name(column_name.clone())));
1349
1350                        if update_ctx {
1351                            // update the tag columns in context
1352                            self.ctx.tag_columns.push(column_name);
1353                        }
1354                    }
1355                }
1356                // add timestamp column
1357                exprs.push(self.create_time_index_column_expr()?);
1358
1359                Ok(exprs)
1360            }
1361            Some(LabelModifier::Exclude(labels)) => {
1362                let mut all_fields = input_schema
1363                    .fields()
1364                    .iter()
1365                    .map(|f| f.name())
1366                    .collect::<BTreeSet<_>>();
1367
1368                // Exclude metric engine internal columns (not PromQL labels) from the implicit
1369                // "without" label set.
1370                all_fields.retain(|col| !is_metric_engine_internal_column(col.as_str()));
1371
1372                // remove "without"-ed fields
1373                // nonexistent labels will be ignored
1374                for label in &labels.labels {
1375                    let _ = all_fields.remove(label);
1376                }
1377
1378                // remove time index and value fields
1379                if let Some(time_index) = &self.ctx.time_index_column {
1380                    let _ = all_fields.remove(time_index);
1381                }
1382                for value in &self.ctx.field_columns {
1383                    let _ = all_fields.remove(value);
1384                }
1385
1386                if update_ctx {
1387                    // change the tag columns in context
1388                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
1389                }
1390
1391                // collect remaining fields and convert to col expr
1392                let mut exprs = all_fields
1393                    .into_iter()
1394                    .map(|c| DfExpr::Column(Column::from(c)))
1395                    .collect::<Vec<_>>();
1396
1397                // add timestamp column
1398                exprs.push(self.create_time_index_column_expr()?);
1399
1400                Ok(exprs)
1401            }
1402        }
1403    }
1404
1405    // TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
1406    pub fn matchers_to_expr(
1407        label_matchers: Matchers,
1408        table_schema: &DFSchemaRef,
1409    ) -> Result<Vec<DfExpr>> {
1410        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
1411        for matcher in label_matchers.matchers {
1412            if matcher.name == SCHEMA_COLUMN_MATCHER
1413                || matcher.name == DB_COLUMN_MATCHER
1414                || matcher.name == FIELD_COLUMN_MATCHER
1415            {
1416                continue;
1417            }
1418
1419            let column_name = Self::find_case_sensitive_column(table_schema, matcher.name.as_str());
1420            let col = if let Some(column_name) = column_name {
1421                DfExpr::Column(Column::from_name(column_name))
1422            } else {
1423                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
1424                    .alias(matcher.name.clone())
1425            };
1426            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
1427            let expr = match matcher.op {
1428                MatchOp::Equal => col.eq(lit),
1429                MatchOp::NotEqual => col.not_eq(lit),
1430                MatchOp::Re(re) => {
1431                    // TODO(ruihang): a more programmatic way to handle this in datafusion
1432
1433                    // This is a hack to handle `.+` and `.*`, and is not strictly correct:
1434                    // `.` doesn't match newline (`\n`). Given this is in a PromQL context,
1435                    // it's fine most of the time.
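                    // For example, `job=~".*"` (already anchored as `^(?:.*)$`
                    // by the time it reaches here) matches everything and the
                    // matcher is dropped entirely, while `job=~".+"` becomes
                    // `job != ''`. Any other pattern falls through to a
                    // `RegexMatch` binary expression.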
1436                    if re.as_str() == "^(?:.*)$" {
1437                        continue;
1438                    }
1439                    if re.as_str() == "^(?:.+)$" {
1440                        col.not_eq(DfExpr::Literal(
1441                            ScalarValue::Utf8(Some(String::new())),
1442                            None,
1443                        ))
1444                    } else {
1445                        DfExpr::BinaryExpr(BinaryExpr {
1446                            left: Box::new(col),
1447                            op: Operator::RegexMatch,
1448                            right: Box::new(DfExpr::Literal(
1449                                ScalarValue::Utf8(Some(re.as_str().to_string())),
1450                                None,
1451                            )),
1452                        })
1453                    }
1454                }
1455                MatchOp::NotRe(re) => {
1456                    if re.as_str() == "^(?:.*)$" {
1457                        DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
1458                    } else if re.as_str() == "^(?:.+)$" {
1459                        col.eq(DfExpr::Literal(
1460                            ScalarValue::Utf8(Some(String::new())),
1461                            None,
1462                        ))
1463                    } else {
1464                        DfExpr::BinaryExpr(BinaryExpr {
1465                            left: Box::new(col),
1466                            op: Operator::RegexNotMatch,
1467                            right: Box::new(DfExpr::Literal(
1468                                ScalarValue::Utf8(Some(re.as_str().to_string())),
1469                                None,
1470                            )),
1471                        })
1472                    }
1473                }
1474            };
1475            exprs.push(expr);
1476        }
1477
1478        Ok(exprs)
1479    }
1480
1481    fn find_case_sensitive_column(schema: &DFSchemaRef, column: &str) -> Option<String> {
1482        if is_metric_engine_internal_column(column) {
1483            return None;
1484        }
1485        schema
1486            .fields()
1487            .iter()
1488            .find(|field| field.name() == column)
1489            .map(|field| field.name().clone())
1490    }
1491
1492    fn table_from_source(&self, source: &Arc<dyn TableSource>) -> Result<table::TableRef> {
1493        Ok(source
1494            .as_any()
1495            .downcast_ref::<DefaultTableSource>()
1496            .context(UnknownTableSnafu)?
1497            .table_provider
1498            .as_any()
1499            .downcast_ref::<DfTableProviderAdapter>()
1500            .context(UnknownTableSnafu)?
1501            .table())
1502    }
1503
1504    fn table_ref(&self) -> Result<TableReference> {
1505        let table_name = self
1506            .ctx
1507            .table_name
1508            .clone()
1509            .context(TableNameNotFoundSnafu)?;
1510
1511        // set schema name if `__schema__` is given
1512        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1513            TableReference::partial(schema_name.as_str(), table_name.as_str())
1514        } else {
1515            TableReference::bare(table_name.as_str())
1516        };
1517
1518        Ok(table_ref)
1519    }
1520
1521    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
1522        let start = self.ctx.start;
1523        let end = self.ctx.end;
1524        if end < start {
1525            return InvalidTimeRangeSnafu { start, end }.fail();
1526        }
1527        let lookback_delta = self.ctx.lookback_delta;
1528        let range = self.ctx.range.unwrap_or_default();
1529        let interval = self.ctx.interval;
1530        let time_index_expr = self.create_time_index_column_expr()?;
1531        let num_points = (end - start) / interval;
1532
1533        // Scan a continuous time range
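        // For example (hypothetical values, all in milliseconds): start=1_000,
        // end=2_000, lookback_delta=300, range=0 and offset=0 produce the
        // filter `ts >= 700 AND ts <= 2_300`.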
1534        if num_points > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
1535            let single_time_range = time_index_expr
1536                .clone()
1537                .gt_eq(DfExpr::Literal(
1538                    ScalarValue::TimestampMillisecond(
1539                        Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range),
1540                        None,
1541                    ),
1542                    None,
1543                ))
1544                .and(time_index_expr.lt_eq(DfExpr::Literal(
1545                    ScalarValue::TimestampMillisecond(
1546                        Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
1547                        None,
1548                    ),
1549                    None,
1550                )));
1551            return Ok(Some(single_time_range));
1552        }
1553
1554        // Otherwise scan scattered ranges separately
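        // For example, each evaluation timestamp `t` contributes the predicate
        // `ts >= t - offset - lookback - range AND ts <= t - offset + lookback`,
        // and the per-step predicates are OR-ed together below.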
1555        let mut filters = Vec::with_capacity(num_points as usize);
1556        for timestamp in (start..end).step_by(interval as usize) {
1557            filters.push(
1558                time_index_expr
1559                    .clone()
1560                    .gt_eq(DfExpr::Literal(
1561                        ScalarValue::TimestampMillisecond(
1562                            Some(timestamp - offset_duration - lookback_delta - range),
1563                            None,
1564                        ),
1565                        None,
1566                    ))
1567                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
1568                        ScalarValue::TimestampMillisecond(
1569                            Some(timestamp - offset_duration + lookback_delta),
1570                            None,
1571                        ),
1572                        None,
1573                    ))),
1574            )
1575        }
1576
1577        Ok(filters.into_iter().reduce(DfExpr::or))
1578    }
1579
1580    /// Create a table scan plan for the given table reference.
1581    ///
1582    /// For metric engine logical tables, this may scan the underlying physical
1583    /// table directly and filter rows by `__table_id` instead.
1584    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
1585        let provider = self
1586            .table_provider
1587            .resolve_table(table_ref.clone())
1588            .await
1589            .context(CatalogSnafu)?;
1590
1591        let logical_table = self.table_from_source(&provider)?;
1592
1593        // Try to rewrite the table scan to a physical table scan if possible.
1594        let mut maybe_phy_table_ref = table_ref.clone();
1595        let mut scan_provider = provider;
1596        let mut table_id_filter: Option<u32> = None;
1597
1598        // If it's a metric engine logical table, scan its physical table directly and filter by
1599        // `__table_id = logical_table_id` to get access to internal columns like `__tsid`.
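        // For example (hypothetical names): a logical table `http_requests`
        // backed by the physical table `greptime_physical` is scanned as
        // `greptime_physical` with a `__table_id = <logical table id>` filter,
        // which makes internal columns such as `__tsid` visible to the planner.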
1600        if logical_table.table_info().meta.engine == METRIC_ENGINE_NAME
1601            && let Some(physical_table_name) = logical_table
1602                .table_info()
1603                .meta
1604                .options
1605                .extra_options
1606                .get(LOGICAL_TABLE_METADATA_KEY)
1607        {
1608            let physical_table_ref = if let Some(schema_name) = &self.ctx.schema_name {
1609                TableReference::partial(schema_name.as_str(), physical_table_name.as_str())
1610            } else {
1611                TableReference::bare(physical_table_name.as_str())
1612            };
1613
1614            let physical_provider = match self
1615                .table_provider
1616                .resolve_table(physical_table_ref.clone())
1617                .await
1618            {
1619                Ok(provider) => provider,
1620                Err(e) if e.status_code() == StatusCode::TableNotFound => {
1621                    // Fall back to scanning the logical table. It still works, but without
1622                    // the `__tsid` optimization.
1623                    scan_provider.clone()
1624                }
1625                Err(e) => return Err(e).context(CatalogSnafu),
1626            };
1627
1628            if !Arc::ptr_eq(&physical_provider, &scan_provider) {
1629                // Only rewrite when internal columns exist in physical schema.
1630                let physical_table = self.table_from_source(&physical_provider)?;
1631
1632                let has_table_id = physical_table
1633                    .schema()
1634                    .column_schema_by_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
1635                    .is_some();
1636                let has_tsid = physical_table
1637                    .schema()
1638                    .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
1639                    .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
1640
1641                if has_table_id && has_tsid {
1642                    scan_provider = physical_provider;
1643                    maybe_phy_table_ref = physical_table_ref;
1644                    table_id_filter = Some(logical_table.table_info().ident.table_id);
1645                }
1646            }
1647        }
1648
1649        let scan_table = self.table_from_source(&scan_provider)?;
1650
1651        let use_tsid = table_id_filter.is_some()
1652            && scan_table
1653                .schema()
1654                .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
1655                .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
1656        self.ctx.use_tsid = use_tsid;
1657
1658        let all_table_tags = self.ctx.tag_columns.clone();
1659
1660        let scan_tag_columns = if use_tsid {
1661            let mut scan_tags = self.ctx.tag_columns.clone();
1662            for matcher in &self.ctx.selector_matcher {
1663                if is_metric_engine_internal_column(&matcher.name) {
1664                    continue;
1665                }
1666                if all_table_tags.iter().any(|tag| tag == &matcher.name) {
1667                    scan_tags.push(matcher.name.clone());
1668                }
1669            }
1670            scan_tags.sort_unstable();
1671            scan_tags.dedup();
1672            scan_tags
1673        } else {
1674            self.ctx.tag_columns.clone()
1675        };
1676
1677        let is_time_index_ms = scan_table
1678            .schema()
1679            .timestamp_column()
1680            .with_context(|| TimeIndexNotFoundSnafu {
1681                table: maybe_phy_table_ref.to_quoted_string(),
1682            })?
1683            .data_type
1684            == ConcreteDataType::timestamp_millisecond_datatype();
1685
1686        let scan_projection = if table_id_filter.is_some() {
1687            let mut required_columns = HashSet::new();
1688            required_columns.insert(DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string());
1689            required_columns.insert(self.ctx.time_index_column.clone().with_context(|| {
1690                TimeIndexNotFoundSnafu {
1691                    table: maybe_phy_table_ref.to_quoted_string(),
1692                }
1693            })?);
1694            for col in &scan_tag_columns {
1695                required_columns.insert(col.clone());
1696            }
1697            for col in &self.ctx.field_columns {
1698                required_columns.insert(col.clone());
1699            }
1700            if use_tsid {
1701                required_columns.insert(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
1702            }
1703
1704            let arrow_schema = scan_table.schema().arrow_schema().clone();
1705            Some(
1706                arrow_schema
1707                    .fields()
1708                    .iter()
1709                    .enumerate()
1710                    .filter(|(_, field)| required_columns.contains(field.name().as_str()))
1711                    .map(|(idx, _)| idx)
1712                    .collect::<Vec<_>>(),
1713            )
1714        } else {
1715            None
1716        };
1717
1718        let mut scan_plan =
1719            LogicalPlanBuilder::scan(maybe_phy_table_ref.clone(), scan_provider, scan_projection)
1720                .context(DataFusionPlanningSnafu)?
1721                .build()
1722                .context(DataFusionPlanningSnafu)?;
1723
1724        if let Some(table_id) = table_id_filter {
1725            scan_plan = LogicalPlanBuilder::from(scan_plan)
1726                .filter(
1727                    DfExpr::Column(Column::from_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME))
1728                        .eq(lit(table_id)),
1729                )
1730                .context(DataFusionPlanningSnafu)?
1731                .alias(table_ref.clone()) // rename the relation back to logical table's name after filtering
1732                .context(DataFusionPlanningSnafu)?
1733                .build()
1734                .context(DataFusionPlanningSnafu)?;
1735        }
1736
1737        if !is_time_index_ms {
1738            // cast to ms if the time index is not in millisecond precision
1739            let expr: Vec<_> = self
1740                .create_field_column_exprs()?
1741                .into_iter()
1742                .chain(
1743                    scan_tag_columns
1744                        .iter()
1745                        .map(|tag| DfExpr::Column(Column::from_name(tag))),
1746                )
1747                .chain(
1748                    self.ctx
1749                        .use_tsid
1750                        .then_some(DfExpr::Column(Column::new(
1751                            Some(table_ref.clone()),
1752                            DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
1753                        )))
1754                        .into_iter(),
1755                )
1756                .chain(Some(DfExpr::Alias(Alias {
1757                    expr: Box::new(DfExpr::Cast(Cast {
1758                        expr: Box::new(self.create_time_index_column_expr()?),
1759                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
1760                    })),
1761                    relation: Some(table_ref.clone()),
1762                    name: self
1763                        .ctx
1764                        .time_index_column
1765                        .as_ref()
1766                        .with_context(|| TimeIndexNotFoundSnafu {
1767                            table: table_ref.to_quoted_string(),
1768                        })?
1769                        .clone(),
1770                    metadata: None,
1771                })))
1772                .collect::<Vec<_>>();
1773            scan_plan = LogicalPlanBuilder::from(scan_plan)
1774                .project(expr)
1775                .context(DataFusionPlanningSnafu)?
1776                .build()
1777                .context(DataFusionPlanningSnafu)?;
1778        } else if table_id_filter.is_some() {
1779            // Drop the internal `__table_id` column after filtering.
1780            let project_exprs = self
1781                .create_field_column_exprs()?
1782                .into_iter()
1783                .chain(
1784                    scan_tag_columns
1785                        .iter()
1786                        .map(|tag| DfExpr::Column(Column::from_name(tag))),
1787                )
1788                .chain(
1789                    self.ctx
1790                        .use_tsid
1791                        .then_some(DfExpr::Column(Column::from_name(
1792                            DATA_SCHEMA_TSID_COLUMN_NAME,
1793                        )))
1794                        .into_iter(),
1795                )
1796                .chain(Some(self.create_time_index_column_expr()?))
1797                .collect::<Vec<_>>();
1798
1799            scan_plan = LogicalPlanBuilder::from(scan_plan)
1800                .project(project_exprs)
1801                .context(DataFusionPlanningSnafu)?
1802                .build()
1803                .context(DataFusionPlanningSnafu)?;
1804        }
1805
1806        let result = LogicalPlanBuilder::from(scan_plan)
1807            .build()
1808            .context(DataFusionPlanningSnafu)?;
1809        Ok(result)
1810    }
1811
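    /// Walks the plan tree and collects the row key (tag) columns of every
    /// `TableScan`, skipping metric engine internal columns such as
    /// `__table_id` and `__tsid`.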
1812    fn collect_row_key_tag_columns_from_plan(
1813        &self,
1814        plan: &LogicalPlan,
1815    ) -> Result<BTreeSet<String>> {
1816        fn walk(
1817            planner: &PromPlanner,
1818            plan: &LogicalPlan,
1819            out: &mut BTreeSet<String>,
1820        ) -> Result<()> {
1821            if let LogicalPlan::TableScan(scan) = plan {
1822                let table = planner.table_from_source(&scan.source)?;
1823                for col in table.table_info().meta.row_key_column_names() {
1824                    if col != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
1825                        && col != DATA_SCHEMA_TSID_COLUMN_NAME
1826                        && !is_metric_engine_internal_column(col)
1827                    {
1828                        out.insert(col.clone());
1829                    }
1830                }
1831            }
1832
1833            for input in plan.inputs() {
1834                walk(planner, input, out)?;
1835            }
1836            Ok(())
1837        }
1838
1839        let mut out = BTreeSet::new();
1840        walk(self, plan, &mut out)?;
1841        Ok(out)
1842    }
1843
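    /// Ensures the required tag columns are produced by the plan, appending
    /// them to `TableScan` projections and `Projection` expression lists where
    /// they are missing. For example (a sketch), a scan projecting only
    /// `[val, ts]` gains `host` when `required_tags` contains `host` and the
    /// source schema has a `host` column.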
1844    fn ensure_tag_columns_available(
1845        &self,
1846        plan: LogicalPlan,
1847        required_tags: &BTreeSet<String>,
1848    ) -> Result<LogicalPlan> {
1849        if required_tags.is_empty() {
1850            return Ok(plan);
1851        }
1852
1853        struct Rewriter {
1854            required_tags: BTreeSet<String>,
1855        }
1856
1857        impl TreeNodeRewriter for Rewriter {
1858            type Node = LogicalPlan;
1859
1860            fn f_up(
1861                &mut self,
1862                node: Self::Node,
1863            ) -> datafusion_common::Result<Transformed<Self::Node>> {
1864                match node {
1865                    LogicalPlan::TableScan(scan) => {
1866                        let schema = scan.source.schema();
1867                        let mut projection = match scan.projection.clone() {
1868                            Some(p) => p,
1869                            None => {
1870                                // Scanning all columns already covers required tags.
1871                                return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
1872                            }
1873                        };
1874
1875                        let mut changed = false;
1876                        for tag in &self.required_tags {
1877                            if let Some((idx, _)) = schema
1878                                .fields()
1879                                .iter()
1880                                .enumerate()
1881                                .find(|(_, field)| field.name() == tag)
1882                                && !projection.contains(&idx)
1883                            {
1884                                projection.push(idx);
1885                                changed = true;
1886                            }
1887                        }
1888
1889                        if !changed {
1890                            return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
1891                        }
1892
1893                        projection.sort_unstable();
1894                        projection.dedup();
1895
1896                        let new_scan = TableScan::try_new(
1897                            scan.table_name.clone(),
1898                            scan.source.clone(),
1899                            Some(projection),
1900                            scan.filters,
1901                            scan.fetch,
1902                        )?;
1903                        Ok(Transformed::yes(LogicalPlan::TableScan(new_scan)))
1904                    }
1905                    LogicalPlan::Projection(proj) => {
1906                        let input_schema = proj.input.schema();
1907
1908                        let existing = proj
1909                            .schema
1910                            .fields()
1911                            .iter()
1912                            .map(|f| f.name().as_str())
1913                            .collect::<HashSet<_>>();
1914
1915                        let mut expr = proj.expr.clone();
1916                        let mut has_changed = false;
1917                        for tag in &self.required_tags {
1918                            if existing.contains(tag.as_str()) {
1919                                continue;
1920                            }
1921
1922                            if let Some(idx) = input_schema.index_of_column_by_name(None, tag) {
1923                                expr.push(DfExpr::Column(Column::from(
1924                                    input_schema.qualified_field(idx),
1925                                )));
1926                                has_changed = true;
1927                            }
1928                        }
1929
1930                        if !has_changed {
1931                            return Ok(Transformed::no(LogicalPlan::Projection(proj)));
1932                        }
1933
1934                        let new_proj = Projection::try_new(expr, proj.input)?;
1935                        Ok(Transformed::yes(LogicalPlan::Projection(new_proj)))
1936                    }
1937                    other => Ok(Transformed::no(other)),
1938                }
1939            }
1940        }
1941
1942        let mut rewriter = Rewriter {
1943            required_tags: required_tags.clone(),
1944        };
1945        let rewritten = plan
1946            .rewrite(&mut rewriter)
1947            .context(DataFusionPlanningSnafu)?;
1948        Ok(rewritten.data)
1949    }
1950
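    /// Recomputes `ctx.tag_columns` from a schema: every field that is not the
    /// time index, a value column, or a metric engine internal column is
    /// treated as a tag. The result is sorted and deduplicated.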
1951    fn refresh_tag_columns_from_schema(&mut self, schema: &DFSchemaRef) {
1952        let time_index = self.ctx.time_index_column.as_deref();
1953        let field_columns = self.ctx.field_columns.iter().collect::<HashSet<_>>();
1954
1955        let mut tags = schema
1956            .fields()
1957            .iter()
1958            .map(|f| f.name())
1959            .filter(|name| Some(name.as_str()) != time_index)
1960            .filter(|name| !field_columns.contains(name))
1961            .filter(|name| !is_metric_engine_internal_column(name))
1962            .cloned()
1963            .collect::<Vec<_>>();
1964        tags.sort_unstable();
1965        tags.dedup();
1966        self.ctx.tag_columns = tags;
1967    }
1968
1969    /// Setup [PromPlannerContext]'s state fields.
1970    ///
1971    /// Returns a logical plan for an empty metric if the table is not found.
1972    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
1973        let table_ref = self.table_ref()?;
1974        let source = match self.table_provider.resolve_table(table_ref.clone()).await {
1975            Err(e) if e.status_code() == StatusCode::TableNotFound => {
1976                let plan = self.setup_context_for_empty_metric()?;
1977                return Ok(Some(plan));
1978            }
1979            res => res.context(CatalogSnafu)?,
1980        };
1981        let table = self.table_from_source(&source)?;
1982
1983        // set time index column name
1984        let time_index = table
1985            .schema()
1986            .timestamp_column()
1987            .with_context(|| TimeIndexNotFoundSnafu {
1988                table: table_ref.to_quoted_string(),
1989            })?
1990            .name
1991            .clone();
1992        self.ctx.time_index_column = Some(time_index);
1993
1994        // set values columns
1995        let values = table
1996            .table_info()
1997            .meta
1998            .field_column_names()
1999            .cloned()
2000            .collect();
2001        self.ctx.field_columns = values;
2002
2003        // set primary key (tag) columns
2004        let tags = table
2005            .table_info()
2006            .meta
2007            .row_key_column_names()
2008            .filter(|col| {
2009                // remove metric engine's internal columns
2010                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
2011            })
2012            .cloned()
2013            .collect();
2014        self.ctx.tag_columns = tags;
2015
2016        self.ctx.use_tsid = false;
2017
2018        Ok(None)
2019    }
2020
2021    /// Setup [PromPlannerContext]'s state fields for a nonexistent table
2022    /// without any rows.
2023    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
2024        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2025        self.ctx.reset_table_name_and_schema();
2026        self.ctx.tag_columns = vec![];
2027        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
2028        self.ctx.use_tsid = false;
2029
2030        // The table doesn't have any data, so we set start to 0 and end to -1.
2031        let plan = LogicalPlan::Extension(Extension {
2032            node: Arc::new(
2033                EmptyMetric::new(
2034                    0,
2035                    -1,
2036                    self.ctx.interval,
2037                    SPECIAL_TIME_FUNCTION.to_string(),
2038                    DEFAULT_FIELD_COLUMN.to_string(),
2039                    Some(lit(0.0f64)),
2040                )
2041                .context(DataFusionPlanningSnafu)?,
2042            ),
2043        });
2044        Ok(plan)
2045    }
2046
2047    // TODO(ruihang): insert column expr
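    /// Splits PromQL call arguments into at most one vector input and a list of
    /// literal expressions. For example, in `clamp_max(metric, 100)` the
    /// selector `metric` becomes the vector input and `100` becomes a literal.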
2048    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
2049        let mut result = FunctionArgs::default();
2050
2051        for arg in args {
2052            // First try to parse as a literal expression (including binary expressions like 100.0 + 3.0)
2053            if let Some(expr) = Self::try_build_literal_expr(arg) {
2054                result.literals.push(expr);
2055            } else {
2056                // If not a literal, treat as vector input
2057                match arg.as_ref() {
2058                    PromExpr::Subquery(_)
2059                    | PromExpr::VectorSelector(_)
2060                    | PromExpr::MatrixSelector(_)
2061                    | PromExpr::Extension(_)
2062                    | PromExpr::Aggregate(_)
2063                    | PromExpr::Paren(_)
2064                    | PromExpr::Call(_)
2065                    | PromExpr::Binary(_)
2066                    | PromExpr::Unary(_) => {
2067                        if result.input.replace(*arg.clone()).is_some() {
2068                            MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
2069                        }
2070                    }
2071
2072                    _ => {
2073                        let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
2074                        result.literals.push(expr);
2075                    }
2076                }
2077            }
2078        }
2079
2080        Ok(result)
2081    }
2082
2083    /// Creates function expressions for projection and returns the expressions and new tags.
2084    ///
2085    /// # Side Effects
2086    ///
2087    /// This method will update [PromPlannerContext]'s fields and tags if needed.
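    ///
    /// For example (a sketch): for `rate(metric[5m])`, each field column is
    /// rewritten through the `Rate` extrapolation UDF together with the
    /// timestamp-range column, the time index column, and the range length,
    /// and the resulting expressions become the new field columns.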
2088    fn create_function_expr(
2089        &mut self,
2090        func: &Function,
2091        other_input_exprs: Vec<DfExpr>,
2092        query_engine_state: &QueryEngineState,
2093    ) -> Result<(Vec<DfExpr>, Vec<String>)> {
2094        // TODO(ruihang): check function args list
2095        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
2096
2097        // TODO(ruihang): set this according to in-param list
2098        let field_column_pos = 0;
2099        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2100        // New labels after executing the function, e.g. those produced by `label_replace`.
2101        let mut new_tags = vec![];
2102        let scalar_func = match func.name {
2103            "increase" => ScalarFunc::ExtrapolateUdf(
2104                Arc::new(Increase::scalar_udf()),
2105                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2106            ),
2107            "rate" => ScalarFunc::ExtrapolateUdf(
2108                Arc::new(Rate::scalar_udf()),
2109                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2110            ),
2111            "delta" => ScalarFunc::ExtrapolateUdf(
2112                Arc::new(Delta::scalar_udf()),
2113                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2114            ),
2115            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
2116            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
2117            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
2118            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
2119            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
2120            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
2121            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
2122            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
2123            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
2124            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
2125            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
2126            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
2127            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
2128            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
2129            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
2130            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
2131            "predict_linear" => {
2132                other_input_exprs[0] = DfExpr::Cast(Cast {
2133                    expr: Box::new(other_input_exprs[0].clone()),
2134                    data_type: ArrowDataType::Int64,
2135                });
2136                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
2137            }
2138            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
2139            "time" => {
2140                exprs.push(build_special_time_expr(
2141                    self.ctx.time_index_column.as_ref().unwrap(),
2142                ));
2143                ScalarFunc::GeneratedExpr
2144            }
2145            "minute" => {
2146                // date_part('minute', time_index)
2147                let expr = self.date_part_on_time_index("minute")?;
2148                exprs.push(expr);
2149                ScalarFunc::GeneratedExpr
2150            }
2151            "hour" => {
2152                // date_part('hour', time_index)
2153                let expr = self.date_part_on_time_index("hour")?;
2154                exprs.push(expr);
2155                ScalarFunc::GeneratedExpr
2156            }
2157            "month" => {
2158                // date_part('month', time_index)
2159                let expr = self.date_part_on_time_index("month")?;
2160                exprs.push(expr);
2161                ScalarFunc::GeneratedExpr
2162            }
2163            "year" => {
2164                // date_part('year', time_index)
2165                let expr = self.date_part_on_time_index("year")?;
2166                exprs.push(expr);
2167                ScalarFunc::GeneratedExpr
2168            }
2169            "day_of_month" => {
2170                // date_part('day', time_index)
2171                let expr = self.date_part_on_time_index("day")?;
2172                exprs.push(expr);
2173                ScalarFunc::GeneratedExpr
2174            }
2175            "day_of_week" => {
2176                // date_part('dow', time_index)
2177                let expr = self.date_part_on_time_index("dow")?;
2178                exprs.push(expr);
2179                ScalarFunc::GeneratedExpr
2180            }
2181            "day_of_year" => {
2182                // date_part('doy', time_index)
2183                let expr = self.date_part_on_time_index("doy")?;
2184                exprs.push(expr);
2185                ScalarFunc::GeneratedExpr
2186            }
2187            "days_in_month" => {
2188                // date_part(
2189                //     'days',
2190                //     (date_trunc('month', <TIME INDEX>::date) + interval '1 month - 1 day')
2191                // );
2192                let day_lit_expr = "day".lit();
2193                let month_lit_expr = "month".lit();
2194                let interval_1month_lit_expr =
2195                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
2196                let interval_1day_lit_expr = DfExpr::Literal(
2197                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
2198                    None,
2199                );
2200                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
2201                    left: Box::new(interval_1month_lit_expr),
2202                    op: Operator::Minus,
2203                    right: Box::new(interval_1day_lit_expr),
2204                });
2205                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
2206                    func: datafusion_functions::datetime::date_trunc(),
2207                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
2208                });
2209                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
2210                    left: Box::new(date_trunc_expr),
2211                    op: Operator::Plus,
2212                    right: Box::new(the_1month_minus_1day_expr),
2213                });
2214                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
2215                    func: datafusion_functions::datetime::date_part(),
2216                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
2217                });
2218
2219                exprs.push(date_part_expr);
2220                ScalarFunc::GeneratedExpr
2221            }
2222
2223            "label_join" => {
2224                let (concat_expr, dst_label) = Self::build_concat_labels_expr(
2225                    &mut other_input_exprs,
2226                    &self.ctx,
2227                    query_engine_state,
2228                )?;
2229
2230                // Preserve the current field columns except the `dst_label`.
2231                for value in &self.ctx.field_columns {
2232                    if *value != dst_label {
2233                        let expr = DfExpr::Column(Column::from_name(value));
2234                        exprs.push(expr);
2235                    }
2236                }
2237
2238                // Remove it from the tag columns if it exists, to avoid duplicated column names
2239                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
2240                new_tags.push(dst_label);
2241                // Add the new label expr to evaluate
2242                exprs.push(concat_expr);
2243
2244                ScalarFunc::GeneratedExpr
2245            }
2246            "label_replace" => {
2247                if let Some((replace_expr, dst_label)) = self
2248                    .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
2249                {
2250                    // Preserve the current field columns except the `dst_label`.
2251                    for value in &self.ctx.field_columns {
2252                        if *value != dst_label {
2253                            let expr = DfExpr::Column(Column::from_name(value));
2254                            exprs.push(expr);
2255                        }
2256                    }
2257
2258                    ensure!(
2259                        !self.ctx.tag_columns.contains(&dst_label),
2260                        SameLabelSetSnafu
2261                    );
2262                    new_tags.push(dst_label);
2263                    // Add the new label expr to evaluate
2264                    exprs.push(replace_expr);
2265                } else {
2266                    // Keep the current field columns
2267                    for value in &self.ctx.field_columns {
2268                        let expr = DfExpr::Column(Column::from_name(value));
2269                        exprs.push(expr);
2270                    }
2271                }
2272
2273                ScalarFunc::GeneratedExpr
2274            }
2275            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
2276                // These functions are not expressions but part of the plan;
2277                // they are processed by `prom_call_expr_to_plan`.
2278                for value in &self.ctx.field_columns {
2279                    let expr = DfExpr::Column(Column::from_name(value));
2280                    exprs.push(expr);
2281                }
2282
2283                ScalarFunc::GeneratedExpr
2284            }
2285            "round" => {
2286                if other_input_exprs.is_empty() {
2287                    other_input_exprs.push_front(0.0f64.lit());
2288                }
2289                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
2290            }
2291            "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
2292            "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
2293            "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
2294            "pi" => {
2295                // the pi function doesn't accept any arguments and needs special processing
2296                let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2297                    func: datafusion::functions::math::pi(),
2298                    args: vec![],
2299                });
2300                exprs.push(fn_expr);
2301
2302                ScalarFunc::GeneratedExpr
2303            }
2304            _ => {
2305                if let Some(f) = query_engine_state
2306                    .session_state()
2307                    .scalar_functions()
2308                    .get(func.name)
2309                {
2310                    ScalarFunc::DataFusionBuiltin(f.clone())
2311                } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
2312                    let func_state = query_engine_state.function_state();
2313                    let query_ctx = self.table_provider.query_ctx();
2314
2315                    ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
2316                        state: func_state,
2317                        query_ctx: query_ctx.clone(),
2318                    })))
2319                } else if let Some(f) = datafusion_functions::math::functions()
2320                    .iter()
2321                    .find(|f| f.name() == func.name)
2322                {
2323                    ScalarFunc::DataFusionUdf(f.clone())
2324                } else {
2325                    return UnsupportedExprSnafu {
2326                        name: func.name.to_string(),
2327                    }
2328                    .fail();
2329                }
2330            }
2331        };
2332
2333        for value in &self.ctx.field_columns {
2334            let col_expr = DfExpr::Column(Column::from_name(value));
2335
2336            match scalar_func.clone() {
2337                ScalarFunc::DataFusionBuiltin(func) => {
2338                    other_input_exprs.insert(field_column_pos, col_expr);
2339                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2340                        func,
2341                        args: other_input_exprs.clone().into(),
2342                    });
2343                    exprs.push(fn_expr);
2344                    let _ = other_input_exprs.remove(field_column_pos);
2345                }
2346                ScalarFunc::DataFusionUdf(func) => {
2347                    let args = itertools::chain!(
2348                        other_input_exprs.iter().take(field_column_pos).cloned(),
2349                        std::iter::once(col_expr),
2350                        other_input_exprs.iter().skip(field_column_pos).cloned()
2351                    )
2352                    .collect_vec();
2353                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
2354                }
2355                ScalarFunc::Udf(func) => {
2356                    let ts_range_expr = DfExpr::Column(Column::from_name(
2357                        RangeManipulate::build_timestamp_range_name(
2358                            self.ctx.time_index_column.as_ref().unwrap(),
2359                        ),
2360                    ));
2361                    other_input_exprs.insert(field_column_pos, ts_range_expr);
2362                    other_input_exprs.insert(field_column_pos + 1, col_expr);
2363                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2364                        func,
2365                        args: other_input_exprs.clone().into(),
2366                    });
2367                    exprs.push(fn_expr);
2368                    let _ = other_input_exprs.remove(field_column_pos + 1);
2369                    let _ = other_input_exprs.remove(field_column_pos);
2370                }
2371                ScalarFunc::ExtrapolateUdf(func, range_length) => {
2372                    let ts_range_expr = DfExpr::Column(Column::from_name(
2373                        RangeManipulate::build_timestamp_range_name(
2374                            self.ctx.time_index_column.as_ref().unwrap(),
2375                        ),
2376                    ));
2377                    other_input_exprs.insert(field_column_pos, ts_range_expr);
2378                    other_input_exprs.insert(field_column_pos + 1, col_expr);
2379                    other_input_exprs
2380                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
2381                    other_input_exprs.push_back(lit(range_length));
2382                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2383                        func,
2384                        args: other_input_exprs.clone().into(),
2385                    });
2386                    exprs.push(fn_expr);
2387                    let _ = other_input_exprs.pop_back();
2388                    let _ = other_input_exprs.remove(field_column_pos + 2);
2389                    let _ = other_input_exprs.remove(field_column_pos + 1);
2390                    let _ = other_input_exprs.remove(field_column_pos);
2391                }
2392                ScalarFunc::GeneratedExpr => {}
2393            }
2394        }
2395
2396        // Update value columns' names and alias them to remove qualifiers.
2397        // For label functions such as `label_join`, `label_replace`, etc.,
2398        // we keep the fields unchanged.
2399        if !matches!(func.name, "label_join" | "label_replace") {
2400            let mut new_field_columns = Vec::with_capacity(exprs.len());
2401
2402            exprs = exprs
2403                .into_iter()
2404                .map(|expr| {
2405                    let display_name = expr.schema_name().to_string();
2406                    new_field_columns.push(display_name.clone());
2407                    Ok(expr.alias(display_name))
2408                })
2409                .collect::<std::result::Result<Vec<_>, _>>()
2410                .context(DataFusionPlanningSnafu)?;
2411
2412            self.ctx.field_columns = new_field_columns;
2413        }
2414
2415        Ok((exprs, new_tags))
2416    }
2417
2418    /// Validate label name according to Prometheus specification.
2419    /// Label names must match the regex: [a-zA-Z_][a-zA-Z0-9_]*
2420    /// Additionally, label names starting with double underscores are reserved for internal use.
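    ///
    /// For example, `my_label_1` is accepted, while `1label`, `label-name`,
    /// and `__reserved` are rejected.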
2421    fn validate_label_name(label_name: &str) -> Result<()> {
2422        // Check if label name starts with double underscores (reserved)
2423        if label_name.starts_with("__") {
2424            return InvalidDestinationLabelNameSnafu { label_name }.fail();
2425        }
2426        // Check if label name matches the required pattern
2427        if !LABEL_NAME_REGEX.is_match(label_name) {
2428            return InvalidDestinationLabelNameSnafu { label_name }.fail();
2429        }
2430
2431        Ok(())
2432    }
2433
2434    /// Build expr for `label_replace` function
2435    fn build_regexp_replace_label_expr(
2436        &self,
2437        other_input_exprs: &mut VecDeque<DfExpr>,
2438        query_engine_state: &QueryEngineState,
2439    ) -> Result<Option<(DfExpr, String)>> {
2440        // label_replace(vector, dst_label, replacement, src_label, regex)
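        // For example, `label_replace(up, "host", "$1", "instance", "(.*):.*")`
        // copies the part of `instance` before the colon into a new `host`
        // label.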
2441        let dst_label = match other_input_exprs.pop_front() {
2442            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2443            other => UnexpectedPlanExprSnafu {
2444                desc: format!("expected dst_label string literal, but found {:?}", other),
2445            }
2446            .fail()?,
2447        };
2448
2449        // Validate the destination label name
2450        Self::validate_label_name(&dst_label)?;
2451        let replacement = match other_input_exprs.pop_front() {
2452            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
2453            other => UnexpectedPlanExprSnafu {
2454                desc: format!("expected replacement string literal, but found {:?}", other),
2455            }
2456            .fail()?,
2457        };
2458        let src_label = match other_input_exprs.pop_front() {
2459            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
2460            other => UnexpectedPlanExprSnafu {
2461                desc: format!("expected src_label string literal, but found {:?}", other),
2462            }
2463            .fail()?,
2464        };
2465
2466        let regex = match other_input_exprs.pop_front() {
2467            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2468            other => UnexpectedPlanExprSnafu {
2469                desc: format!("expected regex string literal, but found {:?}", other),
2470            }
2471            .fail()?,
2472        };
2473
2474        // Validate the regex before using it
2475        // doc: https://prometheus.io/docs/prometheus/latest/querying/functions/#label_replace
2476        regex::Regex::new(&regex).map_err(|_| {
2477            InvalidRegularExpressionSnafu {
2478                regex: regex.clone(),
2479            }
2480            .build()
2481        })?;
2482
2483        // If the src_label exists and the regex is empty, keep everything unchanged.
2484        if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2485            return Ok(None);
2486        }
2487
2488        // If the src_label doesn't exist, then:
2489        if !self.ctx.tag_columns.contains(&src_label) {
2490            if replacement.is_empty() {
2491                // the replacement is empty, keep everything unchanged.
2492                return Ok(None);
2493            } else {
2494                // the replacement is not empty, so always add dst_label with the replacement value.
2495                return Ok(Some((
2496                    // alias literal `replacement` as dst_label
2497                    lit(replacement).alias(&dst_label),
2498                    dst_label,
2499                )));
2500            }
2501        }
2502
2503        // Preprocess the regex:
2504        // https://github.com/prometheus/prometheus/blob/d902abc50d6652ba8fe9a81ff8e5cce936114eba/promql/functions.go#L1575C32-L1575C37
2505        let regex = format!("^(?s:{regex})$");
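        // For example, `(.*):.*` becomes `^(?s:(.*):.*)$`, which matches the
        // whole label value and lets `.` also match newlines.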
2506
2507        let session_state = query_engine_state.session_state();
2508        let func = session_state
2509            .scalar_functions()
2510            .get("regexp_replace")
2511            .context(UnsupportedExprSnafu {
2512                name: "regexp_replace",
2513            })?;
2514
2515        // regexp_replace(src_label, regex, replacement)
2516        let args = vec![
2517            if src_label.is_empty() {
2518                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2519            } else {
2520                DfExpr::Column(Column::from_name(src_label))
2521            },
2522            DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2523            DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2524        ];
2525
2526        Ok(Some((
2527            DfExpr::ScalarFunction(ScalarFunction {
2528                func: func.clone(),
2529                args,
2530            })
2531            .alias(&dst_label),
2532            dst_label,
2533        )))
2534    }
2535
2536    /// Build expr for `label_join` function
2537    fn build_concat_labels_expr(
2538        other_input_exprs: &mut VecDeque<DfExpr>,
2539        ctx: &PromPlannerContext,
2540        query_engine_state: &QueryEngineState,
2541    ) -> Result<(DfExpr, String)> {
2542        // label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)
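        // For example, `label_join(up, "foo", ",", "instance", "job")` joins
        // the `instance` and `job` label values with `,` into a new `foo`
        // label.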
2543
2544        let dst_label = match other_input_exprs.pop_front() {
2545            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2546            other => UnexpectedPlanExprSnafu {
2547                desc: format!("expected dst_label string literal, but found {:?}", other),
2548            }
2549            .fail()?,
2550        };
2551        let separator = match other_input_exprs.pop_front() {
2552            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2553            other => UnexpectedPlanExprSnafu {
2554                desc: format!("expected separator string literal, but found {:?}", other),
2555            }
2556            .fail()?,
2557        };
2558
2559        // Create a set of available columns (tag columns + field columns + time index column)
2560        let available_columns: HashSet<&str> = ctx
2561            .tag_columns
2562            .iter()
2563            .chain(ctx.field_columns.iter())
2564            .chain(ctx.time_index_column.as_ref())
2565            .map(|s| s.as_str())
2566            .collect();
2567
2568        let src_labels = other_input_exprs
2569            .iter()
2570            .map(|expr| {
2571                // Cast source label into column or null literal
                match expr {
                    DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
                        if label.is_empty() {
                            Ok(DfExpr::Literal(ScalarValue::Null, None))
                        } else if available_columns.contains(label.as_str()) {
                            // Label exists in the table schema
                            Ok(DfExpr::Column(Column::from_name(label)))
                        } else {
                            // Label doesn't exist, treat as empty string (null)
                            Ok(DfExpr::Literal(ScalarValue::Null, None))
                        }
                    }
                    other => UnexpectedPlanExprSnafu {
                        desc: format!(
                            "expected source label string literal, but found {:?}",
                            other
                        ),
                    }
                    .fail(),
                }
            })
            .collect::<Result<Vec<_>>>()?;
        ensure!(
            !src_labels.is_empty(),
            FunctionInvalidArgumentSnafu {
                fn_name: "label_join"
            }
        );

        let session_state = query_engine_state.session_state();
        let func = session_state
            .scalar_functions()
            .get("concat_ws")
            .context(UnsupportedExprSnafu { name: "concat_ws" })?;

        // concat_ws(separator, src_label_1, src_label_2, ...) as dst_label
        let mut args = Vec::with_capacity(1 + src_labels.len());
        args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
        args.extend(src_labels);

        Ok((
            DfExpr::ScalarFunction(ScalarFunction {
                func: func.clone(),
                args,
            })
            .alias(&dst_label),
            dst_label,
        ))
    }

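    /// Create a [DfExpr::Column] expr referring to the time index column in the context.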
    fn create_time_index_column_expr(&self) -> Result<DfExpr> {
        Ok(DfExpr::Column(Column::from_name(
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
        )))
    }

    fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
        let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
        for tag in &self.ctx.tag_columns {
            let expr = DfExpr::Column(Column::from_name(tag));
            result.push(expr);
        }
        Ok(result)
    }

    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
        for field in &self.ctx.field_columns {
            let expr = DfExpr::Column(Column::from_name(field));
            result.push(expr);
        }
        Ok(result)
    }

    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
        let mut result = self
            .ctx
            .tag_columns
            .iter()
            .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
            .collect::<Vec<_>>();
        result.push(self.create_time_index_column_expr()?.sort(true, true));
        Ok(result)
    }

    fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
        self.ctx
            .field_columns
            .iter()
            .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
            .collect::<Vec<_>>()
    }

    fn create_sort_exprs_by_tags(
        func: &str,
        tags: Vec<DfExpr>,
        asc: bool,
    ) -> Result<Vec<SortExpr>> {
        ensure!(
            !tags.is_empty(),
            FunctionInvalidArgumentSnafu { fn_name: func }
        );

        tags.iter()
            .map(|col| match col {
                DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
                    Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
                }
                other => UnexpectedPlanExprSnafu {
                    desc: format!("expected label string literal, but found {:?}", other),
                }
                .fail(),
            })
            .collect::<Result<Vec<_>>>()
    }

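    /// Build a conjunction of `IS NOT NULL` filters over all value columns in the
    /// context, erroring if there are none.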
    fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
        for value in &self.ctx.field_columns {
            let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
            exprs.push(expr);
        }

        conjunction(exprs).context(ValueNotFoundSnafu {
            table: self.table_ref()?.to_quoted_string(),
        })
    }

    /// Creates a set of DataFusion `DfExpr::AggregateFunction` expressions for each value column using the specified aggregate function.
    ///
    /// # Side Effects
    ///
    /// This method modifies the value columns in the context by replacing them with the new columns
    /// created by the aggregate function application.
    ///
    /// # Returns
    ///
    /// Returns a tuple of `(aggregate_expressions, previous_field_expressions)` where:
    /// - `aggregate_expressions`: Expressions that apply the aggregate function to the original fields
    /// - `previous_field_expressions`: Original field expressions before aggregation. This is non-empty
    ///   only when the operation is `count_values`, as this operation requires preserving the original
    ///   values for grouping.
    ///
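    /// For example (a sketch; metric and column names are illustrative): for
    /// `sum by (host) (up)` with a single value column `greptime_value`, this produces
    /// `sum(greptime_value)` and replaces the context's field column with that
    /// expression's schema name.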
    fn create_aggregate_exprs(
        &mut self,
        op: TokenType,
        param: &Option<Box<PromExpr>>,
        input_plan: &LogicalPlan,
    ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
        let mut non_col_args = Vec::new();
        let aggr = match op.id() {
            token::T_SUM => sum_udaf(),
            token::T_QUANTILE => {
                let q =
                    Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
                non_col_args.push(q);
                quantile_udaf()
            }
            token::T_AVG => avg_udaf(),
            token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
            token::T_MIN => min_udaf(),
            token::T_MAX => max_udaf(),
            token::T_GROUP => grouping_udaf(),
            token::T_STDDEV => stddev_pop_udaf(),
            token::T_STDVAR => var_pop_udaf(),
            token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
                name: format!("{op:?}"),
            }
            .fail()?,
            _ => UnexpectedTokenSnafu { token: op }.fail()?,
        };

        // perform the aggregate operation on each value column
        let exprs: Vec<DfExpr> = self
            .ctx
            .field_columns
            .iter()
            .map(|col| {
                non_col_args.push(DfExpr::Column(Column::from_name(col)));
                let expr = aggr.call(non_col_args.clone());
                non_col_args.pop();
                expr
            })
            .collect::<Vec<_>>();

        // if the aggregator is `count_values`, it must be grouped by the current fields.
        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
            let prev_field_exprs: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::from_name(col)))
                .collect();

            ensure!(
                self.ctx.field_columns.len() == 1,
                UnsupportedExprSnafu {
                    name: "count_values on multi-value input"
                }
            );

            prev_field_exprs
        } else {
            vec![]
        };

        // update value column names according to the aggregators
        let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());

        let normalized_exprs =
            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
        for expr in normalized_exprs {
            new_field_columns.push(expr.schema_name().to_string());
        }
        self.ctx.field_columns = new_field_columns;

        Ok((exprs, prev_field_exprs))
    }

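    /// Extract the aggregate expression's parameter as a string literal.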
    fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
        let param = param
            .as_deref()
            .with_context(|| FunctionInvalidArgumentSnafu {
                fn_name: op.to_string(),
            })?;
        let PromExpr::StringLiteral(StringLiteral { val }) = param else {
            return FunctionInvalidArgumentSnafu {
                fn_name: op.to_string(),
            }
            .fail();
        };

        Ok(val)
    }

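    /// Extract the aggregate expression's parameter as a DataFusion literal expression,
    /// optionally checking it against an expected Arrow data type.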
    fn get_param_as_literal_expr(
        param: &Option<Box<PromExpr>>,
        op: Option<TokenType>,
        expected_type: Option<ArrowDataType>,
    ) -> Result<DfExpr> {
        let prom_param = param.as_deref().with_context(|| {
            if let Some(op) = op {
                FunctionInvalidArgumentSnafu {
                    fn_name: op.to_string(),
                }
            } else {
                FunctionInvalidArgumentSnafu {
                    fn_name: "unknown".to_string(),
                }
            }
        })?;

        let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
            if let Some(op) = op {
                FunctionInvalidArgumentSnafu {
                    fn_name: op.to_string(),
                }
            } else {
                FunctionInvalidArgumentSnafu {
                    fn_name: "unknown".to_string(),
                }
            }
        })?;

        // check if the type is expected
        if let Some(expected_type) = expected_type {
            // a literal should not reference any column
            let expr_type = expr
                .get_type(&DFSchema::empty())
                .context(DataFusionPlanningSnafu)?;
            if expected_type != expr_type {
                return FunctionInvalidArgumentSnafu {
                    fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
                }
                .fail();
            }
        }

        Ok(expr)
    }

    /// Create a [DfExpr::WindowFunction] expr for each value column with the given window function.
    ///
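    /// For example (a sketch; names are illustrative): `topk(3, up)` is planned as a
    /// `row_number()` window partitioned by the grouping exprs and ordered by the value
    /// column (descending for `topk`), with ties broken by tags; the caller can then
    /// filter on the row number to keep the first `k` rows per group.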
    fn create_window_exprs(
        &mut self,
        op: TokenType,
        group_exprs: Vec<DfExpr>,
        input_plan: &LogicalPlan,
    ) -> Result<Vec<DfExpr>> {
        ensure!(
            self.ctx.field_columns.len() == 1,
            UnsupportedExprSnafu {
                name: "topk or bottomk on multi-value input"
            }
        );

        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));

        let asc = matches!(op.id(), token::T_BOTTOMK);

        let tag_sort_exprs = self
            .create_tag_column_exprs()?
            .into_iter()
            .map(|expr| expr.sort(asc, true));

        // perform the window operation on each value column
        let exprs: Vec<DfExpr> = self
            .ctx
            .field_columns
            .iter()
            .map(|col| {
                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
                // Order by the value column first,
                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
                // then by tags when values are equal,
                // to keep the output relatively stable.
                sort_exprs.extend(tag_sort_exprs.clone());

                DfExpr::WindowFunction(Box::new(WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
                    params: WindowFunctionParams {
                        args: vec![],
                        partition_by: group_exprs.clone(),
                        order_by: sort_exprs,
                        window_frame: WindowFrame::new(Some(true)),
                        null_treatment: None,
                        distinct: false,
                        filter: None,
                    },
                }))
            })
            .collect();

        let normalized_exprs =
            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
        Ok(normalized_exprs)
    }

    /// Try to build a [f64] from [PromExpr].
    #[deprecated(
        note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
    )]
    fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
        match expr {
            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
            PromExpr::Unary(UnaryExpr { expr, .. }) => {
                Self::try_build_float_literal(expr).map(|f| -f)
            }
            PromExpr::StringLiteral(_)
            | PromExpr::Binary(_)
            | PromExpr::VectorSelector(_)
            | PromExpr::MatrixSelector(_)
            | PromExpr::Call(_)
            | PromExpr::Extension(_)
            | PromExpr::Aggregate(_)
            | PromExpr::Subquery(_) => None,
        }
    }

    /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
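    ///
    /// For example (illustrative): `histogram_quantile(0.9, rate(req_duration_bucket[5m]))`
    /// folds the inner plan's `le` buckets into a 0.9-quantile estimate per series via
    /// the [HistogramFold] plan node.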
    async fn create_histogram_plan(
        &mut self,
        args: &PromFunctionArgs,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        if args.args.len() != 2 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
            .fail();
        }
        #[allow(deprecated)]
        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
            FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
        })?;

        let input = args.args[1].as_ref().clone();
        let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
        // `histogram_quantile` folds buckets across `le`, so `__tsid` (which includes `le`) is not
        // a stable series identifier anymore. Also, HistogramFold infers label columns from the
        // input schema and must not treat `__tsid` as a label column.
        let input_plan = self.strip_tsid_column(input_plan)?;
        self.ctx.use_tsid = false;

        if !self.ctx.has_le_tag() {
            // Return an empty result instead of an error when the 'le' column is not found.
            // This handles the case where histogram metrics don't exist.
            return Ok(LogicalPlan::EmptyRelation(
                datafusion::logical_expr::EmptyRelation {
                    produce_one_row: false,
                    schema: Arc::new(DFSchema::empty()),
                },
            ));
        }
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: self.ctx.table_name.clone().unwrap_or_default(),
                })?;
        // FIXME(ruihang): support multi fields
        let field_column = self
            .ctx
            .field_columns
            .first()
            .with_context(|| FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            })?
            .clone();
        // remove the le column from tag columns
        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(
                HistogramFold::new(
                    LE_COLUMN_NAME.to_string(),
                    field_column,
                    time_index_column,
                    phi,
                    input_plan,
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        }))
    }

    /// Create a [SPECIAL_VECTOR_FUNCTION] plan
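    ///
    /// For example: `vector(1)` produces an [EmptyMetric] plan that emits the literal
    /// value `1` at every evaluation step between `start` and `end`.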
    async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
        if args.args.len() != 1 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
            }
            .fail();
        }
        let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;

        // reuse `SPECIAL_TIME_FUNCTION` as the name of the time index column
        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
        self.ctx.reset_table_name_and_schema();
        self.ctx.tag_columns = vec![];
        self.ctx.field_columns = vec![greptime_value().to_string()];
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    greptime_value().to_string(),
                    Some(lit),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        }))
    }

    /// Create a [SCALAR_FUNCTION] plan
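    ///
    /// For example: `scalar(up)` converts a single-series instant vector into one scalar
    /// value per timestamp via the [ScalarCalculate] plan node.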
    async fn create_scalar_plan(
        &mut self,
        args: &PromFunctionArgs,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        ensure!(
            args.len() == 1,
            FunctionInvalidArgumentSnafu {
                fn_name: SCALAR_FUNCTION
            }
        );
        let input = self
            .prom_expr_to_plan(&args.args[0], query_engine_state)
            .await?;
        ensure!(
            self.ctx.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: SCALAR_FUNCTION
            },
        );
        let scalar_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                ScalarCalculate::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    input,
                    self.ctx.time_index_column.as_ref().unwrap(),
                    &self.ctx.tag_columns,
                    &self.ctx.field_columns[0],
                    self.ctx.table_name.as_deref(),
                )
                .context(PromqlPlanNodeSnafu)?,
            ),
        });
        // the scalar plan has no tag columns
        self.ctx.tag_columns.clear();
        self.ctx.field_columns.clear();
        self.ctx
            .field_columns
            .push(scalar_plan.schema().field(1).name().clone());
        Ok(scalar_plan)
    }

    /// Create a [SPECIAL_ABSENT_FUNCTION] plan
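    ///
    /// For example (illustrative): `absent(up{job="prometheus"})` yields `1` at the steps
    /// where the inner query returns no samples, carrying the equality matchers (here
    /// `job="prometheus"`) as fake labels on the output.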
    async fn create_absent_plan(
        &mut self,
        args: &PromFunctionArgs,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        if args.args.len() != 1 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
            }
            .fail();
        }
        let input = self
            .prom_expr_to_plan(&args.args[0], query_engine_state)
            .await?;

        let time_index_expr = self.create_time_index_column_expr()?;
        let first_field_expr =
            self.create_field_column_exprs()?
                .pop()
                .with_context(|| ValueNotFoundSnafu {
                    table: self.ctx.table_name.clone().unwrap_or_default(),
                })?;
        let first_value_expr = first_value(first_field_expr, vec![]);

        let ordered_aggregated_input = LogicalPlanBuilder::from(input)
            .aggregate(
                vec![time_index_expr.clone()],
                vec![first_value_expr.clone()],
            )
            .context(DataFusionPlanningSnafu)?
            .sort(vec![time_index_expr.sort(true, false)])
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        let fake_labels = self
            .ctx
            .selector_matcher
            .iter()
            .filter_map(|matcher| match matcher.op {
                MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
                _ => None,
            })
            .collect::<Vec<_>>();

        // Create the absent plan
        let absent_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                Absent::try_new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    self.ctx.time_index_column.as_ref().unwrap().clone(),
                    self.ctx.field_columns[0].clone(),
                    fake_labels,
                    ordered_aggregated_input,
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });

        Ok(absent_plan)
    }

    /// Try to build a DataFusion literal expression from a PromQL expr, returning
    /// `None` if the input is not a literal expression.
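    ///
    /// For example: `1 + 2` folds into a single literal expression, while `up + 1`
    /// returns `None` because the left-hand side is a vector selector.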
    fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
        match expr {
            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
            PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
            PromExpr::VectorSelector(_)
            | PromExpr::MatrixSelector(_)
            | PromExpr::Extension(_)
            | PromExpr::Aggregate(_)
            | PromExpr::Subquery(_) => None,
            // Function calls, including time(), are never treated as literals here;
            // they are handled as regular function calls elsewhere.
            PromExpr::Call(_) => None,
            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
            // TODO(ruihang): support Unary operator
            PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
            PromExpr::Binary(PromBinaryExpr {
                lhs,
                rhs,
                op,
                modifier,
            }) => {
                let lhs = Self::try_build_literal_expr(lhs)?;
                let rhs = Self::try_build_literal_expr(rhs)?;
                let is_comparison_op = Self::is_token_a_comparison_op(*op);
                let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
                let expr = expr_builder(lhs, rhs).ok()?;

                let should_return_bool = if let Some(m) = modifier {
                    m.return_bool
                } else {
                    false
                };
                if is_comparison_op && should_return_bool {
                    Some(DfExpr::Cast(Cast {
                        expr: Box::new(expr),
                        data_type: ArrowDataType::Float64,
                    }))
                } else {
                    Some(expr)
                }
            }
        }
    }

    fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
        match expr {
            PromExpr::Call(Call { func, .. }) => {
                if func.name == SPECIAL_TIME_FUNCTION
                    && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
                {
                    Some(build_special_time_expr(time_index_col))
                } else {
                    None
                }
            }
            _ => None,
        }
    }

    /// Return a lambda that builds a binary expression from the given token.
    /// This is needed because some binary operators are functions in DataFusion, like `atan2` or `^`.
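    ///
    /// For example: `a ^ b` maps to DataFusion's `power(a, b)` scalar function rather
    /// than a native binary operator.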
    #[allow(clippy::type_complexity)]
    fn prom_token_to_binary_expr_builder(
        token: TokenType,
    ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
        match token.id() {
            token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
            token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
            token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
            token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
            token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
            token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
            token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
            token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
            token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
            token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
            token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
            token::T_POW => Ok(Box::new(|lhs, rhs| {
                Ok(DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::math::power(),
                    args: vec![lhs, rhs],
                }))
            })),
            token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
                Ok(DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::math::atan2(),
                    args: vec![lhs, rhs],
                }))
            })),
            _ => UnexpectedTokenSnafu { token }.fail(),
        }
    }

    /// Check if the given op is a [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators).
    fn is_token_a_comparison_op(token: TokenType) -> bool {
        matches!(
            token.id(),
            token::T_EQLC
                | token::T_NEQ
                | token::T_GTR
                | token::T_LSS
                | token::T_GTE
                | token::T_LTE
        )
    }

    /// Check if the given op is a set operator (UNION, INTERSECT and EXCEPT in SQL).
    fn is_token_a_set_op(token: TokenType) -> bool {
        matches!(
            token.id(),
            token::T_LAND // INTERSECT
                | token::T_LOR // UNION
                | token::T_LUNLESS // EXCEPT
        )
    }

    /// Build an inner join on the time index column and tag columns to concatenate two logical plans.
    /// When `only_join_time_index == true`, join only on the time index, because the two plans may not have the same tag columns.
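    ///
    /// For example (a sketch; names are illustrative): for `a + on(host) b`, both sides
    /// are inner-joined on `host` plus the time index column, and the caller applies the
    /// arithmetic to the joined value columns afterwards.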
    #[allow(clippy::too_many_arguments)]
    fn join_on_non_field_columns(
        &self,
        left: LogicalPlan,
        right: LogicalPlan,
        left_table_ref: TableReference,
        right_table_ref: TableReference,
        left_time_index_column: Option<String>,
        right_time_index_column: Option<String>,
        only_join_time_index: bool,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        let mut left_tag_columns = if only_join_time_index {
            BTreeSet::new()
        } else {
            self.ctx
                .tag_columns
                .iter()
                .cloned()
                .collect::<BTreeSet<_>>()
        };
        let mut right_tag_columns = left_tag_columns.clone();

        // apply modifier
        if let Some(modifier) = modifier {
            // apply label modifier
            if let Some(matching) = &modifier.matching {
                match matching {
                    // keeps columns mentioned in `on`
                    LabelModifier::Include(on) => {
                        let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
                        left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
                        right_tag_columns =
                            right_tag_columns.intersection(&mask).cloned().collect();
                    }
                    // removes columns mentioned in `ignoring`
                    LabelModifier::Exclude(ignoring) => {
                        // doesn't check the existence of labels
                        for label in &ignoring.labels {
                            let _ = left_tag_columns.remove(label);
                            let _ = right_tag_columns.remove(label);
                        }
                    }
                }
            }
        }

        // push the time index column if it exists
        if let (Some(left_time_index_column), Some(right_time_index_column)) =
            (left_time_index_column, right_time_index_column)
        {
            left_tag_columns.insert(left_time_index_column);
            right_tag_columns.insert(right_time_index_column);
        }

        let right = LogicalPlanBuilder::from(right)
            .alias(right_table_ref)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // Inner join on the time index column to concatenate the two operands
        LogicalPlanBuilder::from(left)
            .alias(left_table_ref)
            .context(DataFusionPlanningSnafu)?
            .join_detailed(
                right,
                JoinType::Inner,
                (
                    left_tag_columns
                        .into_iter()
                        .map(Column::from_name)
                        .collect::<Vec<_>>(),
                    right_tag_columns
                        .into_iter()
                        .map(Column::from_name)
                        .collect::<Vec<_>>(),
                ),
                None,
                NullEquality::NullEqualsNull,
            )
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    /// Build a set operator (AND/OR/UNLESS)
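    ///
    /// For example: `a and b` is planned as a `LeftSemi` join and `a unless b` as a
    /// `LeftAnti` join on the matching tag columns plus the time index, while `or` is
    /// handled separately by [Self::or_operator].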
    fn set_op_on_non_field_columns(
        &mut self,
        left: LogicalPlan,
        mut right: LogicalPlan,
        left_context: PromPlannerContext,
        right_context: PromPlannerContext,
        op: TokenType,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        let mut left_tag_col_set = left_context
            .tag_columns
            .iter()
            .cloned()
            .collect::<HashSet<_>>();
        let mut right_tag_col_set = right_context
            .tag_columns
            .iter()
            .cloned()
            .collect::<HashSet<_>>();

        if matches!(op.id(), token::T_LOR) {
            return self.or_operator(
                left,
                right,
                left_tag_col_set,
                right_tag_col_set,
                left_context,
                right_context,
                modifier,
            );
        }

        // apply modifier
        if let Some(modifier) = modifier {
            // one-to-many and many-to-one are not supported
            ensure!(
                matches!(
                    modifier.card,
                    VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
                ),
                UnsupportedVectorMatchSnafu {
                    name: modifier.card.clone(),
                },
            );
            // apply label modifier
            if let Some(matching) = &modifier.matching {
                match matching {
                    // keeps columns mentioned in `on`
                    LabelModifier::Include(on) => {
                        let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
                        left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
                        right_tag_col_set =
                            right_tag_col_set.intersection(&mask).cloned().collect();
                    }
                    // removes columns mentioned in `ignoring`
                    LabelModifier::Exclude(ignoring) => {
                        // doesn't check the existence of labels
                        for label in &ignoring.labels {
                            let _ = left_tag_col_set.remove(label);
                            let _ = right_tag_col_set.remove(label);
                        }
                    }
                }
            }
        }
        // ensure the two sides have the same tag columns
        if !matches!(op.id(), token::T_LOR) {
            ensure!(
                left_tag_col_set == right_tag_col_set,
                CombineTableColumnMismatchSnafu {
                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
                }
            )
        };
        let left_time_index = left_context.time_index_column.clone().unwrap();
        let right_time_index = right_context.time_index_column.clone().unwrap();
        let join_keys = left_tag_col_set
            .iter()
            .cloned()
            .chain([left_time_index.clone()])
            .collect::<Vec<_>>();
        self.ctx.time_index_column = Some(left_time_index.clone());
        self.ctx.use_tsid = left_context.use_tsid;

        // alias the right time index column if necessary
        if left_context.time_index_column != right_context.time_index_column {
            let right_project_exprs = right
                .schema()
                .fields()
                .iter()
                .map(|field| {
                    if field.name() == &right_time_index {
                        DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
                    } else {
                        DfExpr::Column(Column::from_name(field.name()))
                    }
                })
                .collect::<Vec<_>>();

            right = LogicalPlanBuilder::from(right)
                .project(right_project_exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        ensure!(
            left_context.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: "AND operator"
            }
        );
        // Update the field column in the context.
        // The AND/UNLESS operators only keep the field column of the left input.
        let left_field_col = left_context.field_columns.first().unwrap();
        self.ctx.field_columns = vec![left_field_col.clone()];

        // Generate the join plan.
        // All set operations in PromQL are "distinct".
        match op.id() {
            token::T_LAND => LogicalPlanBuilder::from(left)
                .distinct()
                .context(DataFusionPlanningSnafu)?
                .join_detailed(
                    right,
                    JoinType::LeftSemi,
                    (join_keys.clone(), join_keys),
                    None,
                    NullEquality::NullEqualsNull,
                )
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu),
            token::T_LUNLESS => LogicalPlanBuilder::from(left)
                .distinct()
                .context(DataFusionPlanningSnafu)?
                .join_detailed(
                    right,
                    JoinType::LeftAnti,
                    (join_keys.clone(), join_keys),
                    None,
                    NullEquality::NullEqualsNull,
                )
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu),
            token::T_LOR => {
                // OR is handled at the beginning of this function, as it cannot
                // be expressed using JOIN like AND and UNLESS.
                unreachable!()
            }
            _ => UnexpectedTokenSnafu { token: op }.fail(),
        }
    }

    // TODO(ruihang): change function name
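    /// Build the `or` set operation via the [UnionDistinctOn] plan node: align the two
    /// sides' schemas, then union their rows, distinct on the match columns plus the
    /// time index.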
    #[allow(clippy::too_many_arguments)]
    fn or_operator(
        &mut self,
        left: LogicalPlan,
        right: LogicalPlan,
        left_tag_cols_set: HashSet<String>,
        right_tag_cols_set: HashSet<String>,
        left_context: PromPlannerContext,
        right_context: PromPlannerContext,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        // checks
        ensure!(
            left_context.field_columns.len() == right_context.field_columns.len(),
            CombineTableColumnMismatchSnafu {
                left: left_context.field_columns.clone(),
                right: right_context.field_columns.clone()
            }
        );
        ensure!(
            left_context.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: "OR operator"
            }
        );

        // prepare hash sets
        let all_tags = left_tag_cols_set
            .union(&right_tag_cols_set)
            .cloned()
            .collect::<HashSet<_>>();
        let tags_not_in_left = all_tags
            .difference(&left_tag_cols_set)
            .cloned()
            .collect::<Vec<_>>();
        let tags_not_in_right = all_tags
            .difference(&right_tag_cols_set)
            .cloned()
            .collect::<Vec<_>>();
        let left_qualifier = left.schema().qualified_field(0).0.cloned();
        let right_qualifier = right.schema().qualified_field(0).0.cloned();
        let left_qualifier_string = left_qualifier
            .as_ref()
            .map(|l| l.to_string())
            .unwrap_or_default();
        let right_qualifier_string = right_qualifier
            .as_ref()
            .map(|r| r.to_string())
            .unwrap_or_default();
        let left_time_index_column =
            left_context
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: left_qualifier_string.clone(),
                })?;
        let right_time_index_column =
            right_context
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: right_qualifier_string.clone(),
                })?;
        // Take the name of the first field column. The length is checked above.
        let left_field_col = left_context.field_columns.first().unwrap();
        let right_field_col = right_context.field_columns.first().unwrap();
        let left_has_tsid = left
            .schema()
            .fields()
            .iter()
            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
        let right_has_tsid = right
            .schema()
            .fields()
            .iter()
            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);

        // step 0: fill all columns in the output schema
        let mut all_columns_set = left
            .schema()
            .fields()
            .iter()
            .chain(right.schema().fields().iter())
            .map(|field| field.name().clone())
            .collect::<HashSet<_>>();
        // Keep `__tsid` only when both sides contain it, otherwise it may break schema alignment
        // (e.g. `unknown_metric or some_metric`).
        if !(left_has_tsid && right_has_tsid) {
            all_columns_set.remove(DATA_SCHEMA_TSID_COLUMN_NAME);
        }
        // remove the time index columns
        all_columns_set.remove(&left_time_index_column);
        all_columns_set.remove(&right_time_index_column);
        // remove the field column of the right side
        if left_field_col != right_field_col {
            all_columns_set.remove(right_field_col);
        }
        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
        // sort to ensure the generated schema is not volatile
        all_columns.sort_unstable();
        // use the left time index column name as the result time index column name
        all_columns.insert(0, left_time_index_column.clone());

        // step 1: align schemas using projections, filling non-existent columns with null
        let left_proj_exprs = all_columns.iter().map(|col| {
            if tags_not_in_left.contains(col) {
                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
            } else {
                DfExpr::Column(Column::new(None::<String>, col))
            }
        });
        let right_time_index_expr = DfExpr::Column(Column::new(
            right_qualifier.clone(),
            right_time_index_column,
        ))
        .alias(left_time_index_column.clone());
        // The field column on the right side may not have a qualifier (it may be removed by a
        // join operation), so we need to find it from the schema.
        let right_qualifier_for_field = right
            .schema()
            .iter()
            .find(|(_, f)| f.name() == right_field_col)
            .map(|(q, _)| q)
            .with_context(|| ColumnNotFoundSnafu {
                col: right_field_col.clone(),
            })?
            .cloned();

        // `skip(1)` to skip the time index column
        let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
            // expr
            if col == left_field_col && left_field_col != right_field_col {
                // qualify the field on the right side if necessary to handle a different field name
                DfExpr::Column(Column::new(
                    right_qualifier_for_field.clone(),
                    right_field_col,
                ))
            } else if tags_not_in_right.contains(col) {
                DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
            } else {
                DfExpr::Column(Column::new(None::<String>, col))
            }
        });
        let right_proj_exprs = [right_time_index_expr]
            .into_iter()
            .chain(right_proj_exprs_without_time_index);

        let left_projected = LogicalPlanBuilder::from(left)
            .project(left_proj_exprs)
            .context(DataFusionPlanningSnafu)?
            .alias(left_qualifier_string.clone())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;
        let right_projected = LogicalPlanBuilder::from(right)
            .project(right_proj_exprs)
            .context(DataFusionPlanningSnafu)?
            .alias(right_qualifier_string.clone())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // step 2: compute match columns
        let mut match_columns = if let Some(modifier) = modifier
            && let Some(matching) = &modifier.matching
        {
            match matching {
                // keeps columns mentioned in `on`
                LabelModifier::Include(on) => on.labels.clone(),
                // removes columns mentioned in `ignoring`
                LabelModifier::Exclude(ignoring) => {
                    let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
                    all_tags.difference(&ignoring).cloned().collect()
                }
            }
        } else {
            all_tags.iter().cloned().collect()
        };
        // sort to ensure the generated plan is not volatile
        match_columns.sort_unstable();
        // step 3: build the `UnionDistinctOn` plan
        let schema = left_projected.schema().clone();
        let union_distinct_on = UnionDistinctOn::new(
            left_projected,
            right_projected,
            match_columns,
            left_time_index_column.clone(),
            schema,
        );
        let result = LogicalPlan::Extension(Extension {
            node: Arc::new(union_distinct_on),
        });

        // step 4: update the context
        self.ctx.time_index_column = Some(left_time_index_column);
        self.ctx.tag_columns = all_tags.into_iter().collect();
        self.ctx.field_columns = vec![left_field_col.clone()];
        self.ctx.use_tsid = left_has_tsid && right_has_tsid;

        Ok(result)
    }

    /// Build a projection that projects and performs the operation expr on every value column.
    /// Non-value columns (tag and timestamp) are preserved in the projection.
    ///
    /// # Side effect
    ///
    /// This function updates the value columns in the context. The new column names
    /// don't contain a qualifier.
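    ///
    /// For example (a sketch; names are illustrative): for `abs(metric)` with value
    /// column `greptime_value`, this projects the tag and time index columns unchanged
    /// plus `abs(greptime_value)` aliased to its schema name.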
    fn projection_for_each_field_column<F>(
        &mut self,
        input: LogicalPlan,
        name_to_expr: F,
    ) -> Result<LogicalPlan>
    where
        F: FnMut(&String) -> Result<DfExpr>,
    {
        let non_field_columns_iter = self
            .ctx
            .tag_columns
            .iter()
            .chain(self.ctx.time_index_column.iter())
            .map(|col| {
                Ok(DfExpr::Column(Column::new(
                    self.ctx.table_name.clone().map(TableReference::bare),
                    col,
                )))
            });

        // build computation exprs
        let result_field_columns = self
            .ctx
            .field_columns
            .iter()
            .map(name_to_expr)
            .collect::<Result<Vec<_>>>()?;

        // alias the computation exprs to remove the qualifier
        self.ctx.field_columns = result_field_columns
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();
        let field_columns_iter = result_field_columns
            .into_iter()
            .zip(self.ctx.field_columns.iter())
            .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));

        // chain non-field columns (unchanged) and field columns (with the computation applied, then aliased)
        let project_fields = non_field_columns_iter
            .chain(field_columns_iter)
            .collect::<Result<Vec<_>>>()?;

        LogicalPlanBuilder::from(input)
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    /// Build a filter plan that filters on the value column. Note that only one value column
    /// is expected.
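    ///
    /// For example: `metric > 0` keeps its samples by filtering the input plan on
    /// `greptime_value > 0` (assuming the single value column is named `greptime_value`).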
    fn filter_on_field_column<F>(
        &self,
        input: LogicalPlan,
        mut name_to_expr: F,
    ) -> Result<LogicalPlan>
    where
        F: FnMut(&String) -> Result<DfExpr>,
    {
        ensure!(
            self.ctx.field_columns.len() == 1,
            UnsupportedExprSnafu {
                name: "filter on multi-value input"
            }
        );

        let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;

        LogicalPlanBuilder::from(input)
            .filter(field_column_filter)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    /// Generate an expr like `date_part("hour", <TIME_INDEX>)`. The caller should ensure the
    /// time index column in the context is set.
    fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
        let input_expr = datafusion::logical_expr::col(
            self.ctx
                .time_index_column
                .as_ref()
                // the table name doesn't matter here
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: "<doesn't matter>",
                })?,
        );
        let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
            func: datafusion_functions::datetime::date_part(),
            args: vec![date_part.lit(), input_expr],
        });
        Ok(fn_expr)
    }

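    /// Remove the internal [DATA_SCHEMA_TSID_COLUMN_NAME] (`__tsid`) column from the
    /// plan via a projection, if it is present.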
    fn strip_tsid_column(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
        let schema = plan.schema();
        if !schema
            .fields()
            .iter()
            .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
        {
            return Ok(plan);
        }

        let project_exprs = schema
            .fields()
            .iter()
            .filter(|field| field.name() != DATA_SCHEMA_TSID_COLUMN_NAME)
            .map(|field| Ok(DfExpr::Column(Column::from_name(field.name().clone()))))
            .collect::<Result<Vec<_>>>()?;

        LogicalPlanBuilder::from(plan)
            .project(project_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    /// Apply an alias to the query result by adding a projection with the alias name
    fn apply_alias_projection(
        &mut self,
        plan: LogicalPlan,
        alias_name: String,
    ) -> Result<LogicalPlan> {
        let fields_expr = self.create_field_column_exprs()?;

        // TODO(dennis): how to support multi-value aliasing?
        ensure!(
            fields_expr.len() == 1,
            UnsupportedExprSnafu {
                name: "alias on multi-value result"
            }
        );

        let project_fields = fields_expr
            .into_iter()
            .map(|expr| expr.alias(&alias_name))
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(plan)
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }
}

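/// Arguments of a PromQL function call, split into the single vector input (if any)
/// and the remaining literal arguments.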
#[derive(Default, Debug)]
struct FunctionArgs {
    input: Option<PromExpr>,
    literals: Vec<DfExpr>,
}

/// Represents different types of scalar functions supported in PromQL expressions.
/// Each variant defines how the function should be processed and what arguments it expects.
#[derive(Debug, Clone)]
enum ScalarFunc {
    /// DataFusion's registered (including built-in) scalar functions (e.g., abs, sqrt, round, clamp).
    /// These are passed through directly to DataFusion's execution engine.
    /// Processing: Simple argument insertion at the specified position.
    DataFusionBuiltin(Arc<ScalarUdfDef>),
    /// User-defined functions registered in DataFusion's function registry.
    /// Similar to DataFusionBuiltin but for custom functions not built into DataFusion.
    /// Processing: Direct pass-through with argument positioning.
    DataFusionUdf(Arc<ScalarUdfDef>),
    /// PromQL-specific functions that operate on time series data with temporal context.
    /// These functions require both timestamp ranges and values to perform calculations.
    /// Processing: Automatically injects timestamp_range and value columns as the first arguments.
    /// Examples: idelta, irate, resets, changes, deriv, *_over_time functions
    Udf(Arc<ScalarUdfDef>),
    /// PromQL functions requiring extrapolation calculations with explicit range information.
    /// These functions need to know the time range length to perform rate calculations.
    /// The second field contains the range length in milliseconds.
    /// Processing: Injects timestamp_range, value, time_index columns and appends range_length.
    /// Examples: increase, rate, delta
    // TODO(ruihang): maybe merge with Udf later
    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
    /// Functions that generate expressions directly without external UDF calls.
    /// The expression is constructed during function matching and requires no additional processing.
    /// Examples: time(), minute(), hour(), month(), year() and other date/time extractors
    GeneratedExpr,
}

3909#[cfg(test)]
3910mod test {
3911    use std::time::{Duration, UNIX_EPOCH};
3912
3913    use catalog::RegisterTableRequest;
3914    use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
3915    use common_base::Plugins;
3916    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
3917    use common_query::prelude::greptime_timestamp;
3918    use common_query::test_util::DummyDecoder;
3919    use datatypes::prelude::ConcreteDataType;
3920    use datatypes::schema::{ColumnSchema, Schema};
3921    use promql_parser::label::Labels;
3922    use promql_parser::parser;
3923    use session::context::QueryContext;
3924    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
3925    use table::test_util::EmptyTable;
3926
3927    use super::*;
3928    use crate::options::QueryOptions;
3929
    fn build_query_engine_state() -> QueryEngineState {
        QueryEngineState::new(
            new_memory_catalog_manager().unwrap(),
            None,
            None,
            None,
            None,
            None,
            false,
            Plugins::default(),
            QueryOptions::default(),
        )
    }

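    /// Builds a table provider whose tables all share the schema:
    /// `tag_0..tag_{num_tag-1}` string tags (the primary key), a millisecond
    /// `timestamp` time index, and `field_0..field_{num_field-1}` nullable
    /// float64 fields.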
    async fn build_test_table_provider(
        table_name_tuples: &[(String, String)],
        num_tag: usize,
        num_field: usize,
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (schema_name, table_name) in table_name_tuples {
            let mut columns = vec![];
            for i in 0..num_tag {
                columns.push(ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            for i in 0..num_field {
                columns.push(ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::float64_datatype(),
                    true,
                ));
            }
            let schema = Arc::new(Schema::new(columns));
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .name(table_name.clone())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: schema_name.clone(),
                        table_name: table_name.clone(),
                        table_id: 1024,
                        table,
                    })
                    .is_ok()
            );
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }

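    /// Like `build_test_table_provider`, but models the metric engine layout:
    /// one physical table `phy` carrying the internal `__table_id`/`__tsid`
    /// columns, plus one logical table (without `__tsid`) per entry in
    /// `table_name_tuples` that references it.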
    async fn build_test_table_provider_with_tsid(
        table_name_tuples: &[(String, String)],
        num_tag: usize,
        num_field: usize,
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();

        let physical_table_name = "phy";
        let physical_table_id = 999u32;

        // Register a metric engine physical table with internal columns.
        {
            let mut columns = vec![
                ColumnSchema::new(
                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                ColumnSchema::new(
                    DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
            ];
            for i in 0..num_tag {
                columns.push(ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            for i in 0..num_field {
                columns.push(ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::float64_datatype(),
                    true,
                ));
            }

            let schema = Arc::new(Schema::new(columns));
            let primary_key_indices = (0..(2 + num_tag)).collect::<Vec<_>>();
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices(primary_key_indices)
                .value_indices((2 + num_tag..2 + num_tag + 1 + num_field).collect())
                .engine(METRIC_ENGINE_NAME.to_string())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .table_id(physical_table_id)
                .name(physical_table_name)
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: DEFAULT_SCHEMA_NAME.to_string(),
                        table_name: physical_table_name.to_string(),
                        table_id: physical_table_id,
                        table,
                    })
                    .is_ok()
            );
        }

        // Register metric engine logical tables without `__tsid`, referencing the physical table.
        for (idx, (schema_name, table_name)) in table_name_tuples.iter().enumerate() {
            let mut columns = vec![];
            for i in 0..num_tag {
                columns.push(ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    "timestamp".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            for i in 0..num_field {
                columns.push(ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::float64_datatype(),
                    true,
                ));
            }

            let schema = Arc::new(Schema::new(columns));
            let mut options = table::requests::TableOptions::default();
            options.extra_options.insert(
                LOGICAL_TABLE_METADATA_KEY.to_string(),
                physical_table_name.to_string(),
            );
            let table_id = 1024u32 + idx as u32;
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
                .engine(METRIC_ENGINE_NAME.to_string())
                .options(options)
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .table_id(table_id)
                .name(table_name.clone())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: schema_name.clone(),
                        table_name: table_name.clone(),
                        table_id,
                        table,
                    })
                    .is_ok()
            );
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }

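    /// Like `build_test_table_provider`, but with caller-supplied tag column
    /// names and the conventional `greptime_timestamp`/`greptime_value`
    /// timestamp and value columns.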
    async fn build_test_table_provider_with_fields(
        table_name_tuples: &[(String, String)],
        tags: &[&str],
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (schema_name, table_name) in table_name_tuples {
            let mut columns = vec![];
            let num_tag = tags.len();
            for tag in tags {
                columns.push(ColumnSchema::new(
                    tag.to_string(),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    greptime_timestamp().to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            columns.push(ColumnSchema::new(
                greptime_value().to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ));
            let schema = Arc::new(Schema::new(columns));
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .name(table_name.clone())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: schema_name.clone(),
                        table_name: table_name.clone(),
                        table_id: 1024,
                        table,
                    })
                    .is_ok()
            );
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }

    // {
    //     input: `abs(some_metric{foo!="bar"})`,
    //     expected: &Call{
    //         Func: MustGetFunction("abs"),
    //         Args: Expressions{
    //             &VectorSelector{
    //                 Name: "some_metric",
    //                 LabelMatchers: []*labels.Matcher{
    //                     MustLabelMatcher(labels.MatchNotEqual, "foo", "bar"),
    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
    //                 },
    //             },
    //         },
    //     },
    // },
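    // Plans `{fn_name}(some_metric{tag_0!="bar"})` against a 1-tag/1-field table
    // and compares the rendered plan with an expected template, substituting
    // `plan_name` for every `TEMPLATE` placeholder.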
    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
        let prom_expr =
            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = String::from(
            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        ).replace("TEMPLATE", plan_name);

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

    #[tokio::test]
    async fn single_abs() {
        do_single_instant_function_call("abs", "abs").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_absent() {
        do_single_instant_function_call("absent", "").await;
    }

    #[tokio::test]
    async fn single_ceil() {
        do_single_instant_function_call("ceil", "ceil").await;
    }

    #[tokio::test]
    async fn single_exp() {
        do_single_instant_function_call("exp", "exp").await;
    }

    #[tokio::test]
    async fn single_ln() {
        do_single_instant_function_call("ln", "ln").await;
    }

    #[tokio::test]
    async fn single_log2() {
        do_single_instant_function_call("log2", "log2").await;
    }

    #[tokio::test]
    async fn single_log10() {
        do_single_instant_function_call("log10", "log10").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_scalar() {
        do_single_instant_function_call("scalar", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sgn() {
        do_single_instant_function_call("sgn", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort() {
        do_single_instant_function_call("sort", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort_desc() {
        do_single_instant_function_call("sort_desc", "").await;
    }

    #[tokio::test]
    async fn single_sqrt() {
        do_single_instant_function_call("sqrt", "sqrt").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_timestamp() {
        do_single_instant_function_call("timestamp", "").await;
    }

    #[tokio::test]
    async fn single_acos() {
        do_single_instant_function_call("acos", "acos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_acosh() {
        do_single_instant_function_call("acosh", "").await;
    }

    #[tokio::test]
    async fn single_asin() {
        do_single_instant_function_call("asin", "asin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_asinh() {
        do_single_instant_function_call("asinh", "").await;
    }

    #[tokio::test]
    async fn single_atan() {
        do_single_instant_function_call("atan", "atan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_atanh() {
        do_single_instant_function_call("atanh", "").await;
    }

    #[tokio::test]
    async fn single_cos() {
        do_single_instant_function_call("cos", "cos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_cosh() {
        do_single_instant_function_call("cosh", "").await;
    }

    #[tokio::test]
    async fn single_sin() {
        do_single_instant_function_call("sin", "sin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sinh() {
        do_single_instant_function_call("sinh", "").await;
    }

    #[tokio::test]
    async fn single_tan() {
        do_single_instant_function_call("tan", "tan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_tanh() {
        do_single_instant_function_call("tanh", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_deg() {
        do_single_instant_function_call("deg", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_rad() {
        do_single_instant_function_call("rad", "").await;
    }

    // {
    //     input: "avg by (foo)(some_metric)",
    //     expected: &AggregateExpr{
    //         Op: AVG,
    //         Expr: &VectorSelector{
    //             Name: "some_metric",
    //             LabelMatchers: []*labels.Matcher{
    //                 MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
    //             },
    //             PosRange: PositionRange{
    //                 Start: 13,
    //                 End:   24,
    //             },
    //         },
    //         Grouping: []string{"foo"},
    //         PosRange: PositionRange{
    //             Start: 0,
    //             End:   25,
    //         },
    //     },
    // },
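    // Plans `{fn_name} by (tag_1)(some_metric{tag_0!="bar"})` against a
    // 2-tag/2-field table, then re-plans with the modifier flipped to
    // `without (tag_1)`, checking both rendered plans (`plan_name` replaces
    // every `TEMPLATE` placeholder).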
    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
        let prom_expr = parser::parse(&format!(
            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
        ))
        .unwrap();
        let mut eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // test group by
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            2,
            2,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected_no_without = String::from(
            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
        ).replace("TEMPLATE", plan_name);
        assert_eq!(
            plan.display_indent_schema().to_string(),
            expected_no_without
        );

        // test group without
        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
            *modifier = Some(LabelModifier::Exclude(Labels {
                labels: vec![String::from("tag_1")].into_iter().collect(),
            }));
        }
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            2,
            2,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected_without = String::from(
            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
        ).replace("TEMPLATE", plan_name);
        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
    }

    #[tokio::test]
    async fn aggregate_sum() {
        do_aggregate_expr_plan("sum", "sum").await;
    }

    #[tokio::test]
    async fn tsid_is_used_for_series_divide_when_available() {
        let prom_expr = parser::parse("some_metric").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
        assert!(plan_str.contains("__tsid ASC NULLS FIRST"));
        assert!(
            !plan
                .schema()
                .fields()
                .iter()
                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
        );
    }

    #[tokio::test]
    async fn physical_table_name_is_not_leaked_in_plan() {
        let prom_expr = parser::parse("some_metric").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("TableScan: phy"), "{plan}");
        assert!(plan_str.contains("SubqueryAlias: some_metric"));
        assert!(plan_str.contains("Filter: phy.__table_id = UInt32(1024)"));
        assert!(!plan_str.contains("TableScan: some_metric"));
    }

    #[tokio::test]
    async fn sum_without_does_not_group_by_tsid() {
        let prom_expr = parser::parse("sum without (tag_0) (some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));

        let aggr_line = plan_str
            .lines()
            .find(|line| line.contains("Aggregate: groupBy="))
            .unwrap();
        assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
    }

    #[tokio::test]
    async fn topk_without_does_not_partition_by_tsid() {
        let prom_expr = parser::parse("topk without (tag_0) (1, some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));

        let window_line = plan_str
            .lines()
            .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
            .unwrap();
        let partition_by = window_line
            .split("PARTITION BY [")
            .nth(1)
            .and_then(|s| s.split("] ORDER BY").next())
            .unwrap();
        assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
    }

    #[tokio::test]
    async fn sum_by_does_not_group_by_tsid() {
        let prom_expr = parser::parse("sum by (__tsid) (some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));

        let aggr_line = plan_str
            .lines()
            .find(|line| line.contains("Aggregate: groupBy="))
            .unwrap();
        assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
    }

    #[tokio::test]
    async fn topk_by_does_not_partition_by_tsid() {
        let prom_expr = parser::parse("topk by (__tsid) (1, some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));

        let window_line = plan_str
            .lines()
            .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
            .unwrap();
        let partition_by = window_line
            .split("PARTITION BY [")
            .nth(1)
            .and_then(|s| s.split("] ORDER BY").next())
            .unwrap();
        assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
    }

    #[tokio::test]
    async fn selector_matcher_on_tsid_does_not_use_internal_column() {
        let prom_expr = parser::parse(r#"some_metric{__tsid="123"}"#).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

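        // Recursively walk the plan tree, collecting every column referenced
        // by any `Filter` predicate.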
        fn collect_filter_cols(plan: &LogicalPlan, out: &mut HashSet<Column>) {
            if let LogicalPlan::Filter(filter) = plan {
                datafusion_expr::utils::expr_to_columns(&filter.predicate, out).unwrap();
            }
            for input in plan.inputs() {
                collect_filter_cols(input, out);
            }
        }

        let mut filter_cols = HashSet::new();
        collect_filter_cols(&plan, &mut filter_cols);
        assert!(
            !filter_cols
                .iter()
                .any(|c| c.name == DATA_SCHEMA_TSID_COLUMN_NAME)
        );
    }

    #[tokio::test]
    async fn tsid_is_not_used_when_physical_table_is_missing() {
        let prom_expr = parser::parse("some_metric").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let catalog_list = MemoryCatalogManager::with_default_setup();

        // Register a metric engine logical table referencing a missing physical table.
        let mut columns = vec![ColumnSchema::new(
            "tag_0".to_string(),
            ConcreteDataType::string_datatype(),
            false,
        )];
        columns.push(
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        );
        columns.push(ColumnSchema::new(
            "field_0".to_string(),
            ConcreteDataType::float64_datatype(),
            true,
        ));
        let schema = Arc::new(Schema::new(columns));
        let mut options = table::requests::TableOptions::default();
        options
            .extra_options
            .insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "phy".to_string());
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .engine(METRIC_ENGINE_NAME.to_string())
            .options(options)
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .table_id(1024)
            .name("some_metric")
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        catalog_list
            .register_table_sync(RegisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: "some_metric".to_string(),
                table_id: 1024,
                table,
            })
            .unwrap();

        let table_provider = DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        );

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]"));
        assert!(!plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
    }

    #[tokio::test]
    async fn tsid_is_carried_only_when_aggregate_preserves_label_set() {
        let prom_expr = parser::parse("sum by (tag_0) (some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("first_value") && plan_str.contains("__tsid"));
        assert!(
            !plan
                .schema()
                .fields()
                .iter()
                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
        );

        // Merging aggregate: label set is reduced, tsid should not be carried.
        let prom_expr = parser::parse("sum(some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let plan_str = plan.display_indent_schema().to_string();
        assert!(!plan_str.contains("first_value"));
    }

    #[tokio::test]
    async fn or_operator_with_unknown_metric_does_not_require_tsid() {
        let prom_expr = parser::parse("unknown_metric or some_metric").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        assert!(
            !plan
                .schema()
                .fields()
                .iter()
                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
        );
    }

    #[tokio::test]
    async fn aggregate_avg() {
        do_aggregate_expr_plan("avg", "avg").await;
    }

    #[tokio::test]
    #[should_panic] // output type doesn't match
    async fn aggregate_count() {
        do_aggregate_expr_plan("count", "count").await;
    }

    #[tokio::test]
    async fn aggregate_min() {
        do_aggregate_expr_plan("min", "min").await;
    }

    #[tokio::test]
    async fn aggregate_max() {
        do_aggregate_expr_plan("max", "max").await;
    }

    #[tokio::test]
    #[should_panic] // output type doesn't match
    async fn aggregate_group() {
        do_aggregate_expr_plan("grouping", "GROUPING").await;
    }

    #[tokio::test]
    async fn aggregate_stddev() {
        do_aggregate_expr_plan("stddev", "stddev_pop").await;
    }

    #[tokio::test]
    async fn aggregate_stdvar() {
        do_aggregate_expr_plan("stdvar", "var_pop").await;
    }

    // TODO(ruihang): add range fn tests once exprs are ready.

    // {
    //     input: `some_metric{tag_0="foo"} + some_metric{tag_0="bar"}`,
    //     expected: &BinaryExpr{
    //         Op: ADD,
    //         LHS: &VectorSelector{
    //             Name: "some_metric",
    //             LabelMatchers: []*labels.Matcher{
    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "foo"),
    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
    //             },
    //         },
    //         RHS: &VectorSelector{
    //             Name: "some_metric",
    //             LabelMatchers: []*labels.Matcher{
    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "bar"),
    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
    //             },
    //         },
    //         VectorMatching: &VectorMatching{},
    //     },
    // },
    #[tokio::test]
    async fn binary_op_column_column() {
        let prom_expr =
            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = String::from(
            "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

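    /// Plans `query` against a 1-tag/1-field `some_metric` table (plus a
    /// `greptime_private.some_alt_metric` table) and asserts the rendered plan
    /// matches `expected` exactly.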
5056    async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
5057        let prom_expr = parser::parse(query).unwrap();
5058        let eval_stmt = EvalStmt {
5059            expr: prom_expr,
5060            start: UNIX_EPOCH,
5061            end: UNIX_EPOCH
5062                .checked_add(Duration::from_secs(100_000))
5063                .unwrap(),
5064            interval: Duration::from_secs(5),
5065            lookback_delta: Duration::from_secs(1),
5066        };
5067
5068        let table_provider = build_test_table_provider(
5069            &[
5070                (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
5071                (
5072                    "greptime_private".to_string(),
5073                    "some_alt_metric".to_string(),
5074                ),
5075            ],
5076            1,
5077            1,
5078        )
5079        .await;
5080        let plan =
5081            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5082                .await
5083                .unwrap();
5084
5085        assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
5086    }
5087
5088    #[tokio::test]
5089    async fn binary_op_literal_column() {
5090        let query = r#"1 + some_metric{tag_0="bar"}"#;
5091        let expected = String::from(
5092            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
5093            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5094            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5095            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5096            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5097            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5098        );
5099
5100        indie_query_plan_compare(query, expected).await;
5101    }
5102
5103    #[tokio::test]
5104    async fn binary_op_literal_literal() {
5105        let query = r#"1 + 1"#;
5106        let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
5107  TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;
5108        indie_query_plan_compare(query, expected).await;
5109    }
5110
5111    #[tokio::test]
5112    async fn simple_bool_grammar() {
5113        let query = "some_metric != bool 1.2345";
5114        let expected = String::from(
5115            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
5116            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5117            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5118            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5119            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5120            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5121        );
5122
5123        indie_query_plan_compare(query, expected).await;
5124    }
5125
5126    #[tokio::test]
5127    async fn bool_with_additional_arithmetic() {
5128        let query = "some_metric + (1 == bool 2)";
5129        let expected = String::from(
5130            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
5131            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5132            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5133            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5134            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5135            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5136        );
5137
5138        indie_query_plan_compare(query, expected).await;
5139    }
5140
5141    #[tokio::test]
5142    async fn simple_unary() {
5143        let query = "-some_metric";
5144        let expected = String::from(
5145            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
5146            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5147            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5148            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5149            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5150            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5151        );
5152
5153        indie_query_plan_compare(query, expected).await;
5154    }
5155
5156    #[tokio::test]
5157    async fn increase_aggr() {
5158        let query = "increase(some_metric[5m])";
5159        let expected = String::from(
5160            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
5161            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
5162            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
5163            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5164            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5165            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5166            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
5167            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
5168        );
5169
5170        indie_query_plan_compare(query, expected).await;
5171    }
5172
    #[tokio::test]
    async fn less_filter_on_value() {
        let query = "some_metric < 1.2345";
        let expected = String::from(
            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

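    /// `count_over_time` should plan like other `*_over_time` functions: a
    /// `prom_count_over_time` projection above `PromRangeManipulate`.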
    #[tokio::test]
    async fn count_over_time() {
        let query = "count_over_time(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

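    /// A binary operation between two metrics with `ignoring(...)` should plan
    /// as an inner join on the time index and the non-ignored tag columns.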
    #[tokio::test]
    async fn test_hash_join() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_sum".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
        )
        .await;
        // Planning should succeed; the join layout is asserted below.
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
            \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
            \n    SubqueryAlias: http_server_requests_seconds_sum\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
            \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
            \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
            \n              TableScan: http_server_requests_seconds_sum\
            \n    SubqueryAlias: http_server_requests_seconds_count\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
            \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
            \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
            \n              TableScan: http_server_requests_seconds_count";
        assert_eq!(plan.to_string(), expected);
    }

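    /// `label_replace` wrapping `histogram_quantile` over an aggregated rate
    /// should plan without error.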
    #[tokio::test]
    async fn test_nested_histogram_quantile() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
            )],
            &["pod", "le", "path", "code", "container"],
        )
        .await;
        // Should be ok
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

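    /// Set operators (`and`, `unless`) nested inside aggregations should plan
    /// without error.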
    #[tokio::test]
    async fn test_parse_and_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
        ];

        for case in cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider_with_fields(
                &[
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_used_bytes".to_string(),
                    ),
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_capacity_bytes".to_string(),
                    ),
                ],
                &["namespace", "persistentvolumeclaim"],
            )
            .await;
            // Should be ok
            let _ =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
        }
    }

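    /// A binary operation whose right-hand side is a parenthesized
    /// `... or vector(0)` expression should plan without error.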
    #[tokio::test]
    async fn test_nested_binary_op() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        // Should be ok
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

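    /// Various combinations of `or` with arithmetic, comparison, and
    /// aggregation should plan without error.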
    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
            or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
            or
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
            or
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
            sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
            (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
            (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

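    /// `__field__` matchers should select or exclude field columns; a
    /// conflicting pair selects no field column, and matching a nonexistent
    /// field is a planning error.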
    #[tokio::test]
    async fn value_matcher() {
        // Template statement; `expr` is replaced for each case below.
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            // single equal matcher
            (
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // two equal matchers
            (
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single not_eq matcher
            (
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // two not_eq matchers
            (
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // equal and not_eq matchers (no conflict)
            (
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // equal and not_eq matchers (conflict)
            (
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single regex eq matcher
            (
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single regex not_eq matcher
            (
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }

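    /// The special `__schema__` and `__database__` matchers should resolve the
    /// metric in the given schema, including when it is joined with a metric
    /// from the default schema.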
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
        \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

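    /// `__schema__` and `__database__` only support equality matching; other
    /// match operators should be rejected at planning time.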
    #[tokio::test]
    async fn only_equals_is_supported_for_special_matcher() {
        let queries = &[
            "some_alt_metric{__schema__!=\"greptime_private\"}",
            "some_alt_metric{__schema__=~\"lalala\"}",
            "some_alt_metric{__database__!=\"greptime_private\"}",
            "some_alt_metric{__database__=~\"lalala\"}",
        ];

        for query in queries {
            let prom_expr = parser::parse(query).unwrap();
            let eval_stmt = EvalStmt {
                expr: prom_expr,
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            };

            let table_provider = build_test_table_provider(
                &[
                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                    (
                        "greptime_private".to_string(),
                        "some_alt_metric".to_string(),
                    ),
                ],
                1,
                1,
            )
            .await;

            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "query: {:?}", query);
        }
    }

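    /// A non-millisecond time index should be cast to millisecond precision by
    /// a projection inserted right above the table scan.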
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(
            catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "metrics".to_string(),
                    table_id: 1024,
                    table,
                })
                .is_ok()
        );

        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
    }

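    /// A matcher on a label that does not exist in the table should not fail
    /// planning.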
    #[tokio::test]
    async fn test_nonexistent_label() {
        // Template statement; the expression is filled in below.
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"some_metric{nonexistent="hi"}"#;
        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            3,
            3,
        )
        .await;
        // Should be ok
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

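    /// `label_join` should plan as a `concat_ws` projection that produces the
    /// new label next to the existing ones.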
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

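    /// `label_replace` should plan as a `regexp_replace` projection with the
    /// pattern anchored as `^(?s:...)$`.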
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

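    /// A regex label matcher should compile into an anchored `~` filter
    /// expression beneath the aggregation.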
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

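    /// `topk` should plan as a `row_number` window over the sorted
    /// aggregation, filtered by rank.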
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

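    /// `count_values` should group by the value column and expose it under the
    /// user-provided label (`series` here).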
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

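    /// `stmt_to_plan_with_alias` should add a top-level projection renaming
    /// the value column to the given alias.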
6040    #[tokio::test]
6041    async fn test_value_alias() {
6042        let mut eval_stmt = EvalStmt {
6043            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
6044            start: UNIX_EPOCH,
6045            end: UNIX_EPOCH
6046                .checked_add(Duration::from_secs(100_000))
6047                .unwrap(),
6048            interval: Duration::from_secs(5),
6049            lookback_delta: Duration::from_secs(1),
6050        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let alias = Some("my_series".to_string());
        let plan = PromPlanner::stmt_to_plan_with_alias(
            table_provider,
            &eval_stmt,
            alias,
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        let expected = r#"
Projection: count(prometheus_tsdb_head_series.greptime_value) AS my_series, prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [my_series:Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]
  Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]
    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
      Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
        Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: prometheus_tsdb_head_series.ip ~ Utf8("^(?:(10.0.160.237:8080|10.0.160.237:9090))$") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
        assert_eq!(format!("\n{}", plan.display_indent_schema()), expected);
    }

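    // Nested aggregations: the outer quantile(0.3, ...) is planned as the
    // `quantile` UDAF grouped only on the time index, stacked on top of the
    // inner per-`ip` sum aggregation.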
    #[tokio::test]
    async fn test_quantile_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

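    // `or` between an existing and a non-existent metric: instead of failing with
    // a table-not-found error, the missing side is planned as an EmptyMetric
    // placeholder and the branches are merged with UnionDistinctOn on the shared
    // `job` label.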
    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-1000, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100001000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
  SubqueryAlias:  [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
            TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

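    // Regression test: histogram_quantile over a bucket table that lacks the
    // 'le' column must degrade to an empty result rather than fail planning.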
    #[tokio::test]
    async fn test_histogram_quantile_missing_le_column() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        // Test case: histogram_quantile on a table without an 'le' column.
        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;

        // Create a table provider whose table lacks the 'le' column.
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "non_existent_histogram_bucket".to_string(),
            )],
            &["pod", "instance"], // Note: no 'le' column
        )
        .await;

        // Planning should return an empty result instead of an error.
        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;

        // This should now succeed (returning an empty result) instead of failing
        // with "Cannot find column le".
        assert!(
            result.is_ok(),
            "Expected successful plan creation with empty result, but got error: {:?}",
            result.err()
        );

        // Verify that the resulting plan is an EmptyRelation.
        let plan = result.unwrap();
        assert!(
            matches!(plan, LogicalPlan::EmptyRelation(_)),
            "Expected EmptyRelation, but got: {:?}",
            plan
        );
    }
}