query/promql/planner.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::prelude::GREPTIME_VALUE;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::DFSchema;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, ExprSchemable, SortExpr};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use promql::extension_plan::{
    build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
    RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
use promql::functions::{
    quantile_udaf, AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters,
    IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    token, AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt,
    Expr as PromExpr, Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector,
    NumberLiteral, Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr,
    VectorMatchCardinality, VectorSelector,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidTimeRangeSnafu,
    MultiFieldsNotSupportedSnafu, MultipleMetricMatchersSnafu, MultipleVectorSnafu,
    NoMetricMatcherSnafu, PromqlPlanNodeSnafu, Result, SameLabelSetSnafu, TableNameNotFoundSnafu,
    TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
    UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu,
    ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
};

/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `histogram_quantile()` function in PromQL.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector()` function in PromQL.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// `le` column for conventional histogram.
const LE_COLUMN_NAME: &str = "le";

const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Default value column name for empty metric.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special matcher to project field columns under multi-field mode.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special matchers for cross-schema queries.
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

/// Threshold for scatter scan mode.
const MAX_SCATTER_POINTS: i64 = 400;

/// One hour, in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;

#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // query parameters
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // planner states
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    field_column_matcher: Option<Vec<Matcher>>,
    schema_name: Option<String>,
    /// The range, in milliseconds, of the range selector. `None` if there is no range selector.
    range: Option<Millisecond>,
}

impl PromPlannerContext {
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }

    /// Reset all planner states
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.field_column_matcher = None;
        self.schema_name = None;
        self.range = None;
    }

    /// Reset table name and schema to empty
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
    }

    /// Check if `le` is present in tag columns
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}

pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}

/// Unescapes the value of the matcher
pub fn normalize_matcher(mut matcher: Matcher) -> Matcher {
    if let Ok(unescaped_value) = unescaper::unescape(&matcher.value) {
        matcher.value = unescaped_value;
    }
    matcher
}

impl PromPlanner {
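    /// Plan a whole PromQL [EvalStmt] into a DataFusion [LogicalPlan], initializing
    /// the planner context from the statement's query parameters.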
    pub async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        planner.prom_expr_to_plan(&stmt.expr, session_state).await
    }

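    /// Recursively convert a PromQL expression [PromExpr] into a [LogicalPlan],
    /// dispatching on the expression variant.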
    #[async_recursion]
    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => self.prom_aggr_expr_to_plan(session_state, expr).await?,
            PromExpr::Unary(expr) => self.prom_unary_expr_to_plan(session_state, expr).await?,
            PromExpr::Binary(expr) => self.prom_binary_expr_to_plan(session_state, expr).await?,
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan(expr, session_state).await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(session_state, expr).await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector).await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => self.prom_call_expr_to_plan(session_state, expr).await?,
            PromExpr::Extension(expr) => self.prom_ext_expr_to_plan(session_state, expr).await?,
        };

        Ok(res)
    }

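    /// Plan a PromQL subquery. The subquery's `step` temporarily overrides the
    /// evaluation interval and the start time is moved back by `range - interval`
    /// while planning the inner expression, whose result is then wrapped in a
    /// [RangeManipulate] node.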
    async fn prom_subquery_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, session_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

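    /// Plan an aggregate expression like `sum by (...)`. `topk` and `bottomk` are
    /// delegated to [Self::prom_topk_bottomk_to_plan]; other operators become a
    /// DataFusion aggregate over the modifier-derived group-by columns, followed
    /// by a sort on those columns.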
    async fn prom_aggr_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let input = self.prom_expr_to_plan(expr, session_state).await?;

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // calculate columns to group by
                // Need to append time index column into group by columns
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                // convert op and value columns to aggregate exprs
                let (aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // create plan
                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` must be grouped by fields,
                    // and project the fields to the new label.
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }

    /// Create logical plan for PromQL topk and bottomk expr.
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // convert op and value columns to window exprs.
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Create ranks filter with `Operator::Or`.
        // Safety: at least one rank column
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        // Order by ranks
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

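    /// Plan a unary expression, which in PromQL can only be negation, by negating
    /// every field column of the input.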
    async fn prom_unary_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        // Unary expr in PromQL implies the `-` operator
        let input = self.prom_expr_to_plan(expr, session_state).await?;
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }

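    /// Plan a binary expression. Literal-literal arithmetic is evaluated over an
    /// [EmptyMetric] source; literal-column cases project (or filter, for
    /// comparisons without `bool`) on the field columns; column-column cases join
    /// the two inputs on their non-field columns first.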
    async fn prom_binary_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // If set to true, comparison operators return 0/1 (for true/false) instead of
        // filtering on the result column.
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        // Build a filter plan here if the op is a comparison op and 0/1 doesn't need
        // to be returned. Otherwise, build a projection plan.
        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            // lhs is a literal, rhs is a column
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, session_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = Self::try_build_special_time_expr(
                    lhs,
                    self.ctx.time_index_column.as_ref().unwrap(),
                ) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // lhs is a column, rhs is a literal
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, session_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = Self::try_build_special_time_expr(
                    rhs,
                    self.ctx.time_index_column.as_ref().unwrap(),
                ) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // both are columns. join them on time index
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, session_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, session_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // TODO(ruihang): avoid join if left and right are the same table

                // set op has "special" join semantics
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                // normal join
                if left_table_ref == right_table_ref {
                    // rename table references to avoid ambiguity
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    // `self.ctx` currently holds the right plan's context. If the right
                    // plan has no tag columns, use the left plan's context for subsequent
                    // calculations, to handle cases like `host + scalar(...)`: the tag
                    // columns of the `host` table must be preserved in the subsequent
                    // projection, and they only appear in the left plan's context.
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    // if either plan's tag columns are empty (cases like `scalar(...) + host`
                    // or `host + scalar(...)`), only join on the time index
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }

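    /// Plan a number literal as an [EmptyMetric] that yields the constant value.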
    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

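    /// Plan a string literal as an [EmptyMetric] that yields the constant string.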
    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.to_string());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

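    /// Plan an instant vector selector: a normalized series scan wrapped in an
    /// [InstantManipulate] node that applies instant-query lookback semantics.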
    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;
        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

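    /// Plan a range vector selector like `metric[5m]`: a normalized series scan
    /// wrapped in a [RangeManipulate] node. A zero-length range is rejected.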
    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        // Some functions like rate may require special fields in the RangeManipulate plan
        // so we can't skip RangeManipulate.
        let normalize = match self.setup_context().await? {
            Some(empty_plan) => empty_plan,
            None => {
                self.selector_to_series_normalize_plan(offset, matchers, true)
                    .await?
            }
        };
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            // TODO(ruihang): convert via Timestamp datatypes to support different time units
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

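    /// Plan a function call. `histogram_quantile`, `vector` and `scalar` expand
    /// into dedicated plans; every other function is planned as a projection over
    /// its input, with an extra sort stage for the `sort*` family.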
    async fn prom_call_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // Some special functions are not expressions but whole plans.
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, session_state).await
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await,
            _ => {}
        }

        // transform function arguments
        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan(prom_expr, session_state).await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let (mut func_exprs, new_tags) =
            self.create_function_expr(func, args.literals.clone(), session_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        // Update context tags after building the plan.
        // We can't push them before planning, because they won't exist until projection.
        for tag in new_tags {
            self.ctx.tag_columns.push(tag);
        }

        builder.build().context(DataFusionPlanningSnafu)
    }

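    /// Plan parser extension expressions, i.e. the `EXPLAIN`/`ANALYZE` wrappers
    /// (optionally `VERBOSE`) around a regular PromQL expression.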
    async fn prom_ext_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        // let promql_parser::parser::ast::Extension { expr } = ext_expr;
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self.prom_expr_to_plan(&children[0], session_state).await?;
        // Wrapper for the explanation/analysis of the existing plan:
        // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
        // If `analyze` is true, the actual plan is run and metrics about the
        // execution are produced.
        // If `verbose` is true, additional details are printed when the VERBOSE
        // keyword is specified.
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }

    /// Extract metric name from the `__name__` matcher and set it into [PromPlannerContext].
    /// Returns a new [Matchers] that doesn't contain the metric name matcher.
    ///
    /// Each call to this function means a new selector is started. Thus, the context will be
    /// reset first.
    ///
    /// Name rule:
    /// - if `name` is some, then the matchers MUST NOT contain a `__name__` matcher.
    /// - if `name` is none, then the matchers MAY contain NONE OR MULTIPLE `__name__` matchers.
    #[allow(clippy::mutable_key_type)]
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            // TODO(ruihang): support other metric match ops
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.to_string(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(
            matchers.into_iter().map(normalize_matcher).collect(),
        ))
    }

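    /// Build the scan -> filter -> (optional projection) -> sort -> [SeriesDivide]
    /// pipeline for a selector, and wrap it in a [SeriesNormalize] node when an
    /// offset is given or a range selector is being planned.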
    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        // make table scan plan
        let table_ref = self.table_ref()?;
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        // make filter exprs
        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            .filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty.
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make a projection plan if there is any `__field__` matcher
        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            // opt-in set
            let mut result_set = HashSet::new();
            // opt-out set
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            // merge the two sets
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }

            // mask the field columns in context using the computed result set
            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            // reuse this variable for simplicity
            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // make sort plan
        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make divide plan
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                self.ctx.tag_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

        // make series_normalize plan
        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            self.ctx.tag_columns.clone(),
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }

    /// Convert [LabelModifier] to [Column] exprs for aggregation.
    /// Timestamp column and tag columns will be included.
    ///
    /// # Side effect
    ///
    /// This method will also change the tag columns in ctx if `update_ctx` is true.
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            None => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            Some(LabelModifier::Include(labels)) => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    // nonexistent labels will be ignored
                    if let Ok(field) = input_schema.field_with_unqualified_name(label) {
                        exprs.push(DfExpr::Column(Column::from(field.name())));

                        if update_ctx {
                            // update the tag columns in context
                            self.ctx.tag_columns.push(label.clone());
                        }
                    }
                }
                // add timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                // remove "without"-ed fields
                // nonexistent labels will be ignored
                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // remove time index and value fields
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    // change the tag columns in context
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                // collect remaining fields and convert to col expr
                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                // add timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }

    // TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
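    /// Convert label matchers into DataFusion filter expressions. Matchers on
    /// labels absent from the table schema are compared against an empty-string
    /// literal, and a `=~".*"` matcher is skipped entirely since it matches
    /// everything.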
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            let col = if table_schema
                .field_with_unqualified_name(&matcher.name)
                .is_err()
            {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new()))).alias(matcher.name)
            } else {
                DfExpr::Column(Column::from_name(matcher.name))
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)));
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                MatchOp::Re(re) => {
                    // TODO(ruihang): a more programmatic way to handle this in datafusion
                    if re.as_str() == ".*" {
                        continue;
                    }
                    DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(col),
                        op: Operator::RegexMatch,
                        right: Box::new(DfExpr::Literal(ScalarValue::Utf8(Some(
                            re.as_str().to_string(),
                        )))),
                    })
                }
                MatchOp::NotRe(re) => DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col),
                    op: Operator::RegexNotMatch,
                    right: Box::new(DfExpr::Literal(ScalarValue::Utf8(Some(
                        re.as_str().to_string(),
                    )))),
                }),
            };
            exprs.push(expr);
        }

        Ok(exprs)
    }

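    /// Build the [TableReference] for the current context, qualified with a schema
    /// name when a `__schema__` or `__database__` matcher was given.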
    fn table_ref(&self) -> Result<TableReference> {
        let table_name = self
            .ctx
            .table_name
            .clone()
            .context(TableNameNotFoundSnafu)?;

        // set the schema name if `__schema__` or `__database__` is given
        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
            TableReference::partial(schema_name.as_str(), table_name.as_str())
        } else {
            TableReference::bare(table_name.as_str())
        };

        Ok(table_ref)
    }

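    /// Build a filter expression on the time index column. A single continuous
    /// time range is used when the query produces more than [MAX_SCATTER_POINTS]
    /// points or the interval is within one hour; otherwise each step gets its own
    /// scattered range, OR-ed together.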
    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
        let start = self.ctx.start;
        let end = self.ctx.end;
        if end < start {
            return InvalidTimeRangeSnafu { start, end }.fail();
        }
        let lookback_delta = self.ctx.lookback_delta;
        let range = self.ctx.range.unwrap_or_default();
        let interval = self.ctx.interval;
        let time_index_expr = self.create_time_index_column_expr()?;
        let num_points = (end - start) / interval;

        // Scan a continuous time range
        if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
            let single_time_range = time_index_expr
                .clone()
                .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                    Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range),
                    None,
                )))
                .and(
                    time_index_expr.lt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                        Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
                        None,
                    ))),
                );
            return Ok(Some(single_time_range));
        }

        // Otherwise scan scatter ranges separately
        let mut filters = Vec::with_capacity(num_points as usize);
        for timestamp in (start..end).step_by(interval as usize) {
            filters.push(
                time_index_expr
                    .clone()
                    .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                        Some(timestamp - offset_duration - lookback_delta - range),
                        None,
                    )))
                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp - offset_duration + lookback_delta),
                            None,
                        ),
                    ))),
            )
        }

        Ok(filters.into_iter().reduce(DfExpr::or))
    }

    /// Create a table scan plan for the given table reference, projecting the time
    /// index column to millisecond precision if the table uses a different time unit.
    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
        let provider = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?;

        let is_time_index_ms = provider
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table()
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .data_type
            == ConcreteDataType::timestamp_millisecond_datatype();

        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        if !is_time_index_ms {
            // cast to ms if the time index is not in millisecond precision
            let expr: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(DfExpr::Alias(Alias {
                    expr: Box::new(DfExpr::Cast(Cast {
                        expr: Box::new(self.create_time_index_column_expr()?),
                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                    })),
                    relation: Some(table_ref.clone()),
                    name: self
                        .ctx
                        .time_index_column
                        .as_ref()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_quoted_string(),
                        })?
                        .clone(),
                })))
                .collect::<Vec<_>>();
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(expr)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

1351        Ok(scan_plan)
1355    }
1356
1357    /// Set up [PromPlannerContext]'s state fields.
1358    ///
1359    /// Returns a logical plan for an empty metric when the table is not found.
1360    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
1361        let table_ref = self.table_ref()?;
1362        let table = match self.table_provider.resolve_table(table_ref.clone()).await {
1363            Err(e) if e.status_code() == StatusCode::TableNotFound => {
1364                let plan = self.setup_context_for_empty_metric()?;
1365                return Ok(Some(plan));
1366            }
1367            res => res.context(CatalogSnafu)?,
1368        };
1369        let table = table
1370            .as_any()
1371            .downcast_ref::<DefaultTableSource>()
1372            .context(UnknownTableSnafu)?
1373            .table_provider
1374            .as_any()
1375            .downcast_ref::<DfTableProviderAdapter>()
1376            .context(UnknownTableSnafu)?
1377            .table();
1378
1379        // set time index column name
1380        let time_index = table
1381            .schema()
1382            .timestamp_column()
1383            .with_context(|| TimeIndexNotFoundSnafu {
1384                table: table_ref.to_quoted_string(),
1385            })?
1386            .name
1387            .clone();
1388        self.ctx.time_index_column = Some(time_index);
1389
1390        // set value columns
1391        let values = table
1392            .table_info()
1393            .meta
1394            .field_column_names()
1395            .cloned()
1396            .collect();
1397        self.ctx.field_columns = values;
1398
1399        // set primary key (tag) columns
1400        let tags = table
1401            .table_info()
1402            .meta
1403            .row_key_column_names()
1404            .filter(|col| {
1405                // remove metric engine's internal columns
1406                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
1407            })
1408            .cloned()
1409            .collect();
1410        self.ctx.tag_columns = tags;
1411
1412        Ok(None)
1413    }
1414
1415    /// Set up [PromPlannerContext]'s state fields for a non-existent table
1416    /// without any rows.
1417    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
1418        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
1419        self.ctx.reset_table_name_and_schema();
1420        self.ctx.tag_columns = vec![];
1421        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
1422
1423        // The table doesn't have any data, so we set start to 0 and end to -1.
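            // Since start (0) > end (-1), the EmptyMetric plan yields no rows while
            // still exposing the expected schema (time index plus one value column).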
1424        let plan = LogicalPlan::Extension(Extension {
1425            node: Arc::new(
1426                EmptyMetric::new(
1427                    0,
1428                    -1,
1429                    self.ctx.interval,
1430                    SPECIAL_TIME_FUNCTION.to_string(),
1431                    DEFAULT_FIELD_COLUMN.to_string(),
1432                    Some(DfExpr::Literal(ScalarValue::Float64(Some(0.0)))),
1433                )
1434                .context(DataFusionPlanningSnafu)?,
1435            ),
1436        });
1437        Ok(plan)
1438    }
1439
1440    // TODO(ruihang): insert column expr
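    /// Split PromQL call arguments into one vector input and scalar literals,
    /// e.g. (sketch) `clamp_max(metric, 100)` yields `input = Some(metric)` and
    /// `literals = [lit(100.0)]`.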
1441    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
1442        let mut result = FunctionArgs::default();
1443
1444        for arg in args {
1445            match *arg.clone() {
1446                PromExpr::Subquery(_)
1447                | PromExpr::VectorSelector(_)
1448                | PromExpr::MatrixSelector(_)
1449                | PromExpr::Extension(_)
1450                | PromExpr::Aggregate(_)
1451                | PromExpr::Paren(_)
1452                | PromExpr::Call(_) => {
1453                    if result.input.replace(*arg.clone()).is_some() {
1454                        MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
1455                    }
1456                }
1457
1458                _ => {
1459                    let expr =
1460                        Self::get_param_as_literal_expr(&Some(Box::new(*arg.clone())), None, None)?;
1461                    result.literals.push(expr);
1462                }
1463            }
1464        }
1465
1466        Ok(result)
1467    }
1468
1469    /// Creates function expressions for projection and returns the expressions and new tags.
1470    ///
1471    /// # Side Effects
1472    ///
1473    /// This method will update [PromPlannerContext]'s fields and tags if needed.
1474    fn create_function_expr(
1475        &mut self,
1476        func: &Function,
1477        other_input_exprs: Vec<DfExpr>,
1478        session_state: &SessionState,
1479    ) -> Result<(Vec<DfExpr>, Vec<String>)> {
1480        // TODO(ruihang): check function args list
1481        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
1482
1483        // TODO(ruihang): set this according to in-param list
1484        let field_column_pos = 0;
1485        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
1486        // New labels produced by functions such as `label_replace`.
1487        let mut new_tags = vec![];
1488        let scalar_func = match func.name {
1489            "increase" => ScalarFunc::ExtrapolateUdf(
1490                Arc::new(Increase::scalar_udf()),
1491                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1492            ),
1493            "rate" => ScalarFunc::ExtrapolateUdf(
1494                Arc::new(Rate::scalar_udf()),
1495                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1496            ),
1497            "delta" => ScalarFunc::ExtrapolateUdf(
1498                Arc::new(Delta::scalar_udf()),
1499                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1500            ),
1501            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
1502            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
1503            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
1504            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
1505            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
1506            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
1507            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
1508            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
1509            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
1510            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
1511            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
1512            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
1513            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
1514            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
1515            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
1516            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
1517            "predict_linear" => {
1518                other_input_exprs[0] = DfExpr::Cast(Cast {
1519                    expr: Box::new(other_input_exprs[0].clone()),
1520                    data_type: ArrowDataType::Int64,
1521                });
1522                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
1523            }
1524            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
1525            "time" => {
1526                exprs.push(build_special_time_expr(
1527                    self.ctx.time_index_column.as_ref().unwrap(),
1528                ));
1529                ScalarFunc::GeneratedExpr
1530            }
1531            "minute" => {
1532                // date_part('minute', time_index)
1533                let expr = self.date_part_on_time_index("minute")?;
1534                exprs.push(expr);
1535                ScalarFunc::GeneratedExpr
1536            }
1537            "hour" => {
1538                // date_part('hour', time_index)
1539                let expr = self.date_part_on_time_index("hour")?;
1540                exprs.push(expr);
1541                ScalarFunc::GeneratedExpr
1542            }
1543            "month" => {
1544                // date_part('month', time_index)
1545                let expr = self.date_part_on_time_index("month")?;
1546                exprs.push(expr);
1547                ScalarFunc::GeneratedExpr
1548            }
1549            "year" => {
1550                // date_part('year', time_index)
1551                let expr = self.date_part_on_time_index("year")?;
1552                exprs.push(expr);
1553                ScalarFunc::GeneratedExpr
1554            }
1555            "day_of_month" => {
1556                // date_part('day', time_index)
1557                let expr = self.date_part_on_time_index("day")?;
1558                exprs.push(expr);
1559                ScalarFunc::GeneratedExpr
1560            }
1561            "day_of_week" => {
1562                // date_part('dow', time_index)
1563                let expr = self.date_part_on_time_index("dow")?;
1564                exprs.push(expr);
1565                ScalarFunc::GeneratedExpr
1566            }
1567            "day_of_year" => {
1568                // date_part('doy', time_index)
1569                let expr = self.date_part_on_time_index("doy")?;
1570                exprs.push(expr);
1571                ScalarFunc::GeneratedExpr
1572            }
1573            "days_in_month" => {
1574                // date_part(
1575                //     'day',
1576                //     (date_trunc('month', <TIME INDEX>::date) + interval '1 month - 1 day')
1577                // );
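                    // e.g. for a timestamp in 2023-02, date_trunc('month', ts) gives
                    // 2023-02-01; adding (1 month - 1 day) lands on 2023-02-28, so
                    // date_part('day', ...) yields 28.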
1578                let day_lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some("day".to_string())));
1579                let month_lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some("month".to_string())));
1580                let interval_1month_lit_expr =
1581                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)));
1582                let interval_1day_lit_expr = DfExpr::Literal(ScalarValue::IntervalDayTime(Some(
1583                    IntervalDayTime::new(1, 0),
1584                )));
1585                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
1586                    left: Box::new(interval_1month_lit_expr),
1587                    op: Operator::Minus,
1588                    right: Box::new(interval_1day_lit_expr),
1589                });
1590                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
1591                    func: datafusion_functions::datetime::date_trunc(),
1592                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
1593                });
1594                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
1595                    left: Box::new(date_trunc_expr),
1596                    op: Operator::Plus,
1597                    right: Box::new(the_1month_minus_1day_expr),
1598                });
1599                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
1600                    func: datafusion_functions::datetime::date_part(),
1601                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
1602                });
1603
1604                exprs.push(date_part_expr);
1605                ScalarFunc::GeneratedExpr
1606            }
1607
1608            "label_join" => {
1609                let (concat_expr, dst_label) =
1610                    Self::build_concat_labels_expr(&mut other_input_exprs, session_state)?;
1611
1612                // Preserve the current field columns except the `dst_label`.
1613                for value in &self.ctx.field_columns {
1614                    if *value != dst_label {
1615                        let expr = DfExpr::Column(Column::from_name(value));
1616                        exprs.push(expr);
1617                    }
1618                }
1619
1620                // Remove it from the tag columns if it exists, to avoid duplicate column names
1621                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
1622                new_tags.push(dst_label);
1623                // Add the new label expr to evaluate
1624                exprs.push(concat_expr);
1625
1626                ScalarFunc::GeneratedExpr
1627            }
1628            "label_replace" => {
1629                if let Some((replace_expr, dst_label)) =
1630                    self.build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?
1631                {
1632                    // Preserve the current field columns except the `dst_label`.
1633                    for value in &self.ctx.field_columns {
1634                        if *value != dst_label {
1635                            let expr = DfExpr::Column(Column::from_name(value));
1636                            exprs.push(expr);
1637                        }
1638                    }
1639
1640                    ensure!(
1641                        !self.ctx.tag_columns.contains(&dst_label),
1642                        SameLabelSetSnafu
1643                    );
1644                    new_tags.push(dst_label);
1645                    // Add the new label expr to evaluate
1646                    exprs.push(replace_expr);
1647                } else {
1648                    // Keep the current field columns
1649                    for value in &self.ctx.field_columns {
1650                        let expr = DfExpr::Column(Column::from_name(value));
1651                        exprs.push(expr);
1652                    }
1653                }
1654
1655                ScalarFunc::GeneratedExpr
1656            }
1657            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" => {
1658                // These functions are not expressions but part of the plan;
1659                // they are processed by `prom_call_expr_to_plan`.
1660                for value in &self.ctx.field_columns {
1661                    let expr = DfExpr::Column(Column::from_name(value));
1662                    exprs.push(expr);
1663                }
1664
1665                ScalarFunc::GeneratedExpr
1666            }
1667            "round" => {
1668                if other_input_exprs.is_empty() {
1669                    other_input_exprs.push_front(DfExpr::Literal(ScalarValue::Float64(Some(0.0))));
1670                }
1671                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
1672            }
1673
1674            _ => {
1675                if let Some(f) = session_state.scalar_functions().get(func.name) {
1676                    ScalarFunc::DataFusionBuiltin(f.clone())
1677                } else if let Some(f) = datafusion_functions::math::functions()
1678                    .iter()
1679                    .find(|f| f.name() == func.name)
1680                {
1681                    ScalarFunc::DataFusionUdf(f.clone())
1682                } else {
1683                    return UnsupportedExprSnafu {
1684                        name: func.name.to_string(),
1685                    }
1686                    .fail();
1687                }
1688            }
1689        };
1690
1691        for value in &self.ctx.field_columns {
1692            let col_expr = DfExpr::Column(Column::from_name(value));
1693
1694            match scalar_func.clone() {
1695                ScalarFunc::DataFusionBuiltin(func) => {
1696                    other_input_exprs.insert(field_column_pos, col_expr);
1697                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1698                        func,
1699                        args: other_input_exprs.clone().into(),
1700                    });
1701                    exprs.push(fn_expr);
1702                    let _ = other_input_exprs.remove(field_column_pos);
1703                }
1704                ScalarFunc::DataFusionUdf(func) => {
1705                    let args = itertools::chain!(
1706                        other_input_exprs.iter().take(field_column_pos).cloned(),
1707                        std::iter::once(col_expr),
1708                        other_input_exprs.iter().skip(field_column_pos).cloned()
1709                    )
1710                    .collect_vec();
1711                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
1712                }
1713                ScalarFunc::Udf(func) => {
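                        // Final argument shape (sketch): func(ts_range, field, <extra args>)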
1714                    let ts_range_expr = DfExpr::Column(Column::from_name(
1715                        RangeManipulate::build_timestamp_range_name(
1716                            self.ctx.time_index_column.as_ref().unwrap(),
1717                        ),
1718                    ));
1719                    other_input_exprs.insert(field_column_pos, ts_range_expr);
1720                    other_input_exprs.insert(field_column_pos + 1, col_expr);
1721                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1722                        func,
1723                        args: other_input_exprs.clone().into(),
1724                    });
1725                    exprs.push(fn_expr);
1726                    let _ = other_input_exprs.remove(field_column_pos + 1);
1727                    let _ = other_input_exprs.remove(field_column_pos);
1728                }
1729                ScalarFunc::ExtrapolateUdf(func, range_length) => {
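                        // Final argument shape (sketch):
                        //   func(ts_range, field, time_index, <extra args>, lit(range))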
1730                    let ts_range_expr = DfExpr::Column(Column::from_name(
1731                        RangeManipulate::build_timestamp_range_name(
1732                            self.ctx.time_index_column.as_ref().unwrap(),
1733                        ),
1734                    ));
1735                    other_input_exprs.insert(field_column_pos, ts_range_expr);
1736                    other_input_exprs.insert(field_column_pos + 1, col_expr);
1737                    other_input_exprs
1738                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
1739                    other_input_exprs.push_back(lit(range_length));
1740                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1741                        func,
1742                        args: other_input_exprs.clone().into(),
1743                    });
1744                    exprs.push(fn_expr);
1745                    let _ = other_input_exprs.pop_back();
1746                    let _ = other_input_exprs.remove(field_column_pos + 2);
1747                    let _ = other_input_exprs.remove(field_column_pos + 1);
1748                    let _ = other_input_exprs.remove(field_column_pos);
1749                }
1750                ScalarFunc::GeneratedExpr => {}
1751            }
1752        }
1753
1754        // Update value columns' names and alias them to remove qualifiers.
1755        // For label functions such as `label_join` and `label_replace`,
1756        // the fields are kept unchanged.
1757        if !matches!(func.name, "label_join" | "label_replace") {
1758            let mut new_field_columns = Vec::with_capacity(exprs.len());
1759
1760            exprs = exprs
1761                .into_iter()
1762                .map(|expr| {
1763                    let display_name = expr.schema_name().to_string();
1764                    new_field_columns.push(display_name.clone());
1765                    Ok(expr.alias(display_name))
1766                })
1767                .collect::<std::result::Result<Vec<_>, _>>()
1768                .context(DataFusionPlanningSnafu)?;
1769
1770            self.ctx.field_columns = new_field_columns;
1771        }
1772
1773        Ok((exprs, new_tags))
1774    }
1775
1776    /// Build expr for `label_replace` function
1777    fn build_regexp_replace_label_expr(
1778        &self,
1779        other_input_exprs: &mut VecDeque<DfExpr>,
1780        session_state: &SessionState,
1781    ) -> Result<Option<(DfExpr, String)>> {
1782        // label_replace(vector, dst_label, replacement, src_label, regex)
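        // e.g. (sketch) label_replace(up, "host", "$1", "instance", "(.*):.*") becomes:
        //   regexp_replace(instance, '^(?s:(.*):.*)$', '$1') AS host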
1783        let dst_label = match other_input_exprs.pop_front() {
1784            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
1785            other => UnexpectedPlanExprSnafu {
1786                desc: format!("expected dst_label string literal, but found {:?}", other),
1787            }
1788            .fail()?,
1789        };
1790        let replacement = match other_input_exprs.pop_front() {
1791            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
1792            other => UnexpectedPlanExprSnafu {
1793                desc: format!("expected replacement string literal, but found {:?}", other),
1794            }
1795            .fail()?,
1796        };
1797        let src_label = match other_input_exprs.pop_front() {
1798            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)))) => s,
1799            other => UnexpectedPlanExprSnafu {
1800                desc: format!("expected src_label string literal, but found {:?}", other),
1801            }
1802            .fail()?,
1803        };
1804
1805        let regex = match other_input_exprs.pop_front() {
1806            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
1807            other => UnexpectedPlanExprSnafu {
1808                desc: format!("expected regex string literal, but found {:?}", other),
1809            }
1810            .fail()?,
1811        };
1812
1813        // If the src_label exists and regex is empty, keep everything unchanged.
1814        if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
1815            return Ok(None);
1816        }
1817
1818        // If the src_label doesn't exist, then:
1819        if !self.ctx.tag_columns.contains(&src_label) {
1820            if replacement.is_empty() {
1821                // the replacement is empty, keep everything unchanged.
1822                return Ok(None);
1823            } else {
1824                // the replacement is not empty, so always add dst_label with the replacement value.
1825                return Ok(Some((
1826                    // alias literal `replacement` as dst_label
1827                    DfExpr::Literal(ScalarValue::Utf8(Some(replacement))).alias(&dst_label),
1828                    dst_label,
1829                )));
1830            }
1831        }
1832
1833        // Preprocess the regex:
1834        // https://github.com/prometheus/prometheus/blob/d902abc50d6652ba8fe9a81ff8e5cce936114eba/promql/functions.go#L1575C32-L1575C37
1835        let regex = format!("^(?s:{regex})$");
1836
1837        let func = session_state
1838            .scalar_functions()
1839            .get("regexp_replace")
1840            .context(UnsupportedExprSnafu {
1841                name: "regexp_replace",
1842            })?;
1843
1844        // regexp_replace(src_label, regex, replacement)
1845        let args = vec![
1846            if src_label.is_empty() {
1847                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())))
1848            } else {
1849                DfExpr::Column(Column::from_name(src_label))
1850            },
1851            DfExpr::Literal(ScalarValue::Utf8(Some(regex))),
1852            DfExpr::Literal(ScalarValue::Utf8(Some(replacement))),
1853        ];
1854
1855        Ok(Some((
1856            DfExpr::ScalarFunction(ScalarFunction {
1857                func: func.clone(),
1858                args,
1859            })
1860            .alias(&dst_label),
1861            dst_label,
1862        )))
1863    }
1864
1865    /// Build expr for `label_join` function
1866    fn build_concat_labels_expr(
1867        other_input_exprs: &mut VecDeque<DfExpr>,
1868        session_state: &SessionState,
1869    ) -> Result<(DfExpr, String)> {
1870        // label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)
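        // e.g. (sketch) label_join(up, "host_port", ",", "host", "port") becomes:
        //   concat_ws(',', host, port) AS host_port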
1871
1872        let dst_label = match other_input_exprs.pop_front() {
1873            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
1874            other => UnexpectedPlanExprSnafu {
1875                desc: format!("expected dst_label string literal, but found {:?}", other),
1876            }
1877            .fail()?,
1878        };
1879        let separator = match other_input_exprs.pop_front() {
1880            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
1881            other => UnexpectedPlanExprSnafu {
1882                desc: format!("expected separator string literal, but found {:?}", other),
1883            }
1884            .fail()?,
1885        };
1886        let src_labels = other_input_exprs
1887            .clone()
1888            .into_iter()
1889            .map(|expr| {
1890                // Turn each source label into a column reference
1891                match expr {
1892                    DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
1893                        if label.is_empty() {
1894                            Ok(DfExpr::Literal(ScalarValue::Null))
1895                        } else {
1896                            Ok(DfExpr::Column(Column::from_name(label)))
1897                        }
1898                    }
1899                    other => UnexpectedPlanExprSnafu {
1900                        desc: format!(
1901                            "expected source label string literal, but found {:?}",
1902                            other
1903                        ),
1904                    }
1905                    .fail(),
1906                }
1907            })
1908            .collect::<Result<Vec<_>>>()?;
1909        ensure!(
1910            !src_labels.is_empty(),
1911            FunctionInvalidArgumentSnafu {
1912                fn_name: "label_join"
1913            }
1914        );
1915
1916        let func = session_state
1917            .scalar_functions()
1918            .get("concat_ws")
1919            .context(UnsupportedExprSnafu { name: "concat_ws" })?;
1920
1921        // concat_ws(separator, src_label_1, src_label_2, ...) as dst_label
1922        let mut args = Vec::with_capacity(1 + src_labels.len());
1923        args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator))));
1924        args.extend(src_labels);
1925
1926        Ok((
1927            DfExpr::ScalarFunction(ScalarFunction {
1928                func: func.clone(),
1929                args,
1930            })
1931            .alias(&dst_label),
1932            dst_label,
1933        ))
1934    }
1935
1936    fn create_time_index_column_expr(&self) -> Result<DfExpr> {
1937        Ok(DfExpr::Column(Column::from_name(
1938            self.ctx
1939                .time_index_column
1940                .clone()
1941                .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
1942        )))
1943    }
1944
1945    fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
1946        let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
1947        for tag in &self.ctx.tag_columns {
1948            let expr = DfExpr::Column(Column::from_name(tag));
1949            result.push(expr);
1950        }
1951        Ok(result)
1952    }
1953
1954    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
1955        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
1956        for field in &self.ctx.field_columns {
1957            let expr = DfExpr::Column(Column::from_name(field));
1958            result.push(expr);
1959        }
1960        Ok(result)
1961    }
1962
1963    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
1964        let mut result = self
1965            .ctx
1966            .tag_columns
1967            .iter()
1968            .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
1969            .collect::<Vec<_>>();
1970        result.push(self.create_time_index_column_expr()?.sort(true, true));
1971        Ok(result)
1972    }
1973
1974    fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
1975        self.ctx
1976            .field_columns
1977            .iter()
1978            .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
1979            .collect::<Vec<_>>()
1980    }
1981
1982    fn create_sort_exprs_by_tags(
1983        func: &str,
1984        tags: Vec<DfExpr>,
1985        asc: bool,
1986    ) -> Result<Vec<SortExpr>> {
1987        ensure!(
1988            !tags.is_empty(),
1989            FunctionInvalidArgumentSnafu { fn_name: func }
1990        );
1991
1992        tags.iter()
1993            .map(|col| match col {
1994                DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
1995                    Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
1996                }
1997                other => UnexpectedPlanExprSnafu {
1998                    desc: format!("expected label string literal, but found {:?}", other),
1999                }
2000                .fail(),
2001            })
2002            .collect::<Result<Vec<_>>>()
2003    }
2004
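    /// Build a conjunction of `IS NOT NULL` filters over all value columns,
    /// e.g. (sketch) `val_a IS NOT NULL AND val_b IS NOT NULL`.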
2005    fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2006        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2007        for value in &self.ctx.field_columns {
2008            let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2009            exprs.push(expr);
2010        }
2011
2012        conjunction(exprs).context(ValueNotFoundSnafu {
2013            table: self.table_ref()?.to_quoted_string(),
2014        })
2015    }
2016
2017    /// Creates a set of DataFusion `DfExpr::AggregateFunction` expressions for each value column using the specified aggregate function.
2018    ///
2019    /// # Side Effects
2020    ///
2021    /// This method modifies the value columns in the context by replacing them with the new columns
2022    /// created by the aggregate function application.
2023    ///
2024    /// # Returns
2025    ///
2026    /// Returns a tuple of `(aggregate_expressions, previous_field_expressions)` where:
2027    /// - `aggregate_expressions`: Expressions that apply the aggregate function to the original fields
2028    /// - `previous_field_expressions`: Original field expressions before aggregation. This is non-empty
2029    ///   only when the operation is `count_values`, as this operation requires preserving the original
2030    ///   values for grouping.
2031    ///
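    /// For example (illustrative), with `op = sum` and a single field column
    /// `greptime_value`, this produces `sum(greptime_value)` and replaces the
    /// context's field column with that expression's schema name.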
2032    fn create_aggregate_exprs(
2033        &mut self,
2034        op: TokenType,
2035        param: &Option<Box<PromExpr>>,
2036        input_plan: &LogicalPlan,
2037    ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
2038        let mut non_col_args = Vec::new();
2039        let aggr = match op.id() {
2040            token::T_SUM => sum_udaf(),
2041            token::T_QUANTILE => {
2042                let q =
2043                    Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
2044                non_col_args.push(q);
2045                quantile_udaf()
2046            }
2047            token::T_AVG => avg_udaf(),
2048            token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
2049            token::T_MIN => min_udaf(),
2050            token::T_MAX => max_udaf(),
2051            token::T_GROUP => grouping_udaf(),
2052            token::T_STDDEV => stddev_pop_udaf(),
2053            token::T_STDVAR => var_pop_udaf(),
2054            token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2055                name: format!("{op:?}"),
2056            }
2057            .fail()?,
2058            _ => UnexpectedTokenSnafu { token: op }.fail()?,
2059        };
2060
2061        // perform the aggregate operation on each value column
2062        let exprs: Vec<DfExpr> = self
2063            .ctx
2064            .field_columns
2065            .iter()
2066            .map(|col| {
2067                non_col_args.push(DfExpr::Column(Column::from_name(col)));
2068                let expr = aggr.call(non_col_args.clone());
2069                non_col_args.pop();
2070                expr
2071            })
2072            .collect::<Vec<_>>();
2073
2074        // if the aggregator is `count_values`, the result must be grouped by the current fields.
2075        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2076            let prev_field_exprs: Vec<_> = self
2077                .ctx
2078                .field_columns
2079                .iter()
2080                .map(|col| DfExpr::Column(Column::from_name(col)))
2081                .collect();
2082
2083            ensure!(
2084                self.ctx.field_columns.len() == 1,
2085                UnsupportedExprSnafu {
2086                    name: "count_values on multi-value input"
2087                }
2088            );
2089
2090            prev_field_exprs
2091        } else {
2092            vec![]
2093        };
2094
2095        // update value column names according to the aggregators
2096        let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2097
2098        let normalized_exprs =
2099            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2100        for expr in normalized_exprs {
2101            new_field_columns.push(expr.schema_name().to_string());
2102        }
2103        self.ctx.field_columns = new_field_columns;
2104
2105        Ok((exprs, prev_field_exprs))
2106    }
2107
2108    fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2109        let param = param
2110            .as_deref()
2111            .with_context(|| FunctionInvalidArgumentSnafu {
2112                fn_name: op.to_string(),
2113            })?;
2114        let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2115            return FunctionInvalidArgumentSnafu {
2116                fn_name: op.to_string(),
2117            }
2118            .fail();
2119        };
2120
2121        Ok(val)
2122    }
2123
2124    fn get_param_as_literal_expr(
2125        param: &Option<Box<PromExpr>>,
2126        op: Option<TokenType>,
2127        expected_type: Option<ArrowDataType>,
2128    ) -> Result<DfExpr> {
2129        let prom_param = param.as_deref().with_context(|| {
2130            if let Some(op) = op {
2131                FunctionInvalidArgumentSnafu {
2132                    fn_name: op.to_string(),
2133                }
2134            } else {
2135                FunctionInvalidArgumentSnafu {
2136                    fn_name: "unknown".to_string(),
2137                }
2138            }
2139        })?;
2140
2141        let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2142            if let Some(op) = op {
2143                FunctionInvalidArgumentSnafu {
2144                    fn_name: op.to_string(),
2145                }
2146            } else {
2147                FunctionInvalidArgumentSnafu {
2148                    fn_name: "unknown".to_string(),
2149                }
2150            }
2151        })?;
2152
2153        // check if the type is expected
2154        if let Some(expected_type) = expected_type {
2155            // a literal should not reference any column
2156            let expr_type = expr
2157                .get_type(&DFSchema::empty())
2158                .context(DataFusionPlanningSnafu)?;
2159            if expected_type != expr_type {
2160                return FunctionInvalidArgumentSnafu {
2161                    fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2162                }
2163                .fail();
2164            }
2165        }
2166
2167        Ok(expr)
2168    }
2169
2170    /// Create a [DfExpr::WindowFunction] expr for each value column with the
2171    /// given window function.
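    /// For `topk(k, metric)` this builds, for each value column, roughly
    /// `row_number() OVER (PARTITION BY <group exprs> ORDER BY value DESC, tags)`;
    /// the caller is expected to filter on the resulting row number afterwards.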
2172    fn create_window_exprs(
2173        &mut self,
2174        op: TokenType,
2175        group_exprs: Vec<DfExpr>,
2176        input_plan: &LogicalPlan,
2177    ) -> Result<Vec<DfExpr>> {
2178        ensure!(
2179            self.ctx.field_columns.len() == 1,
2180            UnsupportedExprSnafu {
2181                name: "topk or bottomk on multi-value input"
2182            }
2183        );
2184
2185        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2186
2187        let asc = matches!(op.id(), token::T_BOTTOMK);
2188
2189        let tag_sort_exprs = self
2190            .create_tag_column_exprs()?
2191            .into_iter()
2192            .map(|expr| expr.sort(asc, true));
2193
2194        // perform window operation to each value column
2195        let exprs: Vec<DfExpr> = self
2196            .ctx
2197            .field_columns
2198            .iter()
2199            .map(|col| {
2200                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2201                // Order by value first, in the requested direction,
2202                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
2203                // then by tags when values are equal, to keep the relative order
2204                // of the output stable.
2205                sort_exprs.extend(tag_sort_exprs.clone());
2206
2207                DfExpr::WindowFunction(WindowFunction {
2208                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
2209                    args: vec![],
2210                    partition_by: group_exprs.clone(),
2211                    order_by: sort_exprs,
2212                    window_frame: WindowFrame::new(Some(true)),
2213                    null_treatment: None,
2214                })
2215            })
2216            .collect();
2217
2218        let normalized_exprs =
2219            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2220        Ok(normalized_exprs)
2221    }
2222
2223    /// Try to build a [f64] from [PromExpr].
2224    #[deprecated(
2225        note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
2226    )]
2227    fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
2228        match expr {
2229            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
2230            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
2231            PromExpr::Unary(UnaryExpr { expr, .. }) => {
2232                Self::try_build_float_literal(expr).map(|f| -f)
2233            }
2234            PromExpr::StringLiteral(_)
2235            | PromExpr::Binary(_)
2236            | PromExpr::VectorSelector(_)
2237            | PromExpr::MatrixSelector(_)
2238            | PromExpr::Call(_)
2239            | PromExpr::Extension(_)
2240            | PromExpr::Aggregate(_)
2241            | PromExpr::Subquery(_) => None,
2242        }
2243    }
2244
2245    /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
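    /// e.g. `histogram_quantile(0.9, metric_bucket)` folds the `le` buckets of the
    /// input plan into one 0.9-quantile value per series via [HistogramFold].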
2246    async fn create_histogram_plan(
2247        &mut self,
2248        args: &PromFunctionArgs,
2249        session_state: &SessionState,
2250    ) -> Result<LogicalPlan> {
2251        if args.args.len() != 2 {
2252            return FunctionInvalidArgumentSnafu {
2253                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2254            }
2255            .fail();
2256        }
2257        #[allow(deprecated)]
2258        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
2259            FunctionInvalidArgumentSnafu {
2260                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2261            }
2262        })?;
2263
2264        let input = args.args[1].as_ref().clone();
2265        let input_plan = self.prom_expr_to_plan(&input, session_state).await?;
2266
2267        if !self.ctx.has_le_tag() {
2268            return ColumnNotFoundSnafu {
2269                col: LE_COLUMN_NAME.to_string(),
2270            }
2271            .fail();
2272        }
2273        let time_index_column =
2274            self.ctx
2275                .time_index_column
2276                .clone()
2277                .with_context(|| TimeIndexNotFoundSnafu {
2278                    table: self.ctx.table_name.clone().unwrap_or_default(),
2279                })?;
2280        // FIXME(ruihang): support multi fields
2281        let field_column = self
2282            .ctx
2283            .field_columns
2284            .first()
2285            .with_context(|| FunctionInvalidArgumentSnafu {
2286                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2287            })?
2288            .clone();
2289        // remove le column from tag columns
2290        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
2291
2292        Ok(LogicalPlan::Extension(Extension {
2293            node: Arc::new(
2294                HistogramFold::new(
2295                    LE_COLUMN_NAME.to_string(),
2296                    field_column,
2297                    time_index_column,
2298                    phi,
2299                    input_plan,
2300                )
2301                .context(DataFusionPlanningSnafu)?,
2302            ),
2303        }))
2304    }
2305
2306    /// Create a [SPECIAL_VECTOR_FUNCTION] plan
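    /// e.g. `vector(1.5)` produces an [EmptyMetric] plan that emits the literal
    /// `1.5` at every timestamp between start and end.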
2307    async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
2308        if args.args.len() != 1 {
2309            return FunctionInvalidArgumentSnafu {
2310                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
2311            }
2312            .fail();
2313        }
2314        let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
2315
2316        // reuse `SPECIAL_TIME_FUNCTION` as the name of the time index column
2317        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2318        self.ctx.reset_table_name_and_schema();
2319        self.ctx.tag_columns = vec![];
2320        self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()];
2321        Ok(LogicalPlan::Extension(Extension {
2322            node: Arc::new(
2323                EmptyMetric::new(
2324                    self.ctx.start,
2325                    self.ctx.end,
2326                    self.ctx.interval,
2327                    SPECIAL_TIME_FUNCTION.to_string(),
2328                    GREPTIME_VALUE.to_string(),
2329                    Some(lit),
2330                )
2331                .context(DataFusionPlanningSnafu)?,
2332            ),
2333        }))
2334    }
2335
2336    /// Create a [SCALAR_FUNCTION] plan
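    /// e.g. `scalar(metric)` converts a single-field input vector into one value
    /// per timestamp via [ScalarCalculate], dropping all tag columns.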
2337    async fn create_scalar_plan(
2338        &mut self,
2339        args: &PromFunctionArgs,
2340        session_state: &SessionState,
2341    ) -> Result<LogicalPlan> {
2342        ensure!(
2343            args.len() == 1,
2344            FunctionInvalidArgumentSnafu {
2345                fn_name: SCALAR_FUNCTION
2346            }
2347        );
2348        let input = self.prom_expr_to_plan(&args.args[0], session_state).await?;
2349        ensure!(
2350            self.ctx.field_columns.len() == 1,
2351            MultiFieldsNotSupportedSnafu {
2352                operator: SCALAR_FUNCTION
2353            },
2354        );
2355        let scalar_plan = LogicalPlan::Extension(Extension {
2356            node: Arc::new(
2357                ScalarCalculate::new(
2358                    self.ctx.start,
2359                    self.ctx.end,
2360                    self.ctx.interval,
2361                    input,
2362                    self.ctx.time_index_column.as_ref().unwrap(),
2363                    &self.ctx.tag_columns,
2364                    &self.ctx.field_columns[0],
2365                    self.ctx.table_name.as_deref(),
2366                )
2367                .context(PromqlPlanNodeSnafu)?,
2368            ),
2369        });
2370        // a scalar plan has no tag columns
2371        self.ctx.tag_columns.clear();
2372        self.ctx.field_columns.clear();
2373        self.ctx
2374            .field_columns
2375            .push(scalar_plan.schema().field(1).name().clone());
2376        Ok(scalar_plan)
2377    }
2378
2379    /// Try to build a DataFusion literal expression from a PromQL expression;
2380    /// returns `None` if the input is not a literal expression.
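    /// e.g. (sketch) `1 + 2` folds into a binary expression over two `Float64`
    /// literals, and `1 > bool 2` is additionally casted to `Float64` because a
    /// comparison with the `bool` modifier returns 0.0 or 1.0.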
2381    fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
2382        match expr {
2383            PromExpr::NumberLiteral(NumberLiteral { val }) => {
2384                let scalar_value = ScalarValue::Float64(Some(*val));
2385                Some(DfExpr::Literal(scalar_value))
2386            }
2387            PromExpr::StringLiteral(StringLiteral { val }) => {
2388                let scalar_value = ScalarValue::Utf8(Some(val.to_string()));
2389                Some(DfExpr::Literal(scalar_value))
2390            }
2391            PromExpr::VectorSelector(_)
2392            | PromExpr::MatrixSelector(_)
2393            | PromExpr::Extension(_)
2394            | PromExpr::Aggregate(_)
2395            | PromExpr::Subquery(_) => None,
2396            PromExpr::Call(Call { func, .. }) => {
2397                if func.name == SPECIAL_TIME_FUNCTION {
2398                    Some(build_special_time_expr(SPECIAL_TIME_FUNCTION))
2399                } else {
2400                    None
2401                }
2402            }
2403            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
2404            // TODO(ruihang): support Unary operator
2405            PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
2406            PromExpr::Binary(PromBinaryExpr {
2407                lhs,
2408                rhs,
2409                op,
2410                modifier,
2411            }) => {
2412                let lhs = Self::try_build_literal_expr(lhs)?;
2413                let rhs = Self::try_build_literal_expr(rhs)?;
2414                let is_comparison_op = Self::is_token_a_comparison_op(*op);
2415                let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
2416                let expr = expr_builder(lhs, rhs).ok()?;
2417
2418                let should_return_bool = if let Some(m) = modifier {
2419                    m.return_bool
2420                } else {
2421                    false
2422                };
2423                if is_comparison_op && should_return_bool {
2424                    Some(DfExpr::Cast(Cast {
2425                        expr: Box::new(expr),
2426                        data_type: ArrowDataType::Float64,
2427                    }))
2428                } else {
2429                    Some(expr)
2430                }
2431            }
2432        }
2433    }
2434
2435    fn try_build_special_time_expr(expr: &PromExpr, time_index_col: &str) -> Option<DfExpr> {
2436        match expr {
2437            PromExpr::Call(Call { func, .. }) => {
2438                if func.name == SPECIAL_TIME_FUNCTION {
2439                    Some(build_special_time_expr(time_index_col))
2440                } else {
2441                    None
2442                }
2443            }
2444            _ => None,
2445        }
2446    }
2447
2448    /// Return a lambda that builds a binary expression from a token, because some
2449    /// binary operators are functions in DataFusion, like `atan2` or `^`.
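    /// e.g. `T_ADD` maps to `|lhs, rhs| lhs + rhs`, while `T_POW` maps to a call
    /// of DataFusion's `power` scalar function.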
2450    #[allow(clippy::type_complexity)]
2451    fn prom_token_to_binary_expr_builder(
2452        token: TokenType,
2453    ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
2454        match token.id() {
2455            token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
2456            token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
2457            token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
2458            token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
2459            token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
2460            token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
2461            token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
2462            token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
2463            token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
2464            token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
2465            token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
2466            token::T_POW => Ok(Box::new(|lhs, rhs| {
2467                Ok(DfExpr::ScalarFunction(ScalarFunction {
2468                    func: datafusion_functions::math::power(),
2469                    args: vec![lhs, rhs],
2470                }))
2471            })),
2472            token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
2473                Ok(DfExpr::ScalarFunction(ScalarFunction {
2474                    func: datafusion_functions::math::atan2(),
2475                    args: vec![lhs, rhs],
2476                }))
2477            })),
2478            _ => UnexpectedTokenSnafu { token }.fail(),
2479        }
2480    }
2481
2482    /// Check if the given op is a [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators).
2483    fn is_token_a_comparison_op(token: TokenType) -> bool {
2484        matches!(
2485            token.id(),
2486            token::T_EQLC
2487                | token::T_NEQ
2488                | token::T_GTR
2489                | token::T_LSS
2490                | token::T_GTE
2491                | token::T_LTE
2492        )
2493    }
2494
2495    /// Check if the given op is a set operator (UNION, INTERSECT and EXCEPT in SQL).
2496    fn is_token_a_set_op(token: TokenType) -> bool {
2497        matches!(
2498            token.id(),
2499            token::T_LAND // INTERSECT
2500                | token::T_LOR // UNION
2501                | token::T_LUNLESS // EXCEPT
2502        )
2503    }
2504
2505    /// Build an inner join on the time index column and tag columns to concatenate two logical plans.
2506    /// When `only_join_time_index == true`, join only on the time index, because the two plans may not have the same tag columns.
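    /// e.g. (sketch) for `a + b` with shared tag `host`, the join keys are
    /// `host` plus the time index; `on (...)` / `ignoring (...)` modifiers shrink
    /// the tag part of the key set accordingly.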
2507    #[allow(clippy::too_many_arguments)]
2508    fn join_on_non_field_columns(
2509        &self,
2510        left: LogicalPlan,
2511        right: LogicalPlan,
2512        left_table_ref: TableReference,
2513        right_table_ref: TableReference,
2514        left_time_index_column: Option<String>,
2515        right_time_index_column: Option<String>,
2516        only_join_time_index: bool,
2517        modifier: &Option<BinModifier>,
2518    ) -> Result<LogicalPlan> {
2519        let mut left_tag_columns = if only_join_time_index {
2520            BTreeSet::new()
2521        } else {
2522            self.ctx
2523                .tag_columns
2524                .iter()
2525                .cloned()
2526                .collect::<BTreeSet<_>>()
2527        };
2528        let mut right_tag_columns = left_tag_columns.clone();
2529
2530        // apply modifier
2531        if let Some(modifier) = modifier {
2532            // apply label modifier
2533            if let Some(matching) = &modifier.matching {
2534                match matching {
2535                    // keeps columns mentioned in `on`
2536                    LabelModifier::Include(on) => {
2537                        let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
2538                        left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
2539                        right_tag_columns =
2540                            right_tag_columns.intersection(&mask).cloned().collect();
2541                    }
2542                    // removes columns mentioned in `ignoring`
2543                    LabelModifier::Exclude(ignoring) => {
2544                        // the existence of the labels is not checked
2545                        for label in &ignoring.labels {
2546                            let _ = left_tag_columns.remove(label);
2547                            let _ = right_tag_columns.remove(label);
2548                        }
2549                    }
2550                }
2551            }
2552        }
2553
2554        // push time index column if it exists
2555        if let (Some(left_time_index_column), Some(right_time_index_column)) =
2556            (left_time_index_column, right_time_index_column)
2557        {
2558            left_tag_columns.insert(left_time_index_column);
2559            right_tag_columns.insert(right_time_index_column);
2560        }
2561
2562        let right = LogicalPlanBuilder::from(right)
2563            .alias(right_table_ref)
2564            .context(DataFusionPlanningSnafu)?
2565            .build()
2566            .context(DataFusionPlanningSnafu)?;
2567
2568        // Inner join on the time index and tag columns to concatenate the two plans
2569        LogicalPlanBuilder::from(left)
2570            .alias(left_table_ref)
2571            .context(DataFusionPlanningSnafu)?
2572            .join_detailed(
2573                right,
2574                JoinType::Inner,
2575                (
2576                    left_tag_columns
2577                        .into_iter()
2578                        .map(Column::from_name)
2579                        .collect::<Vec<_>>(),
2580                    right_tag_columns
2581                        .into_iter()
2582                        .map(Column::from_name)
2583                        .collect::<Vec<_>>(),
2584                ),
2585                None,
2586                true,
2587            )
2588            .context(DataFusionPlanningSnafu)?
2589            .build()
2590            .context(DataFusionPlanningSnafu)
2591    }
2592
2593    /// Build a set operator (AND/OR/UNLESS)
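    /// e.g. (sketch) `a and b` becomes a distinct LeftSemi join of `a` against `b`
    /// on the shared tag columns plus the time index, while `or` is handled
    /// separately by `or_operator`.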
2594    fn set_op_on_non_field_columns(
2595        &mut self,
2596        left: LogicalPlan,
2597        mut right: LogicalPlan,
2598        left_context: PromPlannerContext,
2599        right_context: PromPlannerContext,
2600        op: TokenType,
2601        modifier: &Option<BinModifier>,
2602    ) -> Result<LogicalPlan> {
2603        let mut left_tag_col_set = left_context
2604            .tag_columns
2605            .iter()
2606            .cloned()
2607            .collect::<HashSet<_>>();
2608        let mut right_tag_col_set = right_context
2609            .tag_columns
2610            .iter()
2611            .cloned()
2612            .collect::<HashSet<_>>();
2613
2614        if matches!(op.id(), token::T_LOR) {
2615            return self.or_operator(
2616                left,
2617                right,
2618                left_tag_col_set,
2619                right_tag_col_set,
2620                left_context,
2621                right_context,
2622                modifier,
2623            );
2624        }
2625
2626        // apply modifier
2627        if let Some(modifier) = modifier {
2628            // one-to-many and many-to-one are not supported
2629            ensure!(
2630                matches!(
2631                    modifier.card,
2632                    VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
2633                ),
2634                UnsupportedVectorMatchSnafu {
2635                    name: modifier.card.clone(),
2636                },
2637            );
2638            // apply label modifier
2639            if let Some(matching) = &modifier.matching {
2640                match matching {
2641                    // keeps columns mentioned in `on`
2642                    LabelModifier::Include(on) => {
2643                        let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
2644                        left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
2645                        right_tag_col_set =
2646                            right_tag_col_set.intersection(&mask).cloned().collect();
2647                    }
2648                    // removes columns mentioned in `ignoring`
2649                    LabelModifier::Exclude(ignoring) => {
2650                        // the existence of the labels is not checked
2651                        for label in &ignoring.labels {
2652                            let _ = left_tag_col_set.remove(label);
2653                            let _ = right_tag_col_set.remove(label);
2654                        }
2655                    }
2656                }
2657            }
2658        }
2659        // ensure the two sides have the same tag columns
2660        if !matches!(op.id(), token::T_LOR) {
2661            ensure!(
2662                left_tag_col_set == right_tag_col_set,
2663                CombineTableColumnMismatchSnafu {
2664                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
2665                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
2666                }
2667            )
2668        };
2669        let left_time_index = left_context.time_index_column.clone().unwrap();
2670        let right_time_index = right_context.time_index_column.clone().unwrap();
2671        let join_keys = left_tag_col_set
2672            .iter()
2673            .cloned()
2674            .chain([left_time_index.clone()])
2675            .collect::<Vec<_>>();
2676        self.ctx.time_index_column = Some(left_time_index.clone());
2677
2678        // alias right time index column if necessary
2679        if left_context.time_index_column != right_context.time_index_column {
2680            let right_project_exprs = right
2681                .schema()
2682                .fields()
2683                .iter()
2684                .map(|field| {
2685                    if field.name() == &right_time_index {
2686                        DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
2687                    } else {
2688                        DfExpr::Column(Column::from_name(field.name()))
2689                    }
2690                })
2691                .collect::<Vec<_>>();
2692
2693            right = LogicalPlanBuilder::from(right)
2694                .project(right_project_exprs)
2695                .context(DataFusionPlanningSnafu)?
2696                .build()
2697                .context(DataFusionPlanningSnafu)?;
2698        }
2699
2700        ensure!(
2701            left_context.field_columns.len() == 1,
2702            MultiFieldsNotSupportedSnafu {
2703                operator: "AND/UNLESS operator"
2704            }
2705        );
2706        // Update the field column in the context.
2707        // The AND/UNLESS operator only keeps the field column of the left input.
2708        let left_field_col = left_context.field_columns.first().unwrap();
2709        self.ctx.field_columns = vec![left_field_col.clone()];
2710
2711        // Generate join plan.
2712        // All set operations in PromQL are "distinct"
2713        match op.id() {
2714            token::T_LAND => LogicalPlanBuilder::from(left)
2715                .distinct()
2716                .context(DataFusionPlanningSnafu)?
2717                .join_detailed(
2718                    right,
2719                    JoinType::LeftSemi,
2720                    (join_keys.clone(), join_keys),
2721                    None,
2722                    true,
2723                )
2724                .context(DataFusionPlanningSnafu)?
2725                .build()
2726                .context(DataFusionPlanningSnafu),
2727            token::T_LUNLESS => LogicalPlanBuilder::from(left)
2728                .distinct()
2729                .context(DataFusionPlanningSnafu)?
2730                .join_detailed(
2731                    right,
2732                    JoinType::LeftAnti,
2733                    (join_keys.clone(), join_keys),
2734                    None,
2735                    true,
2736                )
2737                .context(DataFusionPlanningSnafu)?
2738                .build()
2739                .context(DataFusionPlanningSnafu),
2740            token::T_LOR => {
2741                // OR is handled at the beginning of this function, as it cannot
2742                // be expressed using JOIN like AND and UNLESS.
2743                unreachable!()
2744            }
2745            _ => UnexpectedTokenSnafu { token: op }.fail(),
2746        }
2747    }
2748
2749    // TODO(ruihang): change function name
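    /// Build the plan for the `or` set operator using the `UnionDistinctOn`
    /// extension node. A sketch of the steps performed below:
    ///
    /// ```text
    /// step 0: collect all output columns from both sides
    /// step 1: project each side to the common schema, padding missing tags with null
    /// step 2: compute the match columns from the `on`/`ignoring` modifier
    /// step 3: build the `UnionDistinctOn` plan node
    /// step 4: update the planner context
    /// ```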
2750    #[allow(clippy::too_many_arguments)]
2751    fn or_operator(
2752        &mut self,
2753        left: LogicalPlan,
2754        right: LogicalPlan,
2755        left_tag_cols_set: HashSet<String>,
2756        right_tag_cols_set: HashSet<String>,
2757        left_context: PromPlannerContext,
2758        right_context: PromPlannerContext,
2759        modifier: &Option<BinModifier>,
2760    ) -> Result<LogicalPlan> {
2761        // checks
2762        ensure!(
2763            left_context.field_columns.len() == right_context.field_columns.len(),
2764            CombineTableColumnMismatchSnafu {
2765                left: left_context.field_columns.clone(),
2766                right: right_context.field_columns.clone()
2767            }
2768        );
2769        ensure!(
2770            left_context.field_columns.len() == 1,
2771            MultiFieldsNotSupportedSnafu {
2772                operator: "OR operator"
2773            }
2774        );
2775
2776        // prepare hash sets
2777        let all_tags = left_tag_cols_set
2778            .union(&right_tag_cols_set)
2779            .cloned()
2780            .collect::<HashSet<_>>();
2781        let tags_not_in_left = all_tags
2782            .difference(&left_tag_cols_set)
2783            .cloned()
2784            .collect::<Vec<_>>();
2785        let tags_not_in_right = all_tags
2786            .difference(&right_tag_cols_set)
2787            .cloned()
2788            .collect::<Vec<_>>();
2789        let left_qualifier = left.schema().qualified_field(0).0.cloned();
2790        let right_qualifier = right.schema().qualified_field(0).0.cloned();
2791        let left_qualifier_string = left_qualifier
2792            .as_ref()
2793            .map(|l| l.to_string())
2794            .unwrap_or_default();
2795        let right_qualifier_string = right_qualifier
2796            .as_ref()
2797            .map(|r| r.to_string())
2798            .unwrap_or_default();
2799        let left_time_index_column =
2800            left_context
2801                .time_index_column
2802                .clone()
2803                .with_context(|| TimeIndexNotFoundSnafu {
2804                    table: left_qualifier_string.clone(),
2805                })?;
2806        let right_time_index_column =
2807            right_context
2808                .time_index_column
2809                .clone()
2810                .with_context(|| TimeIndexNotFoundSnafu {
2811                    table: right_qualifier_string.clone(),
2812                })?;
2813        // Take the name of the first field column. The length is checked above.
2814        let left_field_col = left_context.field_columns.first().unwrap();
2815        let right_field_col = right_context.field_columns.first().unwrap();
2816
2817        // step 0: fill all columns in output schema
2818        let mut all_columns_set = left
2819            .schema()
2820            .fields()
2821            .iter()
2822            .chain(right.schema().fields().iter())
2823            .map(|field| field.name().clone())
2824            .collect::<HashSet<_>>();
2825        // remove time index column
2826        all_columns_set.remove(&left_time_index_column);
2827        all_columns_set.remove(&right_time_index_column);
2828        // remove the field column of the right side
2829        if left_field_col != right_field_col {
2830            all_columns_set.remove(right_field_col);
2831        }
2832        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
2833        // sort to keep the generated schema deterministic
2834        all_columns.sort_unstable();
2835        // use left time index column name as the result time index column name
2836        all_columns.insert(0, left_time_index_column.clone());
2837
2838        // step 1: align schemas using projection, filling non-existent columns with null
2839        let left_proj_exprs = all_columns.iter().map(|col| {
2840            if tags_not_in_left.contains(col) {
2841                DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
2842            } else {
2843                DfExpr::Column(Column::new(None::<String>, col))
2844            }
2845        });
2846        let right_time_index_expr = DfExpr::Column(Column::new(
2847            right_qualifier.clone(),
2848            right_time_index_column,
2849        ))
2850        .alias(left_time_index_column.clone());
2851        // The field column on the right side may not have a qualifier (it may have been
2852        // removed by a join operation), so we need to find it from the schema.
2853        let right_qualifier_for_field = right
2854            .schema()
2855            .iter()
2856            .find(|(_, f)| f.name() == right_field_col)
2857            .map(|(q, _)| q)
2858            .context(ColumnNotFoundSnafu {
2859                col: right_field_col.to_string(),
2860            })?
2861            .cloned();
2862
2863        // `skip(1)` to skip the time index column
2864        let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
2865            // pick the expression for this output column
2866            if col == left_field_col && left_field_col != right_field_col {
2867                // qualify the right side's field column if necessary to handle a different field name
2868                DfExpr::Column(Column::new(
2869                    right_qualifier_for_field.clone(),
2870                    right_field_col,
2871                ))
2872            } else if tags_not_in_right.contains(col) {
2873                DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
2874            } else {
2875                DfExpr::Column(Column::new(None::<String>, col))
2876            }
2877        });
2878        let right_proj_exprs = [right_time_index_expr]
2879            .into_iter()
2880            .chain(right_proj_exprs_without_time_index);
2881
2882        let left_projected = LogicalPlanBuilder::from(left)
2883            .project(left_proj_exprs)
2884            .context(DataFusionPlanningSnafu)?
2885            .alias(left_qualifier_string.clone())
2886            .context(DataFusionPlanningSnafu)?
2887            .build()
2888            .context(DataFusionPlanningSnafu)?;
2889        let right_projected = LogicalPlanBuilder::from(right)
2890            .project(right_proj_exprs)
2891            .context(DataFusionPlanningSnafu)?
2892            .alias(right_qualifier_string.clone())
2893            .context(DataFusionPlanningSnafu)?
2894            .build()
2895            .context(DataFusionPlanningSnafu)?;
2896
2897        // step 2: compute match columns
2898        let mut match_columns = if let Some(modifier) = modifier
2899            && let Some(matching) = &modifier.matching
2900        {
2901            match matching {
2902                // keeps columns mentioned in `on`
2903                LabelModifier::Include(on) => on.labels.clone(),
2904                // removes columns mentioned in `ignoring`
2905                LabelModifier::Exclude(ignoring) => {
2906                    let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
2907                    all_tags.difference(&ignoring).cloned().collect()
2908                }
2909            }
2910        } else {
2911            all_tags.iter().cloned().collect()
2912        };
2913        // sort to keep the generated plan deterministic
2914        match_columns.sort_unstable();
2915        // step 3: build `UnionDistinctOn` plan
2916        let schema = left_projected.schema().clone();
2917        let union_distinct_on = UnionDistinctOn::new(
2918            left_projected,
2919            right_projected,
2920            match_columns,
2921            left_time_index_column.clone(),
2922            schema,
2923        );
2924        let result = LogicalPlan::Extension(Extension {
2925            node: Arc::new(union_distinct_on),
2926        });
2927
2928        // step 4: update context
2929        self.ctx.time_index_column = Some(left_time_index_column);
2930        self.ctx.tag_columns = all_tags.into_iter().collect();
2931        self.ctx.field_columns = vec![left_field_col.to_string()];
2932
2933        Ok(result)
2934    }
2935
2936    /// Build a projection that applies the operation expr to every value column.
2937    /// Non-value columns (tag and timestamp) are preserved in the projection.
2938    ///
2939    /// # Side effect
2940    ///
2941    /// This function will update the value columns in the context. Those new column names
2942    /// don't contain qualifiers.
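    ///
    /// For example, with tag `tag_0`, time index `timestamp`, and field `field_0`,
    /// a `name_to_expr` that wraps its argument in `abs` yields a projection
    /// roughly like the following (illustrative; taken from the `single_abs` test
    /// below):
    ///
    /// ```text
    /// Projection: some_metric.timestamp, abs(some_metric.field_0) AS abs(field_0), some_metric.tag_0
    /// ```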
2943    fn projection_for_each_field_column<F>(
2944        &mut self,
2945        input: LogicalPlan,
2946        name_to_expr: F,
2947    ) -> Result<LogicalPlan>
2948    where
2949        F: FnMut(&String) -> Result<DfExpr>,
2950    {
2951        let non_field_columns_iter = self
2952            .ctx
2953            .tag_columns
2954            .iter()
2955            .chain(self.ctx.time_index_column.iter())
2956            .map(|col| {
2957                Ok(DfExpr::Column(Column::new(
2958                    self.ctx.table_name.clone().map(TableReference::bare),
2959                    col,
2960                )))
2961            });
2962
2963        // build computation exprs
2964        let result_field_columns = self
2965            .ctx
2966            .field_columns
2967            .iter()
2968            .map(name_to_expr)
2969            .collect::<Result<Vec<_>>>()?;
2970
2971        // alias the computation exprs to remove qualifier
2972        self.ctx.field_columns = result_field_columns
2973            .iter()
2974            .map(|expr| expr.schema_name().to_string())
2975            .collect();
2976        let field_columns_iter = result_field_columns
2977            .into_iter()
2978            .zip(self.ctx.field_columns.iter())
2979            .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
2980
2981        // chain non-field columns (unchanged) and field columns (computation applied, then aliased)
2982        let project_fields = non_field_columns_iter
2983            .chain(field_columns_iter)
2984            .collect::<Result<Vec<_>>>()?;
2985
2986        LogicalPlanBuilder::from(input)
2987            .project(project_fields)
2988            .context(DataFusionPlanningSnafu)?
2989            .build()
2990            .context(DataFusionPlanningSnafu)
2991    }
2992
2993    /// Build a filter plan that filters on the value column. Note that only one value
2994    /// column is expected.
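    ///
    /// For example, `some_metric < 1.2345` is planned by passing a closure that
    /// builds the predicate `field_0 < Float64(1.2345)` (see the
    /// `less_filter_on_value` test below).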
2995    fn filter_on_field_column<F>(
2996        &self,
2997        input: LogicalPlan,
2998        mut name_to_expr: F,
2999    ) -> Result<LogicalPlan>
3000    where
3001        F: FnMut(&String) -> Result<DfExpr>,
3002    {
3003        ensure!(
3004            self.ctx.field_columns.len() == 1,
3005            UnsupportedExprSnafu {
3006                name: "filter on multi-value input"
3007            }
3008        );
3009
3010        let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3011
3012        LogicalPlanBuilder::from(input)
3013            .filter(field_column_filter)
3014            .context(DataFusionPlanningSnafu)?
3015            .build()
3016            .context(DataFusionPlanningSnafu)
3017    }
3018
3019    /// Generate an expr like `date_part("hour", <TIME_INDEX>)`. The caller should ensure
3020    /// that the time index column in the context is set.
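    ///
    /// E.g. `date_part_on_time_index("hour")` returns an expr equivalent to
    /// `date_part('hour', <time index column>)`.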
3021    fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3022        let lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some(date_part.to_string())));
3023        let input_expr = datafusion::logical_expr::col(
3024            self.ctx
3025                .time_index_column
3026                .as_ref()
3027                // the table name doesn't matter here
3028                .with_context(|| TimeIndexNotFoundSnafu {
3029                    table: "<doesn't matter>",
3030                })?,
3031        );
3032        let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3033            func: datafusion_functions::datetime::date_part(),
3034            args: vec![lit_expr, input_expr],
3035        });
3036        Ok(fn_expr)
3037    }
3038}
3039
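/// Arguments of a PromQL function call: an optional vector input plus any
/// literal arguments.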
3040#[derive(Default, Debug)]
3041struct FunctionArgs {
3042    input: Option<PromExpr>,
3043    literals: Vec<DfExpr>,
3044}
3045
3046#[derive(Debug, Clone)]
3047enum ScalarFunc {
3048    DataFusionBuiltin(Arc<ScalarUdfDef>),
3049    /// The UDF that is defined by DataFusion itself.
3050    DataFusionUdf(Arc<ScalarUdfDef>),
3051    Udf(Arc<ScalarUdfDef>),
3052    // todo(ruihang): maybe merge with Udf later
3053    /// UDFs that require extra information, like the range length, to be evaluated.
3054    /// The second argument is the range length.
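    /// E.g. `increase(metric[5m])` is evaluated with a range length of `300_000` ms
    /// (see the `increase_aggr` test below).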
3055    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
3056    /// Functions that don't require input, like `time()`.
3057    GeneratedExpr,
3058}
3059
3060#[cfg(test)]
3061mod test {
3062    use std::time::{Duration, UNIX_EPOCH};
3063
3064    use catalog::memory::MemoryCatalogManager;
3065    use catalog::RegisterTableRequest;
3066    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
3067    use common_query::test_util::DummyDecoder;
3068    use datafusion::execution::SessionStateBuilder;
3069    use datatypes::prelude::ConcreteDataType;
3070    use datatypes::schema::{ColumnSchema, Schema};
3071    use promql_parser::label::Labels;
3072    use promql_parser::parser;
3073    use session::context::QueryContext;
3074    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
3075    use table::test_util::EmptyTable;
3076
3077    use super::*;
3078
3079    fn build_session_state() -> SessionState {
3080        SessionStateBuilder::new().with_default_features().build()
3081    }
3082
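    /// Builds a table provider whose tables have the schema
    /// `(tag_0..tag_{num_tag-1} Utf8, timestamp TimestampMillisecond, field_0..field_{num_field-1} Float64)`,
    /// with the tag columns as the primary key and `timestamp` as the time index.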
3083    async fn build_test_table_provider(
3084        table_name_tuples: &[(String, String)],
3085        num_tag: usize,
3086        num_field: usize,
3087    ) -> DfTableSourceProvider {
3088        let catalog_list = MemoryCatalogManager::with_default_setup();
3089        for (schema_name, table_name) in table_name_tuples {
3090            let mut columns = vec![];
3091            for i in 0..num_tag {
3092                columns.push(ColumnSchema::new(
3093                    format!("tag_{i}"),
3094                    ConcreteDataType::string_datatype(),
3095                    false,
3096                ));
3097            }
3098            columns.push(
3099                ColumnSchema::new(
3100                    "timestamp".to_string(),
3101                    ConcreteDataType::timestamp_millisecond_datatype(),
3102                    false,
3103                )
3104                .with_time_index(true),
3105            );
3106            for i in 0..num_field {
3107                columns.push(ColumnSchema::new(
3108                    format!("field_{i}"),
3109                    ConcreteDataType::float64_datatype(),
3110                    true,
3111                ));
3112            }
3113            let schema = Arc::new(Schema::new(columns));
3114            let table_meta = TableMetaBuilder::empty()
3115                .schema(schema)
3116                .primary_key_indices((0..num_tag).collect())
3117                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
3118                .next_column_id(1024)
3119                .build()
3120                .unwrap();
3121            let table_info = TableInfoBuilder::default()
3122                .name(table_name.to_string())
3123                .meta(table_meta)
3124                .build()
3125                .unwrap();
3126            let table = EmptyTable::from_table_info(&table_info);
3127
3128            assert!(catalog_list
3129                .register_table_sync(RegisterTableRequest {
3130                    catalog: DEFAULT_CATALOG_NAME.to_string(),
3131                    schema: schema_name.to_string(),
3132                    table_name: table_name.to_string(),
3133                    table_id: 1024,
3134                    table,
3135                })
3136                .is_ok());
3137        }
3138
3139        DfTableSourceProvider::new(
3140            catalog_list,
3141            false,
3142            QueryContext::arc(),
3143            DummyDecoder::arc(),
3144            false,
3145        )
3146    }
3147
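    /// Like `build_test_table_provider`, but with explicitly named tag columns plus
    /// the conventional `greptime_timestamp` (time index) and `greptime_value` columns.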
3148    async fn build_test_table_provider_with_fields(
3149        table_name_tuples: &[(String, String)],
3150        tags: &[&str],
3151    ) -> DfTableSourceProvider {
3152        let catalog_list = MemoryCatalogManager::with_default_setup();
3153        for (schema_name, table_name) in table_name_tuples {
3154            let mut columns = vec![];
3155            let num_tag = tags.len();
3156            for tag in tags {
3157                columns.push(ColumnSchema::new(
3158                    tag.to_string(),
3159                    ConcreteDataType::string_datatype(),
3160                    false,
3161                ));
3162            }
3163            columns.push(
3164                ColumnSchema::new(
3165                    "greptime_timestamp".to_string(),
3166                    ConcreteDataType::timestamp_millisecond_datatype(),
3167                    false,
3168                )
3169                .with_time_index(true),
3170            );
3171            columns.push(ColumnSchema::new(
3172                "greptime_value".to_string(),
3173                ConcreteDataType::float64_datatype(),
3174                true,
3175            ));
3176            let schema = Arc::new(Schema::new(columns));
3177            let table_meta = TableMetaBuilder::empty()
3178                .schema(schema)
3179                .primary_key_indices((0..num_tag).collect())
3180                .next_column_id(1024)
3181                .build()
3182                .unwrap();
3183            let table_info = TableInfoBuilder::default()
3184                .name(table_name.to_string())
3185                .meta(table_meta)
3186                .build()
3187                .unwrap();
3188            let table = EmptyTable::from_table_info(&table_info);
3189
3190            assert!(catalog_list
3191                .register_table_sync(RegisterTableRequest {
3192                    catalog: DEFAULT_CATALOG_NAME.to_string(),
3193                    schema: schema_name.to_string(),
3194                    table_name: table_name.to_string(),
3195                    table_id: 1024,
3196                    table,
3197                })
3198                .is_ok());
3199        }
3200
3201        DfTableSourceProvider::new(
3202            catalog_list,
3203            false,
3204            QueryContext::arc(),
3205            DummyDecoder::arc(),
3206            false,
3207        )
3208    }
3209
3210    // {
3211    //     input: `abs(some_metric{foo!="bar"})`,
3212    //     expected: &Call{
3213    //         Func: MustGetFunction("abs"),
3214    //         Args: Expressions{
3215    //             &VectorSelector{
3216    //                 Name: "some_metric",
3217    //                 LabelMatchers: []*labels.Matcher{
3218    //                     MustLabelMatcher(labels.MatchNotEqual, "foo", "bar"),
3219    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3220    //                 },
3221    //             },
3222    //         },
3223    //     },
3224    // },
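    /// Plans `{fn_name}(some_metric{tag_0!="bar"})` and compares the rendered plan
    /// against an expected template with `TEMPLATE` replaced by `plan_name`.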
3225    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
3226        let prom_expr =
3227            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
3228        let eval_stmt = EvalStmt {
3229            expr: prom_expr,
3230            start: UNIX_EPOCH,
3231            end: UNIX_EPOCH
3232                .checked_add(Duration::from_secs(100_000))
3233                .unwrap(),
3234            interval: Duration::from_secs(5),
3235            lookback_delta: Duration::from_secs(1),
3236        };
3237
3238        let table_provider = build_test_table_provider(
3239            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3240            1,
3241            1,
3242        )
3243        .await;
3244        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3245            .await
3246            .unwrap();
3247
3248        let expected = String::from(
3249            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3250            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3251            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3252            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3253            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3254            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3255            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3256        ).replace("TEMPLATE", plan_name);
3257
3258        assert_eq!(plan.display_indent_schema().to_string(), expected);
3259    }
3260
3261    #[tokio::test]
3262    async fn single_abs() {
3263        do_single_instant_function_call("abs", "abs").await;
3264    }
3265
3266    #[tokio::test]
3267    #[should_panic]
3268    async fn single_absent() {
3269        do_single_instant_function_call("absent", "").await;
3270    }
3271
3272    #[tokio::test]
3273    async fn single_ceil() {
3274        do_single_instant_function_call("ceil", "ceil").await;
3275    }
3276
3277    #[tokio::test]
3278    async fn single_exp() {
3279        do_single_instant_function_call("exp", "exp").await;
3280    }
3281
3282    #[tokio::test]
3283    async fn single_ln() {
3284        do_single_instant_function_call("ln", "ln").await;
3285    }
3286
3287    #[tokio::test]
3288    async fn single_log2() {
3289        do_single_instant_function_call("log2", "log2").await;
3290    }
3291
3292    #[tokio::test]
3293    async fn single_log10() {
3294        do_single_instant_function_call("log10", "log10").await;
3295    }
3296
3297    #[tokio::test]
3298    #[should_panic]
3299    async fn single_scalar() {
3300        do_single_instant_function_call("scalar", "").await;
3301    }
3302
3303    #[tokio::test]
3304    #[should_panic]
3305    async fn single_sgn() {
3306        do_single_instant_function_call("sgn", "").await;
3307    }
3308
3309    #[tokio::test]
3310    #[should_panic]
3311    async fn single_sort() {
3312        do_single_instant_function_call("sort", "").await;
3313    }
3314
3315    #[tokio::test]
3316    #[should_panic]
3317    async fn single_sort_desc() {
3318        do_single_instant_function_call("sort_desc", "").await;
3319    }
3320
3321    #[tokio::test]
3322    async fn single_sqrt() {
3323        do_single_instant_function_call("sqrt", "sqrt").await;
3324    }
3325
3326    #[tokio::test]
3327    #[should_panic]
3328    async fn single_timestamp() {
3329        do_single_instant_function_call("timestamp", "").await;
3330    }
3331
3332    #[tokio::test]
3333    async fn single_acos() {
3334        do_single_instant_function_call("acos", "acos").await;
3335    }
3336
3337    #[tokio::test]
3338    #[should_panic]
3339    async fn single_acosh() {
3340        do_single_instant_function_call("acosh", "").await;
3341    }
3342
3343    #[tokio::test]
3344    async fn single_asin() {
3345        do_single_instant_function_call("asin", "asin").await;
3346    }
3347
3348    #[tokio::test]
3349    #[should_panic]
3350    async fn single_asinh() {
3351        do_single_instant_function_call("asinh", "").await;
3352    }
3353
3354    #[tokio::test]
3355    async fn single_atan() {
3356        do_single_instant_function_call("atan", "atan").await;
3357    }
3358
3359    #[tokio::test]
3360    #[should_panic]
3361    async fn single_atanh() {
3362        do_single_instant_function_call("atanh", "").await;
3363    }
3364
3365    #[tokio::test]
3366    async fn single_cos() {
3367        do_single_instant_function_call("cos", "cos").await;
3368    }
3369
3370    #[tokio::test]
3371    #[should_panic]
3372    async fn single_cosh() {
3373        do_single_instant_function_call("cosh", "").await;
3374    }
3375
3376    #[tokio::test]
3377    async fn single_sin() {
3378        do_single_instant_function_call("sin", "sin").await;
3379    }
3380
3381    #[tokio::test]
3382    #[should_panic]
3383    async fn single_sinh() {
3384        do_single_instant_function_call("sinh", "").await;
3385    }
3386
3387    #[tokio::test]
3388    async fn single_tan() {
3389        do_single_instant_function_call("tan", "tan").await;
3390    }
3391
3392    #[tokio::test]
3393    #[should_panic]
3394    async fn single_tanh() {
3395        do_single_instant_function_call("tanh", "").await;
3396    }
3397
3398    #[tokio::test]
3399    #[should_panic]
3400    async fn single_deg() {
3401        do_single_instant_function_call("deg", "").await;
3402    }
3403
3404    #[tokio::test]
3405    #[should_panic]
3406    async fn single_rad() {
3407        do_single_instant_function_call("rad", "").await;
3408    }
3409
3410    // {
3411    //     input: "avg by (foo)(some_metric)",
3412    //     expected: &AggregateExpr{
3413    //         Op: AVG,
3414    //         Expr: &VectorSelector{
3415    //             Name: "some_metric",
3416    //             LabelMatchers: []*labels.Matcher{
3417    //                 MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3418    //             },
3419    //             PosRange: PositionRange{
3420    //                 Start: 13,
3421    //                 End:   24,
3422    //             },
3423    //         },
3424    //         Grouping: []string{"foo"},
3425    //         PosRange: PositionRange{
3426    //             Start: 0,
3427    //             End:   25,
3428    //         },
3429    //     },
3430    // },
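    /// Plans `{fn_name} by (tag_1)(some_metric{tag_0!="bar"})`, then a `without`
    /// variant, comparing each rendered plan against an expected template with
    /// `TEMPLATE` replaced by `plan_name`.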
3431    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
3432        let prom_expr = parser::parse(&format!(
3433            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
3434        ))
3435        .unwrap();
3436        let mut eval_stmt = EvalStmt {
3437            expr: prom_expr,
3438            start: UNIX_EPOCH,
3439            end: UNIX_EPOCH
3440                .checked_add(Duration::from_secs(100_000))
3441                .unwrap(),
3442            interval: Duration::from_secs(5),
3443            lookback_delta: Duration::from_secs(1),
3444        };
3445
3446        // test group by
3447        let table_provider = build_test_table_provider(
3448            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3449            2,
3450            2,
3451        )
3452        .await;
3453        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3454            .await
3455            .unwrap();
3456        let expected_no_without = String::from(
3457            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3458            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3459            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3460            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3461            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3462            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3463            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3464        ).replace("TEMPLATE", plan_name);
3465        assert_eq!(
3466            plan.display_indent_schema().to_string(),
3467            expected_no_without
3468        );
3469
3470        // test group without
3471        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
3472            *modifier = Some(LabelModifier::Exclude(Labels {
3473                labels: vec![String::from("tag_1")].into_iter().collect(),
3474            }));
3475        }
3476        let table_provider = build_test_table_provider(
3477            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3478            2,
3479            2,
3480        )
3481        .await;
3482        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3483            .await
3484            .unwrap();
3485        let expected_without = String::from(
3486            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3487            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3488            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3489            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3490            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3491            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3492            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3493        ).replace("TEMPLATE", plan_name);
3494        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
3495    }
3496
3497    #[tokio::test]
3498    async fn aggregate_sum() {
3499        do_aggregate_expr_plan("sum", "sum").await;
3500    }
3501
3502    #[tokio::test]
3503    async fn aggregate_avg() {
3504        do_aggregate_expr_plan("avg", "avg").await;
3505    }
3506
3507    #[tokio::test]
3508    #[should_panic] // output type doesn't match
3509    async fn aggregate_count() {
3510        do_aggregate_expr_plan("count", "count").await;
3511    }
3512
3513    #[tokio::test]
3514    async fn aggregate_min() {
3515        do_aggregate_expr_plan("min", "min").await;
3516    }
3517
3518    #[tokio::test]
3519    async fn aggregate_max() {
3520        do_aggregate_expr_plan("max", "max").await;
3521    }
3522
3523    #[tokio::test]
3524    #[should_panic] // output type doesn't match
3525    async fn aggregate_group() {
3526        do_aggregate_expr_plan("grouping", "GROUPING").await;
3527    }
3528
3529    #[tokio::test]
3530    async fn aggregate_stddev() {
3531        do_aggregate_expr_plan("stddev", "stddev_pop").await;
3532    }
3533
3534    #[tokio::test]
3535    async fn aggregate_stdvar() {
3536        do_aggregate_expr_plan("stdvar", "var_pop").await;
3537    }
3538
3539    // TODO(ruihang): add range fn tests once exprs are ready.
3540
3541    // {
3542    //     input: "some_metric{tag_0="foo"} + some_metric{tag_0="bar"}",
3543    //     expected: &BinaryExpr{
3544    //         Op: ADD,
3545    //         LHS: &VectorSelector{
3546    //             Name: "some_metric",
3547    //             LabelMatchers: []*labels.Matcher{
3548    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "foo"),
3549    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3550    //             },
3551    //         },
3552    //         RHS: &VectorSelector{
3553    //             Name: "some_metric",
3554    //             LabelMatchers: []*labels.Matcher{
3555    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "bar"),
3556    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3557    //             },
3558    //         },
3559    //         VectorMatching: &VectorMatching{},
3560    //     },
3561    // },
3562    #[tokio::test]
3563    async fn binary_op_column_column() {
3564        let prom_expr =
3565            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
3566        let eval_stmt = EvalStmt {
3567            expr: prom_expr,
3568            start: UNIX_EPOCH,
3569            end: UNIX_EPOCH
3570                .checked_add(Duration::from_secs(100_000))
3571                .unwrap(),
3572            interval: Duration::from_secs(5),
3573            lookback_delta: Duration::from_secs(1),
3574        };
3575
3576        let table_provider = build_test_table_provider(
3577            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3578            1,
3579            1,
3580        )
3581        .await;
3582        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3583            .await
3584            .unwrap();
3585
3586        let expected = String::from(
3587            "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
3588            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3589            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3590            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3591            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3592            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3593            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3594            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3595            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3596            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3597            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3598            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3599            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3600            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3601        );
3602
3603        assert_eq!(plan.display_indent_schema().to_string(), expected);
3604    }
3605
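    /// Plans `query` against a provider exposing `some_metric` (default schema) and
    /// `some_alt_metric` (`greptime_private` schema), then compares the rendered
    /// plan with `expected`.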
3606    async fn indie_query_plan_compare(query: &str, expected: String) {
3607        let prom_expr = parser::parse(query).unwrap();
3608        let eval_stmt = EvalStmt {
3609            expr: prom_expr,
3610            start: UNIX_EPOCH,
3611            end: UNIX_EPOCH
3612                .checked_add(Duration::from_secs(100_000))
3613                .unwrap(),
3614            interval: Duration::from_secs(5),
3615            lookback_delta: Duration::from_secs(1),
3616        };
3617
3618        let table_provider = build_test_table_provider(
3619            &[
3620                (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
3621                (
3622                    "greptime_private".to_string(),
3623                    "some_alt_metric".to_string(),
3624                ),
3625            ],
3626            1,
3627            1,
3628        )
3629        .await;
3630        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3631            .await
3632            .unwrap();
3633
3634        assert_eq!(plan.display_indent_schema().to_string(), expected);
3635    }
3636
3637    #[tokio::test]
3638    async fn binary_op_literal_column() {
3639        let query = r#"1 + some_metric{tag_0="bar"}"#;
3640        let expected = String::from(
3641            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
3642            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3643            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3644            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3645            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3646            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3647        );
3648
3649        indie_query_plan_compare(query, expected).await;
3650    }
3651
3652    #[tokio::test]
3653    async fn binary_op_literal_literal() {
3654        let query = r#"1 + 1"#;
3655        let expected = String::from("EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]");
3656
3657        indie_query_plan_compare(query, expected).await;
3658    }
3659
3660    #[tokio::test]
3661    async fn simple_bool_grammar() {
3662        let query = "some_metric != bool 1.2345";
3663        let expected = String::from(
3664            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
3665            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3666            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3667            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3668            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3669            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3670        );
3671
3672        indie_query_plan_compare(query, expected).await;
3673    }
3674
3675    #[tokio::test]
3676    async fn bool_with_additional_arithmetic() {
3677        let query = "some_metric + (1 == bool 2)";
3678        let expected = String::from(
3679            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
3680            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3681            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3682            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3683            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3684            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3685        );
3686
3687        indie_query_plan_compare(query, expected).await;
3688    }
3689
3690    #[tokio::test]
3691    async fn simple_unary() {
3692        let query = "-some_metric";
3693        let expected = String::from(
3694            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
3695            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3696            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3697            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3698            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3699            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3700        );
3701
3702        indie_query_plan_compare(query, expected).await;
3703    }
3704
3705    #[tokio::test]
3706    async fn increase_aggr() {
3707        let query = "increase(some_metric[5m])";
3708        let expected = String::from(
3709            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
3710            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
3711            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
3712            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3713            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3714            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3715            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3716            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3717        );
3718
3719        indie_query_plan_compare(query, expected).await;
3720    }
3721
3722    #[tokio::test]
3723    async fn less_filter_on_value() {
3724        let query = "some_metric < 1.2345";
3725        let expected = String::from(
3726            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3727            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3728            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3729            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3730            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3731            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3732        );
3733
3734        indie_query_plan_compare(query, expected).await;
3735    }
3736
3737    #[tokio::test]
3738    async fn count_over_time() {
3739        let query = "count_over_time(some_metric[5m])";
3740        let expected = String::from(
3741            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
3742            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
3743            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
3744            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3745            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3746            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3747            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3748            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3749        );
3750
3751        indie_query_plan_compare(query, expected).await;
3752    }
3753
3754    #[tokio::test]
3755    async fn test_hash_join() {
3756        let mut eval_stmt = EvalStmt {
3757            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
3758            start: UNIX_EPOCH,
3759            end: UNIX_EPOCH
3760                .checked_add(Duration::from_secs(100_000))
3761                .unwrap(),
3762            interval: Duration::from_secs(5),
3763            lookback_delta: Duration::from_secs(1),
3764        };
3765
3766        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
3767
3768        let prom_expr = parser::parse(case).unwrap();
3769        eval_stmt.expr = prom_expr;
3770        let table_provider = build_test_table_provider_with_fields(
3771            &[
3772                (
3773                    DEFAULT_SCHEMA_NAME.to_string(),
3774                    "http_server_requests_seconds_sum".to_string(),
3775                ),
3776                (
3777                    DEFAULT_SCHEMA_NAME.to_string(),
3778                    "http_server_requests_seconds_count".to_string(),
3779                ),
3780            ],
3781            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
3782        )
3783        .await;
3784        // Planning should succeed
3785        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3786            .await
3787            .unwrap();
3788        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
3789            \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
3790            \n    SubqueryAlias: http_server_requests_seconds_sum\
3791            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
3792            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
3793            \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
3794            \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
3795            \n              TableScan: http_server_requests_seconds_sum\
3796            \n    SubqueryAlias: http_server_requests_seconds_count\
3797            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
3798            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
3799            \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
3800            \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
3801            \n              TableScan: http_server_requests_seconds_count";
3802        assert_eq!(plan.to_string(), expected);
3803    }
3804
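    // `histogram_quantile` nested inside `label_replace`, with `sum by` and
    // `rate` underneath; only successful planning is asserted.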
    #[tokio::test]
    async fn test_nested_histogram_quantile() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
            )],
            &["pod", "le", "path", "code", "container"],
        )
        .await;
        // Planning should succeed; the plan shape itself is not asserted here.
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

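    // Set operators (`and`/`unless`) combined with aggregations, division and
    // a comparison; both cases should plan without error.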
    #[tokio::test]
    async fn test_parse_and_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
        ];

        for case in cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider_with_fields(
                &[
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_used_bytes".to_string(),
                    ),
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_capacity_bytes".to_string(),
                    ),
                ],
                &["namespace", "persistentvolumeclaim"],
            )
            .await;
            // Planning should succeed for every case; no plan shape is asserted.
            let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
                .await
                .unwrap();
        }
    }

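    // Subtraction whose right-hand side is a parenthesized `or` against
    // `vector(0)`; the nested binary expression should plan without error.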
    #[tokio::test]
    async fn test_nested_binary_op() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        // Planning should succeed; no plan shape is asserted.
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

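    // A batch of `or` expressions over aggregated series, including chained
    // `or` branches and mixed arithmetic; each case only asserts that
    // planning succeeds.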
    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
            or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
            or
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
            or
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
            sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
            (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
            (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

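    // Exercises the special `__field__` matcher: each case lists the columns
    // expected to remain in the output schema, and matchers referencing a
    // nonexistent field must fail to plan.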
    #[tokio::test]
    async fn value_matcher() {
        // template
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            // single equal matcher
            (
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // two equal matchers
            (
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single not_eq matcher
            (
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // two not_eq matchers
            (
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // equal and not_eq matchers (no conflict)
            (
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // equal and not_eq matchers (conflict)
            (
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single regex eq matcher
            (
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single regex not_eq matcher
            (
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
                    .await
                    .unwrap();
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }

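    // `__schema__` and `__database__` matchers should resolve the metric
    // under the given schema, both standalone and inside a binary operation.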
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from("Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
        \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]");

        indie_query_plan_compare(query, expected).await;
    }

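    // `__schema__` / `__database__` only accept the `=` matcher; `!=` and
    // `=~` must be rejected at planning time.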
    #[tokio::test]
    async fn only_equals_is_supported_for_special_matcher() {
        let queries = &[
            "some_alt_metric{__schema__!=\"greptime_private\"}",
            "some_alt_metric{__schema__=~\"lalala\"}",
            "some_alt_metric{__database__!=\"greptime_private\"}",
            "some_alt_metric{__database__=~\"lalala\"}",
        ];

        for query in queries {
            let prom_expr = parser::parse(query).unwrap();
            let eval_stmt = EvalStmt {
                expr: prom_expr,
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            };

            let table_provider = build_test_table_provider(
                &[
                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                    (
                        "greptime_private".to_string(),
                        "some_alt_metric".to_string(),
                    ),
                ],
                1,
                1,
            )
            .await;

            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
            assert!(plan.is_err(), "query: {:?}", query);
        }
    }

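    // A nanosecond time index should be cast to millisecond precision before
    // the Prom-specific plan nodes, for both instant and range queries.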
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(catalog_list
            .register_table_sync(RegisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: "metrics".to_string(),
                table_id: 1024,
                table,
            })
            .is_ok());

        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_session_state(),
        )
        .await
        .unwrap();
        assert_eq!(plan.display_indent_schema().to_string(),
        "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_session_state(),
        )
        .await
        .unwrap();
        assert_eq!(plan.display_indent_schema().to_string(),
        "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
        \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
        \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
        \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
    }

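    // A matcher on a label that is absent from the table should still plan
    // successfully rather than erroring.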
    #[tokio::test]
    async fn test_nonexistent_label() {
        // template
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"some_metric{nonexistent="hi"}"#;
        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            3,
            3,
        )
        .await;
        // Planning should succeed even though the matched label does not exist.
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

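    // `label_join` should become a projection that builds the new label via
    // `concat_ws` over the source labels, keeping the original columns.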
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

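    // `label_replace` should become a `regexp_replace` projection with an
    // anchored pattern, matching PromQL's full-string regex semantics.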
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

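    // A regex label matcher should compile into an anchored `~` filter
    // (`^(?:...)$`) beneath the aggregation.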
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

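    // `topk` should plan as a `row_number()` window partitioned by timestamp,
    // filtered to the requested k and re-sorted for output.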
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

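    // `count_values` should group by the value column and re-expose it under
    // the user-provided label name (`series`).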
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

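    // `quantile` as an aggregation operator should plan to the `quantile`
    // UDAF over the inner `sum by (ip)` result.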
    #[tokio::test]
    async fn test_quantile_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

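    // When the right-hand metric of `or` does not exist, the planner should
    // substitute an `EmptyMetric` branch and combine via `UnionDistinctOn`.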
    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "UnionDistinctOn: on col=[[\"job\"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
        \n  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
        \n    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
        \n      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]\
        \n        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]\
        \n          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            PromSeriesDivide: tags=[\"job\"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-1000, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100001000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n  SubqueryAlias:  [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]\
        \n    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]\
        \n      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]\
        \n        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]\
        \n          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
}