query/promql/planner.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::prelude::GREPTIME_VALUE;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::DFSchema;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, ExprSchemable, SortExpr};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use promql::extension_plan::{
    build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
    RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
use promql::functions::{
    quantile_udaf, AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters,
    IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    token, AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt,
    Expr as PromExpr, Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector,
    NumberLiteral, Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr,
    VectorMatchCardinality, VectorSelector,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidTimeRangeSnafu,
    MultiFieldsNotSupportedSnafu, MultipleMetricMatchersSnafu, MultipleVectorSnafu,
    NoMetricMatcherSnafu, PromqlPlanNodeSnafu, Result, TableNameNotFoundSnafu,
    TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
    UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu,
    ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
};

/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `histogram_quantile` function in PromQL.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector` function in PromQL.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// `le` column for conventional histogram.
const LE_COLUMN_NAME: &str = "le";

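/// Default time index column name.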
const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Default value column name for empty metric.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special matcher to project field columns under multi-field mode.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special matchers for cross-schema queries.
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

/// Threshold for scatter scan mode.
const MAX_SCATTER_POINTS: i64 = 400;

/// One hour interval in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;

#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // query parameters
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // planner states
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    field_column_matcher: Option<Vec<Matcher>>,
    schema_name: Option<String>,
    /// The range in milliseconds of the range selector. `None` if there is no range selector.
    range: Option<Millisecond>,
}

impl PromPlannerContext {
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }

    /// Reset all planner states.
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.field_column_matcher = None;
        self.schema_name = None;
        self.range = None;
    }

    /// Reset the table name and schema to empty.
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
    }

    /// Check if `le` is present in the tag columns.
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}

pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}

/// Unescapes the value of the matcher.
pub fn normalize_matcher(mut matcher: Matcher) -> Matcher {
    if let Ok(unescaped_value) = unescaper::unescape(&matcher.value) {
        matcher.value = unescaped_value;
    }
    matcher
}

impl PromPlanner {
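    /// Convert a PromQL [EvalStmt] into a DataFusion [LogicalPlan].
    /// This is the planner's entry point.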
    pub async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        planner.prom_expr_to_plan(&stmt.expr, session_state).await
    }

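    /// Recursively convert a PromQL expression to a logical plan,
    /// dispatching on the expression kind.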
    #[async_recursion]
    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => self.prom_aggr_expr_to_plan(session_state, expr).await?,
            PromExpr::Unary(expr) => self.prom_unary_expr_to_plan(session_state, expr).await?,
            PromExpr::Binary(expr) => self.prom_binary_expr_to_plan(session_state, expr).await?,
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan(expr, session_state).await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(session_state, expr).await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector).await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => self.prom_call_expr_to_plan(session_state, expr).await?,
            PromExpr::Extension(expr) => self.prom_ext_expr_to_plan(session_state, expr).await?,
        };

        Ok(res)
    }

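    /// Create a logical plan for a PromQL subquery expression by planning its
    /// inner expression over a widened time range and wrapping the result in
    /// [RangeManipulate].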
    async fn prom_subquery_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, session_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

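    /// Create a logical plan for a PromQL aggregate expression such as `sum`,
    /// `avg` or `count_values`. `topk` and `bottomk` are delegated to
    /// [Self::prom_topk_bottomk_to_plan].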
    async fn prom_aggr_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let input = self.prom_expr_to_plan(expr, session_state).await?;

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // calculate columns to group by.
                // The time index column needs to be appended to the group-by columns.
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                // convert op and value columns to aggregate exprs
                let (aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // create plan
                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` must be grouped by the fields,
                    // which are then projected to the new label.
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }

    /// Create a logical plan for the PromQL `topk` and `bottomk` exprs.
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // convert op and value columns to window exprs.
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Create the rank filter with `Operator::Or`.
        // Safety: there is at least one rank column.
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        // Order by ranks
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

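    /// Create a logical plan for a PromQL unary expression.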
    async fn prom_unary_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        // A unary expression in PromQL implies the `-` operator
        let input = self.prom_expr_to_plan(expr, session_state).await?;
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }

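    /// Create a logical plan for a PromQL binary expression, covering the
    /// literal-literal, literal-column, column-literal and column-column cases.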
    async fn prom_binary_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // if set to true, comparison operators will return 1/0 (for true/false)
        // instead of filtering on the result column
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        // build a filter plan if the op is a comparison op and doesn't need to
        // return 0/1; otherwise, build a projection plan
        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            // both lhs and rhs are literals
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            // lhs is a literal, rhs is a column
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, session_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = Self::try_build_special_time_expr(
                    lhs,
                    self.ctx.time_index_column.as_ref().unwrap(),
                ) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // lhs is a column, rhs is a literal
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, session_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = Self::try_build_special_time_expr(
                    rhs,
                    self.ctx.time_index_column.as_ref().unwrap(),
                ) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // both are columns. join them on time index
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, session_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, session_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // TODO(ruihang): avoid join if left and right are the same table

                // set operators have "special" join semantics
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                // normal join
                if left_table_ref == right_table_ref {
                    // rename table references to avoid ambiguity
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    // `self.ctx` currently holds the context of the right plan. If the
                    // right plan has no tag columns, use the left plan's context for
                    // subsequent calculations, to handle cases like `host + scalar(...)`:
                    // the tag columns of the `host` table, which only show up in the left
                    // plan's context, need to be preserved in the subsequent projection.
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    // if either plan has no tag columns (cases like `scalar(...) + host`
                    // or `host + scalar(...)`), only join on the time index
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }

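    /// Create a plan that emits a PromQL number literal as a constant value
    /// column over the query time range.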
    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

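    /// Create a plan that emits a PromQL string literal as a constant value
    /// column over the query time range.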
    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.to_string());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

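    /// Create a logical plan for a PromQL instant vector selector: a
    /// series-normalized table scan wrapped in [InstantManipulate].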
    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;
        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

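    /// Create a logical plan for a PromQL range vector (matrix) selector: a
    /// series-normalized table scan wrapped in [RangeManipulate].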
    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, true)
            .await?;
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            // TODO(ruihang): convert via Timestamp datatypes to support different time units
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

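    /// Create a logical plan for a PromQL function call. `histogram_quantile`,
    /// `vector` and `scalar` are handled as dedicated plans rather than exprs.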
    async fn prom_call_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // some special functions are not expressions but whole plans
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, session_state).await
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await,
            _ => {}
        }

        // transform function arguments
        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan(prom_expr, session_state).await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let mut func_exprs =
            self.create_function_expr(func, args.literals.clone(), session_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        builder.build().context(DataFusionPlanningSnafu)
    }

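    /// Create a logical plan for an extension expression, currently the
    /// `EXPLAIN`/`ANALYZE` (optionally `VERBOSE`) wrappers around another plan.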
    async fn prom_ext_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        // let promql_parser::parser::ast::Extension { expr } = ext_expr;
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self.prom_expr_to_plan(&children[0], session_state).await?;
        // Wrapper for the explanation/analysis of the existing plan.
        // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
        // If `analyze` is true, runs the actual plan and produces
        // information about metrics during the run.
        // If `verbose` is true, prints out additional details.
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }

    /// Extract the metric name from the `__name__` matcher and set it into
    /// [PromPlannerContext]. Returns a new [Matchers] without the metric name matcher.
    ///
    /// Each call to this function means a new selector is started, so the context
    /// is reset first.
    ///
    /// Name rule:
    /// - if `name` is some, the matchers MUST NOT contain a `__name__` matcher.
    /// - if `name` is none, the matchers MUST contain exactly one `__name__` matcher
    ///   with the equality op.
    #[allow(clippy::mutable_key_type)]
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            // TODO(ruihang): support other metric match ops
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.to_string(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(
            matchers.into_iter().map(normalize_matcher).collect(),
        ))
    }

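    /// Create the plan for a selector: a filtered table scan, an optional
    /// `__field__` projection, a sort, [SeriesDivide], and a [SeriesNormalize]
    /// on top when an offset or range selector is present.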
    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        // make table scan plan
        let table_ref = self.table_ref()?;
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        // make filter exprs
        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            .filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty.
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make a projection plan if there is any `__field__` matcher
        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            // opt-in set
            let mut result_set = HashSet::new();
            // opt-out set
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            // merge the two sets
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }

            // mask the field columns in context using the computed result set
            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            // reuse this variable for simplicity
            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // make sort plan
        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make divide plan
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                self.ctx.tag_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

        // make series_normalize plan
        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            self.ctx.tag_columns.clone(),
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }

    /// Convert [LabelModifier] to [Column] exprs for aggregation.
    /// Timestamp column and tag columns will be included.
    ///
    /// # Side effect
    ///
    /// This method will also change the tag columns in ctx if `update_ctx` is true.
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            None => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            Some(LabelModifier::Include(labels)) => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    // nonexistent labels will be ignored
                    if let Ok(field) = input_schema.field_with_unqualified_name(label) {
                        exprs.push(DfExpr::Column(Column::from(field.name())));

                        if update_ctx {
                            // update the tag columns in context
                            self.ctx.tag_columns.push(label.clone());
                        }
                    }
                }
                // add timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                // remove "without"-ed fields
                // nonexistent labels will be ignored
                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // remove time index and value fields
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    // change the tag columns in context
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                // collect remaining fields and convert to col expr
                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                // add timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }

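    /// Convert label matchers to DataFusion filter expressions against the table
    /// schema. Matchers on columns absent from the schema are evaluated against an
    /// empty string literal, and `=~ ".*"` matchers are skipped entirely.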
    // TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            let col = if table_schema
                .field_with_unqualified_name(&matcher.name)
                .is_err()
            {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new()))).alias(matcher.name)
            } else {
                DfExpr::Column(Column::from_name(matcher.name))
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)));
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                MatchOp::Re(re) => {
                    // TODO(ruihang): a more programmatic way to handle this in datafusion
                    if re.as_str() == ".*" {
                        continue;
                    }
                    DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(col),
                        op: Operator::RegexMatch,
                        right: Box::new(DfExpr::Literal(ScalarValue::Utf8(Some(
                            re.as_str().to_string(),
                        )))),
                    })
                }
                MatchOp::NotRe(re) => DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col),
                    op: Operator::RegexNotMatch,
                    right: Box::new(DfExpr::Literal(ScalarValue::Utf8(Some(
                        re.as_str().to_string(),
                    )))),
                }),
            };
            exprs.push(expr);
        }

        Ok(exprs)
    }

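    /// Build the [TableReference] from the table name (and the optional schema
    /// name set by `__schema__`/`__database__`) in the context.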
    fn table_ref(&self) -> Result<TableReference> {
        let table_name = self
            .ctx
            .table_name
            .clone()
            .context(TableNameNotFoundSnafu)?;

        // set schema name if `__schema__` is given
        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
            TableReference::partial(schema_name.as_str(), table_name.as_str())
        } else {
            TableReference::bare(table_name.as_str())
        };

        Ok(table_ref)
    }

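    /// Build a filter on the time index column covering the query range, either
    /// as one continuous range or, when the step is large, as one small range
    /// per evaluation step.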
    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
        let start = self.ctx.start;
        let end = self.ctx.end;
        if end < start {
            return InvalidTimeRangeSnafu { start, end }.fail();
        }
        let lookback_delta = self.ctx.lookback_delta;
        let range = self.ctx.range.unwrap_or_default();
        let interval = self.ctx.interval;
        let time_index_expr = self.create_time_index_column_expr()?;
        let num_points = (end - start) / interval;

        // Scan a continuous time range
        if num_points > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
            let single_time_range = time_index_expr
                .clone()
                .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                    Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range),
                    None,
                )))
                .and(
                    time_index_expr.lt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                        Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
                        None,
                    ))),
                );
            return Ok(Some(single_time_range));
        }

        // Otherwise, scan the scattered ranges separately
        let mut filters = Vec::with_capacity(num_points as usize);
        for timestamp in (start..end).step_by(interval as usize) {
            filters.push(
                time_index_expr
                    .clone()
                    .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                        Some(timestamp - offset_duration - lookback_delta - range),
                        None,
                    )))
                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp - offset_duration + lookback_delta),
                            None,
                        ),
                    ))),
            )
        }

        Ok(filters.into_iter().reduce(DfExpr::or))
    }

    /// Create a table scan plan for the given table reference. If the table's
    /// time index is not in millisecond precision, a projection that casts it
    /// to milliseconds is added on top of the scan.
1283    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
1284        let provider = self
1285            .table_provider
1286            .resolve_table(table_ref.clone())
1287            .await
1288            .context(CatalogSnafu)?;
1289
1290        let is_time_index_ms = provider
1291            .as_any()
1292            .downcast_ref::<DefaultTableSource>()
1293            .context(UnknownTableSnafu)?
1294            .table_provider
1295            .as_any()
1296            .downcast_ref::<DfTableProviderAdapter>()
1297            .context(UnknownTableSnafu)?
1298            .table()
1299            .schema()
1300            .timestamp_column()
1301            .with_context(|| TimeIndexNotFoundSnafu {
1302                table: table_ref.to_quoted_string(),
1303            })?
1304            .data_type
1305            == ConcreteDataType::timestamp_millisecond_datatype();
1306
1307        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
1308            .context(DataFusionPlanningSnafu)?
1309            .build()
1310            .context(DataFusionPlanningSnafu)?;
1311
1312        if !is_time_index_ms {
1313            // cast to ms if time_index not in Millisecond precision
1314            let expr: Vec<_> = self
1315                .ctx
1316                .field_columns
1317                .iter()
1318                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
1319                .chain(self.create_tag_column_exprs()?)
1320                .chain(Some(DfExpr::Alias(Alias {
1321                    expr: Box::new(DfExpr::Cast(Cast {
1322                        expr: Box::new(self.create_time_index_column_expr()?),
1323                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
1324                    })),
1325                    relation: Some(table_ref.clone()),
1326                    name: self
1327                        .ctx
1328                        .time_index_column
1329                        .as_ref()
1330                        .with_context(|| TimeIndexNotFoundSnafu {
1331                            table: table_ref.to_quoted_string(),
1332                        })?
1333                        .clone(),
1334                })))
1335                .collect::<Vec<_>>();
1336            scan_plan = LogicalPlanBuilder::from(scan_plan)
1337                .project(expr)
1338                .context(DataFusionPlanningSnafu)?
1339                .build()
1340                .context(DataFusionPlanningSnafu)?;
1341        }
1342
1343        Ok(scan_plan)
1347    }
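
    // A hedged sketch of what this returns. For a table whose time index is
    // already `Timestamp(Millisecond)` the result is a bare scan, while a table
    // with, say, a nanosecond time index gets a casting projection on top
    // (`some_metric` and its column names are hypothetical):
    //
    //     Projection: field, tag, CAST(ts AS Timestamp(Millisecond, None)) AS ts
    //       TableScan: some_metric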
1348
1349    /// Set up [PromPlannerContext]'s state fields.
1350    ///
1351    /// Returns a logical plan for an empty metric if the table is not found.
1352    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
1353        let table_ref = self.table_ref()?;
1354        let table = match self.table_provider.resolve_table(table_ref.clone()).await {
1355            Err(e) if e.status_code() == StatusCode::TableNotFound => {
1356                let plan = self.setup_context_for_empty_metric()?;
1357                return Ok(Some(plan));
1358            }
1359            res => res.context(CatalogSnafu)?,
1360        };
1361        let table = table
1362            .as_any()
1363            .downcast_ref::<DefaultTableSource>()
1364            .context(UnknownTableSnafu)?
1365            .table_provider
1366            .as_any()
1367            .downcast_ref::<DfTableProviderAdapter>()
1368            .context(UnknownTableSnafu)?
1369            .table();
1370
1371        // set time index column name
1372        let time_index = table
1373            .schema()
1374            .timestamp_column()
1375            .with_context(|| TimeIndexNotFoundSnafu {
1376                table: table_ref.to_quoted_string(),
1377            })?
1378            .name
1379            .clone();
1380        self.ctx.time_index_column = Some(time_index);
1381
1382        // set value columns
1383        let values = table
1384            .table_info()
1385            .meta
1386            .field_column_names()
1387            .cloned()
1388            .collect();
1389        self.ctx.field_columns = values;
1390
1391        // set primary key (tag) columns
1392        let tags = table
1393            .table_info()
1394            .meta
1395            .row_key_column_names()
1396            .filter(|col| {
1397                // remove metric engine's internal columns
1398                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
1399            })
1400            .cloned()
1401            .collect();
1402        self.ctx.tag_columns = tags;
1403
1404        Ok(None)
1405    }
1406
1407    /// Set up [PromPlannerContext]'s state fields for a non-existent table,
1408    /// which has no rows.
1409    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
1410        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
1411        self.ctx.reset_table_name_and_schema();
1412        self.ctx.tag_columns = vec![];
1413        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
1414
1415        // The table doesn't have any data, so set start to 0 and end to -1 to produce an empty range.
1416        let plan = LogicalPlan::Extension(Extension {
1417            node: Arc::new(
1418                EmptyMetric::new(
1419                    0,
1420                    -1,
1421                    self.ctx.interval,
1422                    SPECIAL_TIME_FUNCTION.to_string(),
1423                    DEFAULT_FIELD_COLUMN.to_string(),
1424                    Some(DfExpr::Literal(ScalarValue::Float64(Some(0.0)))),
1425                )
1426                .context(DataFusionPlanningSnafu)?,
1427            ),
1428        });
1429        Ok(plan)
1430    }
1431
1432    // TODO(ruihang): insert column expr
1433    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
1434        let mut result = FunctionArgs::default();
1435
1436        for arg in args {
1437            match *arg.clone() {
1438                PromExpr::Subquery(_)
1439                | PromExpr::VectorSelector(_)
1440                | PromExpr::MatrixSelector(_)
1441                | PromExpr::Extension(_)
1442                | PromExpr::Aggregate(_)
1443                | PromExpr::Paren(_)
1444                | PromExpr::Call(_) => {
1445                    if result.input.replace(*arg.clone()).is_some() {
1446                        MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
1447                    }
1448                }
1449
1450                _ => {
1451                    let expr =
1452                        Self::get_param_as_literal_expr(&Some(Box::new(*arg.clone())), None, None)?;
1453                    result.literals.push(expr);
1454                }
1455            }
1456        }
1457
1458        Ok(result)
1459    }
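
    // Illustrative sketch (hypothetical query): for `clamp(some_metric, 0, 100)`
    // the loop above routes the vector selector `some_metric` into
    // `FunctionArgs::input`, while the two number literals become `Float64`
    // literal exprs in `FunctionArgs::literals`. A second vector-valued argument
    // would hit the `MultipleVectorSnafu` error path instead.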
1460
1461    /// # Side Effects
1462    ///
1463    /// This method will update [PromPlannerContext]'s value fields.
1464    fn create_function_expr(
1465        &mut self,
1466        func: &Function,
1467        other_input_exprs: Vec<DfExpr>,
1468        session_state: &SessionState,
1469    ) -> Result<Vec<DfExpr>> {
1470        // TODO(ruihang): check function args list
1471        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
1472
1473        // TODO(ruihang): set this according to in-param list
1474        let field_column_pos = 0;
1475        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
1476        let scalar_func = match func.name {
1477            "increase" => ScalarFunc::ExtrapolateUdf(
1478                Arc::new(Increase::scalar_udf()),
1479                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1480            ),
1481            "rate" => ScalarFunc::ExtrapolateUdf(
1482                Arc::new(Rate::scalar_udf()),
1483                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1484            ),
1485            "delta" => ScalarFunc::ExtrapolateUdf(
1486                Arc::new(Delta::scalar_udf()),
1487                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
1488            ),
1489            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
1490            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
1491            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
1492            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
1493            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
1494            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
1495            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
1496            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
1497            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
1498            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
1499            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
1500            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
1501            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
1502            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
1503            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
1504            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
1505            "predict_linear" => {
1506                other_input_exprs[0] = DfExpr::Cast(Cast {
1507                    expr: Box::new(other_input_exprs[0].clone()),
1508                    data_type: ArrowDataType::Int64,
1509                });
1510                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
1511            }
1512            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
1513            "time" => {
1514                exprs.push(build_special_time_expr(
1515                    self.ctx.time_index_column.as_ref().unwrap(),
1516                ));
1517                ScalarFunc::GeneratedExpr
1518            }
1519            "minute" => {
1520                // date_part('minute', time_index)
1521                let expr = self.date_part_on_time_index("minute")?;
1522                exprs.push(expr);
1523                ScalarFunc::GeneratedExpr
1524            }
1525            "hour" => {
1526                // date_part('hour', time_index)
1527                let expr = self.date_part_on_time_index("hour")?;
1528                exprs.push(expr);
1529                ScalarFunc::GeneratedExpr
1530            }
1531            "month" => {
1532                // date_part('month', time_index)
1533                let expr = self.date_part_on_time_index("month")?;
1534                exprs.push(expr);
1535                ScalarFunc::GeneratedExpr
1536            }
1537            "year" => {
1538                // date_part('year', time_index)
1539                let expr = self.date_part_on_time_index("year")?;
1540                exprs.push(expr);
1541                ScalarFunc::GeneratedExpr
1542            }
1543            "day_of_month" => {
1544                // date_part('day', time_index)
1545                let expr = self.date_part_on_time_index("day")?;
1546                exprs.push(expr);
1547                ScalarFunc::GeneratedExpr
1548            }
1549            "day_of_week" => {
1550                // date_part('dow', time_index)
1551                let expr = self.date_part_on_time_index("dow")?;
1552                exprs.push(expr);
1553                ScalarFunc::GeneratedExpr
1554            }
1555            "day_of_year" => {
1556                // date_part('doy', time_index)
1557                let expr = self.date_part_on_time_index("doy")?;
1558                exprs.push(expr);
1559                ScalarFunc::GeneratedExpr
1560            }
1561            "days_in_month" => {
1562                // date_part(
1563                //     'days',
1564                //     (date_trunc('month', <TIME INDEX>::date) + interval '1 month - 1 day')
1565                // );
1566                let day_lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some("day".to_string())));
1567                let month_lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some("month".to_string())));
1568                let interval_1month_lit_expr =
1569                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)));
1570                let interval_1day_lit_expr = DfExpr::Literal(ScalarValue::IntervalDayTime(Some(
1571                    IntervalDayTime::new(1, 0),
1572                )));
1573                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
1574                    left: Box::new(interval_1month_lit_expr),
1575                    op: Operator::Minus,
1576                    right: Box::new(interval_1day_lit_expr),
1577                });
1578                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
1579                    func: datafusion_functions::datetime::date_trunc(),
1580                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
1581                });
1582                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
1583                    left: Box::new(date_trunc_expr),
1584                    op: Operator::Plus,
1585                    right: Box::new(the_1month_minus_1day_expr),
1586                });
1587                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
1588                    func: datafusion_functions::datetime::date_part(),
1589                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
1590                });
1591
1592                exprs.push(date_part_expr);
1593                ScalarFunc::GeneratedExpr
1594            }
1595
1596            "label_join" => {
1597                let (concat_expr, dst_label) =
1598                    Self::build_concat_labels_expr(&mut other_input_exprs, session_state)?;
1599
1600                // Preserve the current field columns except the `dst_label`.
1601                for value in &self.ctx.field_columns {
1602                    if *value != dst_label {
1603                        let expr = DfExpr::Column(Column::from_name(value));
1604                        exprs.push(expr);
1605                    }
1606                }
1607
1608                // Remove it from tag columns
1609                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
1610
1611                // Add the new label expr
1612                exprs.push(concat_expr);
1613
1614                ScalarFunc::GeneratedExpr
1615            }
1616            "label_replace" => {
1617                let (replace_expr, dst_label) =
1618                    Self::build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?;
1619
1620                // Preserve the current field columns except the `dst_label`.
1621                for value in &self.ctx.field_columns {
1622                    if *value != dst_label {
1623                        let expr = DfExpr::Column(Column::from_name(value));
1624                        exprs.push(expr);
1625                    }
1626                }
1627
1628                // Remove it from tag columns
1629                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
1630
1631                // Add the new label expr
1632                exprs.push(replace_expr);
1633
1634                ScalarFunc::GeneratedExpr
1635            }
1636            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" => {
1637                // These functions are not expressions but part of the plan;
1638                // they are processed by `prom_call_expr_to_plan`.
1639                for value in &self.ctx.field_columns {
1640                    let expr = DfExpr::Column(Column::from_name(value));
1641                    exprs.push(expr);
1642                }
1643
1644                ScalarFunc::GeneratedExpr
1645            }
1646            "round" => {
1647                if other_input_exprs.is_empty() {
1648                    other_input_exprs.push_front(DfExpr::Literal(ScalarValue::Float64(Some(0.0))));
1649                }
1650                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
1651            }
1652
1653            _ => {
1654                if let Some(f) = session_state.scalar_functions().get(func.name) {
1655                    ScalarFunc::DataFusionBuiltin(f.clone())
1656                } else if let Some(f) = datafusion_functions::math::functions()
1657                    .iter()
1658                    .find(|f| f.name() == func.name)
1659                {
1660                    ScalarFunc::DataFusionUdf(f.clone())
1661                } else {
1662                    return UnsupportedExprSnafu {
1663                        name: func.name.to_string(),
1664                    }
1665                    .fail();
1666                }
1667            }
1668        };
1669
1670        for value in &self.ctx.field_columns {
1671            let col_expr = DfExpr::Column(Column::from_name(value));
1672
1673            match scalar_func.clone() {
1674                ScalarFunc::DataFusionBuiltin(func) => {
1675                    other_input_exprs.insert(field_column_pos, col_expr);
1676                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1677                        func,
1678                        args: other_input_exprs.clone().into(),
1679                    });
1680                    exprs.push(fn_expr);
1681                    let _ = other_input_exprs.remove(field_column_pos);
1682                }
1683                ScalarFunc::DataFusionUdf(func) => {
1684                    let args = itertools::chain!(
1685                        other_input_exprs.iter().take(field_column_pos).cloned(),
1686                        std::iter::once(col_expr),
1687                        other_input_exprs.iter().skip(field_column_pos).cloned()
1688                    )
1689                    .collect_vec();
1690                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
1691                }
1692                ScalarFunc::Udf(func) => {
1693                    let ts_range_expr = DfExpr::Column(Column::from_name(
1694                        RangeManipulate::build_timestamp_range_name(
1695                            self.ctx.time_index_column.as_ref().unwrap(),
1696                        ),
1697                    ));
1698                    other_input_exprs.insert(field_column_pos, ts_range_expr);
1699                    other_input_exprs.insert(field_column_pos + 1, col_expr);
1700                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1701                        func,
1702                        args: other_input_exprs.clone().into(),
1703                    });
1704                    exprs.push(fn_expr);
1705                    let _ = other_input_exprs.remove(field_column_pos + 1);
1706                    let _ = other_input_exprs.remove(field_column_pos);
1707                }
1708                ScalarFunc::ExtrapolateUdf(func, range_length) => {
1709                    let ts_range_expr = DfExpr::Column(Column::from_name(
1710                        RangeManipulate::build_timestamp_range_name(
1711                            self.ctx.time_index_column.as_ref().unwrap(),
1712                        ),
1713                    ));
1714                    other_input_exprs.insert(field_column_pos, ts_range_expr);
1715                    other_input_exprs.insert(field_column_pos + 1, col_expr);
1716                    other_input_exprs
1717                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
1718                    other_input_exprs.push_back(lit(range_length));
1719                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
1720                        func,
1721                        args: other_input_exprs.clone().into(),
1722                    });
1723                    exprs.push(fn_expr);
1724                    let _ = other_input_exprs.pop_back();
1725                    let _ = other_input_exprs.remove(field_column_pos + 2);
1726                    let _ = other_input_exprs.remove(field_column_pos + 1);
1727                    let _ = other_input_exprs.remove(field_column_pos);
1728                }
1729                ScalarFunc::GeneratedExpr => {}
1730            }
1731        }
1732
1733        // update value columns' names and alias them to remove qualifiers
1734        let mut new_field_columns = Vec::with_capacity(exprs.len());
1735
1736        exprs = exprs
1737            .into_iter()
1738            .map(|expr| {
1739                let display_name = expr.schema_name().to_string();
1740                new_field_columns.push(display_name.clone());
1741                Ok(expr.alias(display_name))
1742            })
1743            .collect::<std::result::Result<Vec<_>, _>>()
1744            .context(DataFusionPlanningSnafu)?;
1745
1746        self.ctx.field_columns = new_field_columns;
1747
1748        Ok(exprs)
1749    }
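
    // A sketch of the argument layout assembled above, using the hypothetical
    // input `rate(some_metric[5m])`. `rate` maps to `ScalarFunc::ExtrapolateUdf`,
    // so for each field column the generated call looks roughly like
    //
    //     rate(ts_range, field, ts, 300000)
    //
    // where `ts_range` is the timestamp-range column produced by `RangeManipulate`,
    // `field` is the value column, `ts` is the time index column, and `300000` is
    // the range length in milliseconds appended via `lit(range_length)`.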
1750
1751    /// Build expr for `label_replace` function
1752    fn build_regexp_replace_label_expr(
1753        other_input_exprs: &mut VecDeque<DfExpr>,
1754        session_state: &SessionState,
1755    ) -> Result<(DfExpr, String)> {
1756        // label_replace(vector, dst_label, replacement, src_label, regex)
1757        let dst_label = match other_input_exprs.pop_front() {
1758            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
1759            other => UnexpectedPlanExprSnafu {
1760                desc: format!("expected dst_label string literal, but found {:?}", other),
1761            }
1762            .fail()?,
1763        };
1764        let replacement = match other_input_exprs.pop_front() {
1765            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
1766            other => UnexpectedPlanExprSnafu {
1767                desc: format!("expected replacement string literal, but found {:?}", other),
1768            }
1769            .fail()?,
1770        };
1771        let src_label = match other_input_exprs.pop_front() {
1772            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)))) => s,
1773            other => UnexpectedPlanExprSnafu {
1774                desc: format!("expected src_label string literal, but found {:?}", other),
1775            }
1776            .fail()?,
1777        };
1778        let regex = match other_input_exprs.pop_front() {
1779            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
1780            other => UnexpectedPlanExprSnafu {
1781                desc: format!("expected regex string literal, but found {:?}", other),
1782            }
1783            .fail()?,
1784        };
1785
1786        let func = session_state
1787            .scalar_functions()
1788            .get("regexp_replace")
1789            .context(UnsupportedExprSnafu {
1790                name: "regexp_replace",
1791            })?;
1792
1793        // regexp_replace(src_label, regex, replacement)
1794        let args = vec![
1795            if src_label.is_empty() {
1796                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())))
1797            } else {
1798                DfExpr::Column(Column::from_name(src_label))
1799            },
1800            DfExpr::Literal(ScalarValue::Utf8(Some(regex))),
1801            DfExpr::Literal(ScalarValue::Utf8(Some(replacement))),
1802        ];
1803
1804        Ok((
1805            DfExpr::ScalarFunction(ScalarFunction {
1806                func: func.clone(),
1807                args,
1808            })
1809            .alias(&dst_label),
1810            dst_label,
1811        ))
1812    }
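
    // Hypothetical example: `label_replace(up, "host", "$1", "instance", "(.*):.*")`
    // pops the four string literals and builds, per the comment above, roughly
    //
    //     regexp_replace(instance, "(.*):.*", "$1") AS host
    //
    // with an empty `src_label` degrading to an empty-string literal instead of a
    // column reference.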
1813
1814    /// Build expr for `label_join` function
1815    fn build_concat_labels_expr(
1816        other_input_exprs: &mut VecDeque<DfExpr>,
1817        session_state: &SessionState,
1818    ) -> Result<(DfExpr, String)> {
1819        // label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)
1820
1821        let dst_label = match other_input_exprs.pop_front() {
1822            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
1823            other => UnexpectedPlanExprSnafu {
1824                desc: format!("expected dst_label string literal, but found {:?}", other),
1825            }
1826            .fail()?,
1827        };
1828        let separator = match other_input_exprs.pop_front() {
1829            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
1830            other => UnexpectedPlanExprSnafu {
1831                desc: format!("expected separator string literal, but found {:?}", other),
1832            }
1833            .fail()?,
1834        };
1835        let src_labels = other_input_exprs
1836            .clone()
1837            .into_iter()
1838            .map(|expr| {
1839                // Cast source label into column
1840                match expr {
1841                    DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
1842                        if label.is_empty() {
1843                            Ok(DfExpr::Literal(ScalarValue::Null))
1844                        } else {
1845                            Ok(DfExpr::Column(Column::from_name(label)))
1846                        }
1847                    }
1848                    other => UnexpectedPlanExprSnafu {
1849                        desc: format!(
1850                            "expected source label string literal, but found {:?}",
1851                            other
1852                        ),
1853                    }
1854                    .fail(),
1855                }
1856            })
1857            .collect::<Result<Vec<_>>>()?;
1858        ensure!(
1859            !src_labels.is_empty(),
1860            FunctionInvalidArgumentSnafu {
1861                fn_name: "label_join"
1862            }
1863        );
1864
1865        let func = session_state
1866            .scalar_functions()
1867            .get("concat_ws")
1868            .context(UnsupportedExprSnafu { name: "concat_ws" })?;
1869
1870        // concat_ws(separator, src_label_1, src_label_2, ...) as dst_label
1871        let mut args = Vec::with_capacity(1 + src_labels.len());
1872        args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator))));
1873        args.extend(src_labels);
1874
1875        Ok((
1876            DfExpr::ScalarFunction(ScalarFunction {
1877                func: func.clone(),
1878                args,
1879            })
1880            .alias(&dst_label),
1881            dst_label,
1882        ))
1883    }
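
    // Hypothetical example: `label_join(up, "endpoint", "/", "host", "port")` pops
    // `endpoint` and `/`, turns the remaining source labels into column
    // references, and builds
    //
    //     concat_ws("/", host, port) AS endpoint
    //
    // An empty source label becomes a NULL literal, which `concat_ws` skips.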
1884
1885    fn create_time_index_column_expr(&self) -> Result<DfExpr> {
1886        Ok(DfExpr::Column(Column::from_name(
1887            self.ctx
1888                .time_index_column
1889                .clone()
1890                .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
1891        )))
1892    }
1893
1894    fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
1895        let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
1896        for tag in &self.ctx.tag_columns {
1897            let expr = DfExpr::Column(Column::from_name(tag));
1898            result.push(expr);
1899        }
1900        Ok(result)
1901    }
1902
1903    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
1904        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
1905        for field in &self.ctx.field_columns {
1906            let expr = DfExpr::Column(Column::from_name(field));
1907            result.push(expr);
1908        }
1909        Ok(result)
1910    }
1911
1912    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
1913        let mut result = self
1914            .ctx
1915            .tag_columns
1916            .iter()
1917            .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
1918            .collect::<Vec<_>>();
1919        result.push(self.create_time_index_column_expr()?.sort(true, true));
1920        Ok(result)
1921    }
1922
1923    fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
1924        self.ctx
1925            .field_columns
1926            .iter()
1927            .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
1928            .collect::<Vec<_>>()
1929    }
1930
1931    fn create_sort_exprs_by_tags(
1932        func: &str,
1933        tags: Vec<DfExpr>,
1934        asc: bool,
1935    ) -> Result<Vec<SortExpr>> {
1936        ensure!(
1937            !tags.is_empty(),
1938            FunctionInvalidArgumentSnafu { fn_name: func }
1939        );
1940
1941        tags.iter()
1942            .map(|col| match col {
1943                DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
1944                    Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
1945                }
1946                other => UnexpectedPlanExprSnafu {
1947                    desc: format!("expected label string literal, but found {:?}", other),
1948                }
1949                .fail(),
1950            })
1951            .collect::<Result<Vec<_>>>()
1952    }
1953
1954    fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
1955        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
1956        for value in &self.ctx.field_columns {
1957            let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
1958            exprs.push(expr);
1959        }
1960
1961        conjunction(exprs).context(ValueNotFoundSnafu {
1962            table: self.table_ref()?.to_quoted_string(),
1963        })
1964    }
1965
1966    /// Creates a set of DataFusion `DfExpr::AggregateFunction` expressions for each value column using the specified aggregate function.
1967    ///
1968    /// # Side Effects
1969    ///
1970    /// This method modifies the value columns in the context by replacing them with the new columns
1971    /// created by the aggregate function application.
1972    ///
1973    /// # Returns
1974    ///
1975    /// Returns a tuple of `(aggregate_expressions, previous_field_expressions)` where:
1976    /// - `aggregate_expressions`: Expressions that apply the aggregate function to the original fields
1977    /// - `previous_field_expressions`: Original field expressions before aggregation. This is non-empty
1978    ///   only when the operation is `count_values`, as this operation requires preserving the original
1979    ///   values for grouping.
1980    ///
1981    fn create_aggregate_exprs(
1982        &mut self,
1983        op: TokenType,
1984        param: &Option<Box<PromExpr>>,
1985        input_plan: &LogicalPlan,
1986    ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
1987        let mut non_col_args = Vec::new();
1988        let aggr = match op.id() {
1989            token::T_SUM => sum_udaf(),
1990            token::T_QUANTILE => {
1991                let q =
1992                    Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
1993                non_col_args.push(q);
1994                quantile_udaf()
1995            }
1996            token::T_AVG => avg_udaf(),
1997            token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
1998            token::T_MIN => min_udaf(),
1999            token::T_MAX => max_udaf(),
2000            token::T_GROUP => grouping_udaf(),
2001            token::T_STDDEV => stddev_pop_udaf(),
2002            token::T_STDVAR => var_pop_udaf(),
2003            token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2004                name: format!("{op:?}"),
2005            }
2006            .fail()?,
2007            _ => UnexpectedTokenSnafu { token: op }.fail()?,
2008        };
2009
2010        // perform the aggregate operation on each value column
2011        let exprs: Vec<DfExpr> = self
2012            .ctx
2013            .field_columns
2014            .iter()
2015            .map(|col| {
2016                non_col_args.push(DfExpr::Column(Column::from_name(col)));
2017                let expr = aggr.call(non_col_args.clone());
2018                non_col_args.pop();
2019                expr
2020            })
2021            .collect::<Vec<_>>();
2022
2023        // if the aggregator is `count_values`, the result must be grouped by the current fields.
2024        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2025            let prev_field_exprs: Vec<_> = self
2026                .ctx
2027                .field_columns
2028                .iter()
2029                .map(|col| DfExpr::Column(Column::from_name(col)))
2030                .collect();
2031
2032            ensure!(
2033                self.ctx.field_columns.len() == 1,
2034                UnsupportedExprSnafu {
2035                    name: "count_values on multi-value input"
2036                }
2037            );
2038
2039            prev_field_exprs
2040        } else {
2041            vec![]
2042        };
2043
2044        // update value column names according to the aggregators
2045        let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2046
2047        let normalized_exprs =
2048            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2049        for expr in normalized_exprs {
2050            new_field_columns.push(expr.schema_name().to_string());
2051        }
2052        self.ctx.field_columns = new_field_columns;
2053
2054        Ok((exprs, prev_field_exprs))
2055    }
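
    // Illustrative sketch: for `sum by (host) (some_metric)` with a single value
    // column, the loop above emits one aggregate expr per field, roughly
    // `sum(greptime_value)`, and `ctx.field_columns` is rewritten to the
    // normalized schema name of that expr (the column names here are
    // hypothetical), so later plan nodes address the aggregated column by its
    // new name.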
2056
2057    fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2058        let param = param
2059            .as_deref()
2060            .with_context(|| FunctionInvalidArgumentSnafu {
2061                fn_name: op.to_string(),
2062            })?;
2063        let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2064            return FunctionInvalidArgumentSnafu {
2065                fn_name: op.to_string(),
2066            }
2067            .fail();
2068        };
2069
2070        Ok(val)
2071    }
2072
2073    fn get_param_as_literal_expr(
2074        param: &Option<Box<PromExpr>>,
2075        op: Option<TokenType>,
2076        expected_type: Option<ArrowDataType>,
2077    ) -> Result<DfExpr> {
2078        let prom_param = param.as_deref().with_context(|| {
2079            if let Some(op) = op {
2080                FunctionInvalidArgumentSnafu {
2081                    fn_name: op.to_string(),
2082                }
2083            } else {
2084                FunctionInvalidArgumentSnafu {
2085                    fn_name: "unknown".to_string(),
2086                }
2087            }
2088        })?;
2089
2090        let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2091            if let Some(op) = op {
2092                FunctionInvalidArgumentSnafu {
2093                    fn_name: op.to_string(),
2094                }
2095            } else {
2096                FunctionInvalidArgumentSnafu {
2097                    fn_name: "unknown".to_string(),
2098                }
2099            }
2100        })?;
2101
2102        // check if the type is expected
2103        if let Some(expected_type) = expected_type {
2104            // a literal should not reference any column
2105            let expr_type = expr
2106                .get_type(&DFSchema::empty())
2107                .context(DataFusionPlanningSnafu)?;
2108            if expected_type != expr_type {
2109                return FunctionInvalidArgumentSnafu {
2110                    fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2111                }
2112                .fail();
2113            }
2114        }
2115
2116        Ok(expr)
2117    }
2118
2119    /// Create a [DfExpr::WindowFunction] expr for each value column with the
2120    /// given window function.
2121    fn create_window_exprs(
2122        &mut self,
2123        op: TokenType,
2124        group_exprs: Vec<DfExpr>,
2125        input_plan: &LogicalPlan,
2126    ) -> Result<Vec<DfExpr>> {
2127        ensure!(
2128            self.ctx.field_columns.len() == 1,
2129            UnsupportedExprSnafu {
2130                name: "topk or bottomk on multi-value input"
2131            }
2132        );
2133
2134        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2135
2136        let asc = matches!(op.id(), token::T_BOTTOMK);
2137
2138        let tag_sort_exprs = self
2139            .create_tag_column_exprs()?
2140            .into_iter()
2141            .map(|expr| expr.sort(asc, true));
2142
2143        // perform the window operation on each value column
2144        let exprs: Vec<DfExpr> = self
2145            .ctx
2146            .field_columns
2147            .iter()
2148            .map(|col| {
2149                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2150                // Order by value in the specified order
2151                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
2152                // Then by tags if the values are equal,
2153                // to keep the output relatively stable.
2154                sort_exprs.extend(tag_sort_exprs.clone());
2155
2156                DfExpr::WindowFunction(WindowFunction {
2157                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
2158                    args: vec![],
2159                    partition_by: group_exprs.clone(),
2160                    order_by: sort_exprs,
2161                    window_frame: WindowFrame::new(Some(true)),
2162                    null_treatment: None,
2163                })
2164            })
2165            .collect();
2166
2167        let normalized_exprs =
2168            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2169        Ok(normalized_exprs)
2170    }
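
    // Sketch of the window expr built above for a hypothetical `topk(3, some_metric)`:
    //
    //     ROW_NUMBER() OVER (
    //         PARTITION BY <group exprs>
    //         ORDER BY greptime_value DESC, tag_0 DESC, ...
    //     )
    //
    // `bottomk` flips the ordering to ascending, and the caller can then filter
    // on the row number to keep the top (or bottom) k series per group.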
2171
2172    /// Try to build a [f64] from [PromExpr].
2173    #[deprecated(
2174        note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
2175    )]
2176    fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
2177        match expr {
2178            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
2179            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
2180            PromExpr::Unary(UnaryExpr { expr, .. }) => {
2181                Self::try_build_float_literal(expr).map(|f| -f)
2182            }
2183            PromExpr::StringLiteral(_)
2184            | PromExpr::Binary(_)
2185            | PromExpr::VectorSelector(_)
2186            | PromExpr::MatrixSelector(_)
2187            | PromExpr::Call(_)
2188            | PromExpr::Extension(_)
2189            | PromExpr::Aggregate(_)
2190            | PromExpr::Subquery(_) => None,
2191        }
2192    }
2193
2194    /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
2195    async fn create_histogram_plan(
2196        &mut self,
2197        args: &PromFunctionArgs,
2198        session_state: &SessionState,
2199    ) -> Result<LogicalPlan> {
2200        if args.args.len() != 2 {
2201            return FunctionInvalidArgumentSnafu {
2202                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2203            }
2204            .fail();
2205        }
2206        #[allow(deprecated)]
2207        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
2208            FunctionInvalidArgumentSnafu {
2209                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2210            }
2211        })?;
2212
2213        let input = args.args[1].as_ref().clone();
2214        let input_plan = self.prom_expr_to_plan(&input, session_state).await?;
2215
2216        if !self.ctx.has_le_tag() {
2217            return ColumnNotFoundSnafu {
2218                col: LE_COLUMN_NAME.to_string(),
2219            }
2220            .fail();
2221        }
2222        let time_index_column =
2223            self.ctx
2224                .time_index_column
2225                .clone()
2226                .with_context(|| TimeIndexNotFoundSnafu {
2227                    table: self.ctx.table_name.clone().unwrap_or_default(),
2228                })?;
2229        // FIXME(ruihang): support multi fields
2230        let field_column = self
2231            .ctx
2232            .field_columns
2233            .first()
2234            .with_context(|| FunctionInvalidArgumentSnafu {
2235                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2236            })?
2237            .clone();
2238        // remove the `le` column from tag columns
2239        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
2240
2241        Ok(LogicalPlan::Extension(Extension {
2242            node: Arc::new(
2243                HistogramFold::new(
2244                    LE_COLUMN_NAME.to_string(),
2245                    field_column,
2246                    time_index_column,
2247                    phi,
2248                    input_plan,
2249                )
2250                .context(DataFusionPlanningSnafu)?,
2251            ),
2252        }))
2253    }
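
    // Hypothetical usage: `histogram_quantile(0.9, some_histogram_bucket)` plans
    // the inner selector first, requires an `le` tag to be present, and wraps the
    // input in a `HistogramFold` node with `phi = 0.9`; `le` is removed from the
    // tag columns because folding consumes the bucket dimension.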
2254
2255    /// Create a [SPECIAL_VECTOR_FUNCTION] plan
2256    async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
2257        if args.args.len() != 1 {
2258            return FunctionInvalidArgumentSnafu {
2259                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
2260            }
2261            .fail();
2262        }
2263        let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
2264
2265        // reuse `SPECIAL_TIME_FUNCTION` as the name of the time index column
2266        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2267        self.ctx.reset_table_name_and_schema();
2268        self.ctx.tag_columns = vec![];
2269        self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()];
2270        Ok(LogicalPlan::Extension(Extension {
2271            node: Arc::new(
2272                EmptyMetric::new(
2273                    self.ctx.start,
2274                    self.ctx.end,
2275                    self.ctx.interval,
2276                    SPECIAL_TIME_FUNCTION.to_string(),
2277                    GREPTIME_VALUE.to_string(),
2278                    Some(lit),
2279                )
2280                .context(DataFusionPlanningSnafu)?,
2281            ),
2282        }))
2283    }
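
    // Hypothetical usage: `vector(1.5)` becomes an `EmptyMetric` plan that emits
    // one row per step between `start` and `end`, with the literal `1.5` as the
    // value column and `SPECIAL_TIME_FUNCTION` as the time index column name.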
2284
2285    /// Create a [SCALAR_FUNCTION] plan
2286    async fn create_scalar_plan(
2287        &mut self,
2288        args: &PromFunctionArgs,
2289        session_state: &SessionState,
2290    ) -> Result<LogicalPlan> {
2291        ensure!(
2292            args.len() == 1,
2293            FunctionInvalidArgumentSnafu {
2294                fn_name: SCALAR_FUNCTION
2295            }
2296        );
2297        let input = self.prom_expr_to_plan(&args.args[0], session_state).await?;
2298        ensure!(
2299            self.ctx.field_columns.len() == 1,
2300            MultiFieldsNotSupportedSnafu {
2301                operator: SCALAR_FUNCTION
2302            },
2303        );
2304        let scalar_plan = LogicalPlan::Extension(Extension {
2305            node: Arc::new(
2306                ScalarCalculate::new(
2307                    self.ctx.start,
2308                    self.ctx.end,
2309                    self.ctx.interval,
2310                    input,
2311                    self.ctx.time_index_column.as_ref().unwrap(),
2312                    &self.ctx.tag_columns,
2313                    &self.ctx.field_columns[0],
2314                    self.ctx.table_name.as_deref(),
2315                )
2316                .context(PromqlPlanNodeSnafu)?,
2317            ),
2318        });
2319        // scalar plan has no tag columns
2320        self.ctx.tag_columns.clear();
2321        self.ctx.field_columns.clear();
2322        self.ctx
2323            .field_columns
2324            .push(scalar_plan.schema().field(1).name().clone());
2325        Ok(scalar_plan)
2326    }
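
    // Hypothetical usage: `scalar(some_metric)` plans the inner expression, then
    // wraps it in a `ScalarCalculate` node that collapses the single series into
    // one value per step; tag columns are cleared because the result no longer
    // carries labels.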
2327
2328    /// Try to build a DataFusion literal expression from a PromQL expr, returning
2329    /// `None` if the input is not a literal expression.
2330    fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
2331        match expr {
2332            PromExpr::NumberLiteral(NumberLiteral { val }) => {
2333                let scalar_value = ScalarValue::Float64(Some(*val));
2334                Some(DfExpr::Literal(scalar_value))
2335            }
2336            PromExpr::StringLiteral(StringLiteral { val }) => {
2337                let scalar_value = ScalarValue::Utf8(Some(val.to_string()));
2338                Some(DfExpr::Literal(scalar_value))
2339            }
2340            PromExpr::VectorSelector(_)
2341            | PromExpr::MatrixSelector(_)
2342            | PromExpr::Extension(_)
2343            | PromExpr::Aggregate(_)
2344            | PromExpr::Subquery(_) => None,
2345            PromExpr::Call(Call { func, .. }) => {
2346                if func.name == SPECIAL_TIME_FUNCTION {
2347                    Some(build_special_time_expr(SPECIAL_TIME_FUNCTION))
2348                } else {
2349                    None
2350                }
2351            }
2352            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
2353            // TODO(ruihang): support Unary operator
2354            PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
2355            PromExpr::Binary(PromBinaryExpr {
2356                lhs,
2357                rhs,
2358                op,
2359                modifier,
2360            }) => {
2361                let lhs = Self::try_build_literal_expr(lhs)?;
2362                let rhs = Self::try_build_literal_expr(rhs)?;
2363                let is_comparison_op = Self::is_token_a_comparison_op(*op);
2364                let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
2365                let expr = expr_builder(lhs, rhs).ok()?;
2366
2367                let should_return_bool = if let Some(m) = modifier {
2368                    m.return_bool
2369                } else {
2370                    false
2371                };
2372                if is_comparison_op && should_return_bool {
2373                    Some(DfExpr::Cast(Cast {
2374                        expr: Box::new(expr),
2375                        data_type: ArrowDataType::Float64,
2376                    }))
2377                } else {
2378                    Some(expr)
2379                }
2380            }
2381        }
2382    }
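
    // Illustrative examples of the folding above (hypothetical inputs): `1 + 2`
    // folds into a literal binary expr over two `Float64` literals, and
    // `1 >= bool 2` is additionally cast to `Float64` because comparison
    // operators with the `bool` modifier return 0/1 values in PromQL. Any
    // sub-expression that involves a selector makes the whole tree non-literal
    // and yields `None`.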
2383
2384    fn try_build_special_time_expr(expr: &PromExpr, time_index_col: &str) -> Option<DfExpr> {
2385        match expr {
2386            PromExpr::Call(Call { func, .. }) => {
2387                if func.name == SPECIAL_TIME_FUNCTION {
2388                    Some(build_special_time_expr(time_index_col))
2389                } else {
2390                    None
2391                }
2392            }
2393            _ => None,
2394        }
2395    }
2396
2397    /// Return a lambda that builds a binary expression from a token, because some
2398    /// binary operators are functions in DataFusion, like `atan2` or `^`.
2399    #[allow(clippy::type_complexity)]
2400    fn prom_token_to_binary_expr_builder(
2401        token: TokenType,
2402    ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
2403        match token.id() {
2404            token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
2405            token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
2406            token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
2407            token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
2408            token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
2409            token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
2410            token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
2411            token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
2412            token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
2413            token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
2414            token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
2415            token::T_POW => Ok(Box::new(|lhs, rhs| {
2416                Ok(DfExpr::ScalarFunction(ScalarFunction {
2417                    func: datafusion_functions::math::power(),
2418                    args: vec![lhs, rhs],
2419                }))
2420            })),
2421            token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
2422                Ok(DfExpr::ScalarFunction(ScalarFunction {
2423                    func: datafusion_functions::math::atan2(),
2424                    args: vec![lhs, rhs],
2425                }))
2426            })),
2427            _ => UnexpectedTokenSnafu { token }.fail(),
2428        }
2429    }
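
    // Usage sketch, mirroring `try_build_literal_expr` above:
    //
    //     let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
    //     let expr = expr_builder(lhs, rhs).ok()?; // e.g. `lhs + rhs` or `power(lhs, rhs)`
    //
    // Arithmetic tokens stay native DataFusion binary exprs, while `^` and
    // `atan2` are routed through their scalar-function equivalents.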
2430
2431    /// Check if the given op is a [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators).
2432    fn is_token_a_comparison_op(token: TokenType) -> bool {
2433        matches!(
2434            token.id(),
2435            token::T_EQLC
2436                | token::T_NEQ
2437                | token::T_GTR
2438                | token::T_LSS
2439                | token::T_GTE
2440                | token::T_LTE
2441        )
2442    }
2443
2444    /// Check if the given op is a set operator (UNION, INTERSECT and EXCEPT in SQL).
2445    fn is_token_a_set_op(token: TokenType) -> bool {
2446        matches!(
2447            token.id(),
2448            token::T_LAND // INTERSECT
2449                | token::T_LOR // UNION
2450                | token::T_LUNLESS // EXCEPT
2451        )
2452    }
2453
2454    /// Build an inner join on the time index column and tag columns to combine two logical plans.
2455    /// When `only_join_time_index == true`, join on the time index only, because the two plans may not have the same tag columns.
2456    #[allow(clippy::too_many_arguments)]
2457    fn join_on_non_field_columns(
2458        &self,
2459        left: LogicalPlan,
2460        right: LogicalPlan,
2461        left_table_ref: TableReference,
2462        right_table_ref: TableReference,
2463        left_time_index_column: Option<String>,
2464        right_time_index_column: Option<String>,
2465        only_join_time_index: bool,
2466        modifier: &Option<BinModifier>,
2467    ) -> Result<LogicalPlan> {
2468        let mut left_tag_columns = if only_join_time_index {
2469            BTreeSet::new()
2470        } else {
2471            self.ctx
2472                .tag_columns
2473                .iter()
2474                .cloned()
2475                .collect::<BTreeSet<_>>()
2476        };
2477        let mut right_tag_columns = left_tag_columns.clone();
2478
2479        // apply modifier
2480        if let Some(modifier) = modifier {
2481            // apply label modifier
2482            if let Some(matching) = &modifier.matching {
2483                match matching {
2484                    // keeps columns mentioned in `on`
2485                    LabelModifier::Include(on) => {
2486                        let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
2487                        left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
2488                        right_tag_columns =
2489                            right_tag_columns.intersection(&mask).cloned().collect();
2490                    }
2491                    // removes columns mentioned in `ignoring`
2492                    LabelModifier::Exclude(ignoring) => {
2493                        // doesn't check existence of label
2494                        for label in &ignoring.labels {
2495                            let _ = left_tag_columns.remove(label);
2496                            let _ = right_tag_columns.remove(label);
2497                        }
2498                    }
2499                }
2500            }
2501        }
2502
2503        // push time index columns if both exist
2504        if let (Some(left_time_index_column), Some(right_time_index_column)) =
2505            (left_time_index_column, right_time_index_column)
2506        {
2507            left_tag_columns.insert(left_time_index_column);
2508            right_tag_columns.insert(right_time_index_column);
2509        }
2510
2511        let right = LogicalPlanBuilder::from(right)
2512            .alias(right_table_ref)
2513            .context(DataFusionPlanningSnafu)?
2514            .build()
2515            .context(DataFusionPlanningSnafu)?;
2516
2517        // Inner join on the time index column to combine the two operators
2518        LogicalPlanBuilder::from(left)
2519            .alias(left_table_ref)
2520            .context(DataFusionPlanningSnafu)?
2521            .join_detailed(
2522                right,
2523                JoinType::Inner,
2524                (
2525                    left_tag_columns
2526                        .into_iter()
2527                        .map(Column::from_name)
2528                        .collect::<Vec<_>>(),
2529                    right_tag_columns
2530                        .into_iter()
2531                        .map(Column::from_name)
2532                        .collect::<Vec<_>>(),
2533                ),
2534                None,
2535                true,
2536            )
2537            .context(DataFusionPlanningSnafu)?
2538            .build()
2539            .context(DataFusionPlanningSnafu)
2540    }
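
    // Hypothetical example: for `a * on (host) b`, `LabelModifier::Include` narrows
    // both join-key sets to `{host}`, the time index columns are appended, and the
    // two sides are inner-joined on `(host, time_index)` with null-equals-null
    // semantics (the trailing `true` passed to `join_detailed`).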
2541
2542    /// Build a set operator (AND/OR/UNLESS)
2543    fn set_op_on_non_field_columns(
2544        &mut self,
2545        left: LogicalPlan,
2546        mut right: LogicalPlan,
2547        left_context: PromPlannerContext,
2548        right_context: PromPlannerContext,
2549        op: TokenType,
2550        modifier: &Option<BinModifier>,
2551    ) -> Result<LogicalPlan> {
2552        let mut left_tag_col_set = left_context
2553            .tag_columns
2554            .iter()
2555            .cloned()
2556            .collect::<HashSet<_>>();
2557        let mut right_tag_col_set = right_context
2558            .tag_columns
2559            .iter()
2560            .cloned()
2561            .collect::<HashSet<_>>();
2562
2563        if matches!(op.id(), token::T_LOR) {
2564            return self.or_operator(
2565                left,
2566                right,
2567                left_tag_col_set,
2568                right_tag_col_set,
2569                left_context,
2570                right_context,
2571                modifier,
2572            );
2573        }
2574
2575        // apply modifier
2576        if let Some(modifier) = modifier {
2577            // one-to-many and many-to-one are not supported
2578            ensure!(
2579                matches!(
2580                    modifier.card,
2581                    VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
2582                ),
2583                UnsupportedVectorMatchSnafu {
2584                    name: modifier.card.clone(),
2585                },
2586            );
2587            // apply label modifier
2588            if let Some(matching) = &modifier.matching {
2589                match matching {
2590                    // keeps columns mentioned in `on`
2591                    LabelModifier::Include(on) => {
2592                        let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
2593                        left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
2594                        right_tag_col_set =
2595                            right_tag_col_set.intersection(&mask).cloned().collect();
2596                    }
2597                    // removes columns mentioned in `ignoring`
2598                    LabelModifier::Exclude(ignoring) => {
2599                        // doesn't check existence of label
2600                        for label in &ignoring.labels {
2601                            let _ = left_tag_col_set.remove(label);
2602                            let _ = right_tag_col_set.remove(label);
2603                        }
2604                    }
2605                }
2606            }
2607        }
2608        // ensure two sides have the same tag columns
2609        if !matches!(op.id(), token::T_LOR) {
2610            ensure!(
2611                left_tag_col_set == right_tag_col_set,
2612                CombineTableColumnMismatchSnafu {
2613                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
2614                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
2615                }
2616            )
2617        };
2618        let left_time_index = left_context.time_index_column.clone().unwrap();
2619        let right_time_index = right_context.time_index_column.clone().unwrap();
2620        let join_keys = left_tag_col_set
2621            .iter()
2622            .cloned()
2623            .chain([left_time_index.clone()])
2624            .collect::<Vec<_>>();
2625        self.ctx.time_index_column = Some(left_time_index.clone());
2626
2627        // alias right time index column if necessary
2628        if left_context.time_index_column != right_context.time_index_column {
2629            let right_project_exprs = right
2630                .schema()
2631                .fields()
2632                .iter()
2633                .map(|field| {
2634                    if field.name() == &right_time_index {
2635                        DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
2636                    } else {
2637                        DfExpr::Column(Column::from_name(field.name()))
2638                    }
2639                })
2640                .collect::<Vec<_>>();
2641
2642            right = LogicalPlanBuilder::from(right)
2643                .project(right_project_exprs)
2644                .context(DataFusionPlanningSnafu)?
2645                .build()
2646                .context(DataFusionPlanningSnafu)?;
2647        }
2648
2649        ensure!(
2650            left_context.field_columns.len() == 1,
2651            MultiFieldsNotSupportedSnafu {
2652                operator: "AND operator"
2653            }
2654        );
2655        // Update the field column in the context.
2656        // The AND/UNLESS operator only keeps the field column of the left input.
2657        let left_field_col = left_context.field_columns.first().unwrap();
2658        self.ctx.field_columns = vec![left_field_col.clone()];
2659
2660        // Generate join plan.
2661        // All set operations in PromQL are "distinct"
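        // For example (illustrative, not tied to a particular test): `a and b`
        // becomes a LeftSemi join on the shared tags plus the time index,
        // while `a unless b` becomes a LeftAnti join on the same keys. The
        // trailing `true` passed to `join_detailed` enables null-equals-null
        // matching, so rows with null tags still line up.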
2662        match op.id() {
2663            token::T_LAND => LogicalPlanBuilder::from(left)
2664                .distinct()
2665                .context(DataFusionPlanningSnafu)?
2666                .join_detailed(
2667                    right,
2668                    JoinType::LeftSemi,
2669                    (join_keys.clone(), join_keys),
2670                    None,
2671                    true,
2672                )
2673                .context(DataFusionPlanningSnafu)?
2674                .build()
2675                .context(DataFusionPlanningSnafu),
2676            token::T_LUNLESS => LogicalPlanBuilder::from(left)
2677                .distinct()
2678                .context(DataFusionPlanningSnafu)?
2679                .join_detailed(
2680                    right,
2681                    JoinType::LeftAnti,
2682                    (join_keys.clone(), join_keys),
2683                    None,
2684                    true,
2685                )
2686                .context(DataFusionPlanningSnafu)?
2687                .build()
2688                .context(DataFusionPlanningSnafu),
2689            token::T_LOR => {
2690                // OR is handled at the beginning of this function, as it cannot
2691                // be expressed using JOIN like AND and UNLESS.
2692                unreachable!()
2693            }
2694            _ => UnexpectedTokenSnafu { token: op }.fail(),
2695        }
2696    }
2697
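    /// Plan a PromQL `or` between two vectors.
    ///
    /// A sketch of the approach (column names are illustrative): for
    /// `a or on(host) b`, both inputs are first projected to a common schema
    /// with missing tags filled by null, then a `UnionDistinctOn` node keeps
    /// every row from the left plus the rows from the right whose
    /// `(host, time index)` pair does not already appear on the left.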
2698    // TODO(ruihang): change function name
2699    #[allow(clippy::too_many_arguments)]
2700    fn or_operator(
2701        &mut self,
2702        left: LogicalPlan,
2703        right: LogicalPlan,
2704        left_tag_cols_set: HashSet<String>,
2705        right_tag_cols_set: HashSet<String>,
2706        left_context: PromPlannerContext,
2707        right_context: PromPlannerContext,
2708        modifier: &Option<BinModifier>,
2709    ) -> Result<LogicalPlan> {
2710        // checks
2711        ensure!(
2712            left_context.field_columns.len() == right_context.field_columns.len(),
2713            CombineTableColumnMismatchSnafu {
2714                left: left_context.field_columns.clone(),
2715                right: right_context.field_columns.clone()
2716            }
2717        );
2718        ensure!(
2719            left_context.field_columns.len() == 1,
2720            MultiFieldsNotSupportedSnafu {
2721                operator: "OR operator"
2722            }
2723        );
2724
2725        // prepare hash sets
2726        let all_tags = left_tag_cols_set
2727            .union(&right_tag_cols_set)
2728            .cloned()
2729            .collect::<HashSet<_>>();
2730        let tags_not_in_left = all_tags
2731            .difference(&left_tag_cols_set)
2732            .cloned()
2733            .collect::<Vec<_>>();
2734        let tags_not_in_right = all_tags
2735            .difference(&right_tag_cols_set)
2736            .cloned()
2737            .collect::<Vec<_>>();
2738        let left_qualifier = left.schema().qualified_field(0).0.cloned();
2739        let right_qualifier = right.schema().qualified_field(0).0.cloned();
2740        let left_qualifier_string = left_qualifier
2741            .as_ref()
2742            .map(|l| l.to_string())
2743            .unwrap_or_default();
2744        let right_qualifier_string = right_qualifier
2745            .as_ref()
2746            .map(|r| r.to_string())
2747            .unwrap_or_default();
2748        let left_time_index_column =
2749            left_context
2750                .time_index_column
2751                .clone()
2752                .with_context(|| TimeIndexNotFoundSnafu {
2753                    table: left_qualifier_string.clone(),
2754                })?;
2755        let right_time_index_column =
2756            right_context
2757                .time_index_column
2758                .clone()
2759                .with_context(|| TimeIndexNotFoundSnafu {
2760                    table: right_qualifier_string.clone(),
2761                })?;
2762        // Take the name of the first field column. The length is checked above.
2763        let left_field_col = left_context.field_columns.first().unwrap();
2764        let right_field_col = right_context.field_columns.first().unwrap();
2765
2766        // step 0: fill all columns in output schema
2767        let mut all_columns_set = left
2768            .schema()
2769            .fields()
2770            .iter()
2771            .chain(right.schema().fields().iter())
2772            .map(|field| field.name().clone())
2773            .collect::<HashSet<_>>();
2774        // remove time index column
2775        all_columns_set.remove(&left_time_index_column);
2776        all_columns_set.remove(&right_time_index_column);
2777        // remove the field column of the right side
2778        if left_field_col != right_field_col {
2779            all_columns_set.remove(right_field_col);
2780        }
2781        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
2782        // sort to keep the generated schema deterministic
2783        all_columns.sort_unstable();
2784        // use left time index column name as the result time index column name
2785        all_columns.insert(0, left_time_index_column.clone());
2786
2787        // step 1: align the schemas using projections, filling non-existent columns with null
2788        let left_proj_exprs = all_columns.iter().map(|col| {
2789            if tags_not_in_left.contains(col) {
2790                DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
2791            } else {
2792                DfExpr::Column(Column::new(None::<String>, col))
2793            }
2794        });
2795        let right_time_index_expr = DfExpr::Column(Column::new(
2796            right_qualifier.clone(),
2797            right_time_index_column,
2798        ))
2799        .alias(left_time_index_column.clone());
2800        // The field column on the right side may not have a qualifier (it may have been
2801        // removed by a join operation), so we need to find it from the schema.
2802        let right_qualifier_for_field = right
2803            .schema()
2804            .iter()
2805            .find(|(_, f)| f.name() == right_field_col)
2806            .map(|(q, _)| q)
2807            .context(ColumnNotFoundSnafu {
2808                col: right_field_col.to_string(),
2809            })?
2810            .cloned();
2811
2812        // `skip(1)` to skip the time index column
2813        let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
2815            if col == left_field_col && left_field_col != right_field_col {
2816                // qualify the right side's field if necessary to handle a different field name
2817                DfExpr::Column(Column::new(
2818                    right_qualifier_for_field.clone(),
2819                    right_field_col,
2820                ))
2821            } else if tags_not_in_right.contains(col) {
2822                DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
2823            } else {
2824                DfExpr::Column(Column::new(None::<String>, col))
2825            }
2826        });
2827        let right_proj_exprs = [right_time_index_expr]
2828            .into_iter()
2829            .chain(right_proj_exprs_without_time_index);
2830
2831        let left_projected = LogicalPlanBuilder::from(left)
2832            .project(left_proj_exprs)
2833            .context(DataFusionPlanningSnafu)?
2834            .alias(left_qualifier_string.clone())
2835            .context(DataFusionPlanningSnafu)?
2836            .build()
2837            .context(DataFusionPlanningSnafu)?;
2838        let right_projected = LogicalPlanBuilder::from(right)
2839            .project(right_proj_exprs)
2840            .context(DataFusionPlanningSnafu)?
2841            .alias(right_qualifier_string.clone())
2842            .context(DataFusionPlanningSnafu)?
2843            .build()
2844            .context(DataFusionPlanningSnafu)?;
2845
2846        // step 2: compute match columns
2847        let mut match_columns = if let Some(modifier) = modifier
2848            && let Some(matching) = &modifier.matching
2849        {
2850            match matching {
2851                // keeps columns mentioned in `on`
2852                LabelModifier::Include(on) => on.labels.clone(),
2853                // removes columns mentioned in `ignoring`
2854                LabelModifier::Exclude(ignoring) => {
2855                    let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
2856                    all_tags.difference(&ignoring).cloned().collect()
2857                }
2858            }
2859        } else {
2860            all_tags.iter().cloned().collect()
2861        };
2862        // sort to keep the generated plan deterministic
2863        match_columns.sort_unstable();
2864        // step 3: build `UnionDistinctOn` plan
2865        let schema = left_projected.schema().clone();
2866        let union_distinct_on = UnionDistinctOn::new(
2867            left_projected,
2868            right_projected,
2869            match_columns,
2870            left_time_index_column.clone(),
2871            schema,
2872        );
2873        let result = LogicalPlan::Extension(Extension {
2874            node: Arc::new(union_distinct_on),
2875        });
2876
2877        // step 4: update context
2878        self.ctx.time_index_column = Some(left_time_index_column);
2879        self.ctx.tag_columns = all_tags.into_iter().collect();
2880        self.ctx.field_columns = vec![left_field_col.to_string()];
2881
2882        Ok(result)
2883    }
2884
2885    /// Build a projection that applies the operation expr to every value column.
2886    /// Non-value columns (tag and timestamp) are preserved in the projection.
2887    ///
2888    /// # Side effect
2889    ///
2890    /// This function updates the value columns in the context. The new column names
2891    /// don't contain a qualifier.
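    ///
    /// For example (illustrative): planning `abs(some_metric)` with field
    /// columns `field_0` and `field_1` keeps the tags and time index unchanged
    /// and projects `abs(field_0) AS abs(field_0)` and
    /// `abs(field_1) AS abs(field_1)`.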
2892    fn projection_for_each_field_column<F>(
2893        &mut self,
2894        input: LogicalPlan,
2895        name_to_expr: F,
2896    ) -> Result<LogicalPlan>
2897    where
2898        F: FnMut(&String) -> Result<DfExpr>,
2899    {
2900        let non_field_columns_iter = self
2901            .ctx
2902            .tag_columns
2903            .iter()
2904            .chain(self.ctx.time_index_column.iter())
2905            .map(|col| {
2906                Ok(DfExpr::Column(Column::new(
2907                    self.ctx.table_name.clone().map(TableReference::bare),
2908                    col,
2909                )))
2910            });
2911
2912        // build computation exprs
2913        let result_field_columns = self
2914            .ctx
2915            .field_columns
2916            .iter()
2917            .map(name_to_expr)
2918            .collect::<Result<Vec<_>>>()?;
2919
2920        // alias the computation exprs to remove the qualifier
2921        self.ctx.field_columns = result_field_columns
2922            .iter()
2923            .map(|expr| expr.schema_name().to_string())
2924            .collect();
2925        let field_columns_iter = result_field_columns
2926            .into_iter()
2927            .zip(self.ctx.field_columns.iter())
2928            .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
2929
2930        // chain the untouched non-field columns with the computed-then-aliased field columns
2931        let project_fields = non_field_columns_iter
2932            .chain(field_columns_iter)
2933            .collect::<Result<Vec<_>>>()?;
2934
2935        LogicalPlanBuilder::from(input)
2936            .project(project_fields)
2937            .context(DataFusionPlanningSnafu)?
2938            .build()
2939            .context(DataFusionPlanningSnafu)
2940    }
2941
2942    /// Build a filter plan that filters on the value column. Note that only one value
2943    /// column is expected.
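    ///
    /// For example (illustrative): `some_metric < 1.2345` becomes a
    /// `Filter: field_0 < Float64(1.2345)` node on top of the input plan.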
2944    fn filter_on_field_column<F>(
2945        &self,
2946        input: LogicalPlan,
2947        mut name_to_expr: F,
2948    ) -> Result<LogicalPlan>
2949    where
2950        F: FnMut(&String) -> Result<DfExpr>,
2951    {
2952        ensure!(
2953            self.ctx.field_columns.len() == 1,
2954            UnsupportedExprSnafu {
2955                name: "filter on multi-value input"
2956            }
2957        );
2958
2959        let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
2960
2961        LogicalPlanBuilder::from(input)
2962            .filter(field_column_filter)
2963            .context(DataFusionPlanningSnafu)?
2964            .build()
2965            .context(DataFusionPlanningSnafu)
2966    }
2967
2968    /// Generate an expr like `date_part("hour", <TIME_INDEX>)`. The caller should ensure
2969    /// the time index column in the context is set.
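    ///
    /// Presumably this backs PromQL time-component functions such as `hour()`
    /// and `minute()` (an assumption based on the expression it builds).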
2970    fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
2971        let lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some(date_part.to_string())));
2972        let input_expr = datafusion::logical_expr::col(
2973            self.ctx
2974                .time_index_column
2975                .as_ref()
2976                // the table name doesn't matter here
2977                .with_context(|| TimeIndexNotFoundSnafu {
2978                    table: "<doesn't matter>",
2979                })?,
2980        );
2981        let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2982            func: datafusion_functions::datetime::date_part(),
2983            args: vec![lit_expr, input_expr],
2984        });
2985        Ok(fn_expr)
2986    }
2987}
2988
2989#[derive(Default, Debug)]
2990struct FunctionArgs {
2991    input: Option<PromExpr>,
2992    literals: Vec<DfExpr>,
2993}
2994
2995#[derive(Debug, Clone)]
2996enum ScalarFunc {
2997    DataFusionBuiltin(Arc<ScalarUdfDef>),
2998    /// The UDF that is defined by DataFusion itself.
2999    DataFusionUdf(Arc<ScalarUdfDef>),
3000    Udf(Arc<ScalarUdfDef>),
3001    // todo(ruihang): maybe merge with Udf later
3002    /// UDF that requires extra information, like the range length, to be evaluated.
3003    /// The second argument is the range length.
3004    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
3005    /// Func that doesn't require input, like `time()`.
3006    GeneratedExpr,
3007}
3008
3009#[cfg(test)]
3010mod test {
3011    use std::time::{Duration, UNIX_EPOCH};
3012
3013    use catalog::memory::MemoryCatalogManager;
3014    use catalog::RegisterTableRequest;
3015    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
3016    use common_query::test_util::DummyDecoder;
3017    use datafusion::execution::SessionStateBuilder;
3018    use datatypes::prelude::ConcreteDataType;
3019    use datatypes::schema::{ColumnSchema, Schema};
3020    use promql_parser::label::Labels;
3021    use promql_parser::parser;
3022    use session::context::QueryContext;
3023    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
3024    use table::test_util::EmptyTable;
3025
3026    use super::*;
3027
3028    fn build_session_state() -> SessionState {
3029        SessionStateBuilder::new().with_default_features().build()
3030    }
3031
3032    async fn build_test_table_provider(
3033        table_name_tuples: &[(String, String)],
3034        num_tag: usize,
3035        num_field: usize,
3036    ) -> DfTableSourceProvider {
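        // Each registered table gets the same layout, built by the loop below:
        // string tags tag_0..tag_{num_tag-1}, a millisecond `timestamp` time
        // index, then float64 fields field_0..field_{num_field-1}.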
3037        let catalog_list = MemoryCatalogManager::with_default_setup();
3038        for (schema_name, table_name) in table_name_tuples {
3039            let mut columns = vec![];
3040            for i in 0..num_tag {
3041                columns.push(ColumnSchema::new(
3042                    format!("tag_{i}"),
3043                    ConcreteDataType::string_datatype(),
3044                    false,
3045                ));
3046            }
3047            columns.push(
3048                ColumnSchema::new(
3049                    "timestamp".to_string(),
3050                    ConcreteDataType::timestamp_millisecond_datatype(),
3051                    false,
3052                )
3053                .with_time_index(true),
3054            );
3055            for i in 0..num_field {
3056                columns.push(ColumnSchema::new(
3057                    format!("field_{i}"),
3058                    ConcreteDataType::float64_datatype(),
3059                    true,
3060                ));
3061            }
3062            let schema = Arc::new(Schema::new(columns));
3063            let table_meta = TableMetaBuilder::empty()
3064                .schema(schema)
3065                .primary_key_indices((0..num_tag).collect())
3066                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
3067                .next_column_id(1024)
3068                .build()
3069                .unwrap();
3070            let table_info = TableInfoBuilder::default()
3071                .name(table_name.to_string())
3072                .meta(table_meta)
3073                .build()
3074                .unwrap();
3075            let table = EmptyTable::from_table_info(&table_info);
3076
3077            assert!(catalog_list
3078                .register_table_sync(RegisterTableRequest {
3079                    catalog: DEFAULT_CATALOG_NAME.to_string(),
3080                    schema: schema_name.to_string(),
3081                    table_name: table_name.to_string(),
3082                    table_id: 1024,
3083                    table,
3084                })
3085                .is_ok());
3086        }
3087
3088        DfTableSourceProvider::new(
3089            catalog_list,
3090            false,
3091            QueryContext::arc(),
3092            DummyDecoder::arc(),
3093            false,
3094        )
3095    }
3096
3097    async fn build_test_table_provider_with_fields(
3098        table_name_tuples: &[(String, String)],
3099        tags: &[&str],
3100    ) -> DfTableSourceProvider {
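        // Unlike `build_test_table_provider`, tables here use the caller's tag
        // names plus the conventional `greptime_timestamp` / `greptime_value`
        // columns.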
3101        let catalog_list = MemoryCatalogManager::with_default_setup();
3102        for (schema_name, table_name) in table_name_tuples {
3103            let mut columns = vec![];
3104            let num_tag = tags.len();
3105            for tag in tags {
3106                columns.push(ColumnSchema::new(
3107                    tag.to_string(),
3108                    ConcreteDataType::string_datatype(),
3109                    false,
3110                ));
3111            }
3112            columns.push(
3113                ColumnSchema::new(
3114                    "greptime_timestamp".to_string(),
3115                    ConcreteDataType::timestamp_millisecond_datatype(),
3116                    false,
3117                )
3118                .with_time_index(true),
3119            );
3120            columns.push(ColumnSchema::new(
3121                "greptime_value".to_string(),
3122                ConcreteDataType::float64_datatype(),
3123                true,
3124            ));
3125            let schema = Arc::new(Schema::new(columns));
3126            let table_meta = TableMetaBuilder::empty()
3127                .schema(schema)
3128                .primary_key_indices((0..num_tag).collect())
3129                .next_column_id(1024)
3130                .build()
3131                .unwrap();
3132            let table_info = TableInfoBuilder::default()
3133                .name(table_name.to_string())
3134                .meta(table_meta)
3135                .build()
3136                .unwrap();
3137            let table = EmptyTable::from_table_info(&table_info);
3138
3139            assert!(catalog_list
3140                .register_table_sync(RegisterTableRequest {
3141                    catalog: DEFAULT_CATALOG_NAME.to_string(),
3142                    schema: schema_name.to_string(),
3143                    table_name: table_name.to_string(),
3144                    table_id: 1024,
3145                    table,
3146                })
3147                .is_ok());
3148        }
3149
3150        DfTableSourceProvider::new(
3151            catalog_list,
3152            false,
3153            QueryContext::arc(),
3154            DummyDecoder::arc(),
3155            false,
3156        )
3157    }
3158
3159    // {
3160    //     input: `abs(some_metric{foo!="bar"})`,
3161    //     expected: &Call{
3162    //         Func: MustGetFunction("abs"),
3163    //         Args: Expressions{
3164    //             &VectorSelector{
3165    //                 Name: "some_metric",
3166    //                 LabelMatchers: []*labels.Matcher{
3167    //                     MustLabelMatcher(labels.MatchNotEqual, "foo", "bar"),
3168    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3169    //                 },
3170    //             },
3171    //         },
3172    //     },
3173    // },
3174    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
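        // `plan_name` is substituted for the TEMPLATE placeholder in the
        // expected plan below; tests for functions the planner does not yet
        // support pass an empty name and are marked #[should_panic].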
3175        let prom_expr =
3176            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
3177        let eval_stmt = EvalStmt {
3178            expr: prom_expr,
3179            start: UNIX_EPOCH,
3180            end: UNIX_EPOCH
3181                .checked_add(Duration::from_secs(100_000))
3182                .unwrap(),
3183            interval: Duration::from_secs(5),
3184            lookback_delta: Duration::from_secs(1),
3185        };
3186
3187        let table_provider = build_test_table_provider(
3188            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3189            1,
3190            1,
3191        )
3192        .await;
3193        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3194            .await
3195            .unwrap();
3196
3197        let expected = String::from(
3198            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3199            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3200            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3201            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3202            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3203            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3204            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3205        ).replace("TEMPLATE", plan_name);
3206
3207        assert_eq!(plan.display_indent_schema().to_string(), expected);
3208    }
3209
3210    #[tokio::test]
3211    async fn single_abs() {
3212        do_single_instant_function_call("abs", "abs").await;
3213    }
3214
3215    #[tokio::test]
3216    #[should_panic]
3217    async fn single_absent() {
3218        do_single_instant_function_call("absent", "").await;
3219    }
3220
3221    #[tokio::test]
3222    async fn single_ceil() {
3223        do_single_instant_function_call("ceil", "ceil").await;
3224    }
3225
3226    #[tokio::test]
3227    async fn single_exp() {
3228        do_single_instant_function_call("exp", "exp").await;
3229    }
3230
3231    #[tokio::test]
3232    async fn single_ln() {
3233        do_single_instant_function_call("ln", "ln").await;
3234    }
3235
3236    #[tokio::test]
3237    async fn single_log2() {
3238        do_single_instant_function_call("log2", "log2").await;
3239    }
3240
3241    #[tokio::test]
3242    async fn single_log10() {
3243        do_single_instant_function_call("log10", "log10").await;
3244    }
3245
3246    #[tokio::test]
3247    #[should_panic]
3248    async fn single_scalar() {
3249        do_single_instant_function_call("scalar", "").await;
3250    }
3251
3252    #[tokio::test]
3253    #[should_panic]
3254    async fn single_sgn() {
3255        do_single_instant_function_call("sgn", "").await;
3256    }
3257
3258    #[tokio::test]
3259    #[should_panic]
3260    async fn single_sort() {
3261        do_single_instant_function_call("sort", "").await;
3262    }
3263
3264    #[tokio::test]
3265    #[should_panic]
3266    async fn single_sort_desc() {
3267        do_single_instant_function_call("sort_desc", "").await;
3268    }
3269
3270    #[tokio::test]
3271    async fn single_sqrt() {
3272        do_single_instant_function_call("sqrt", "sqrt").await;
3273    }
3274
3275    #[tokio::test]
3276    #[should_panic]
3277    async fn single_timestamp() {
3278        do_single_instant_function_call("timestamp", "").await;
3279    }
3280
3281    #[tokio::test]
3282    async fn single_acos() {
3283        do_single_instant_function_call("acos", "acos").await;
3284    }
3285
3286    #[tokio::test]
3287    #[should_panic]
3288    async fn single_acosh() {
3289        do_single_instant_function_call("acosh", "").await;
3290    }
3291
3292    #[tokio::test]
3293    async fn single_asin() {
3294        do_single_instant_function_call("asin", "asin").await;
3295    }
3296
3297    #[tokio::test]
3298    #[should_panic]
3299    async fn single_asinh() {
3300        do_single_instant_function_call("asinh", "").await;
3301    }
3302
3303    #[tokio::test]
3304    async fn single_atan() {
3305        do_single_instant_function_call("atan", "atan").await;
3306    }
3307
3308    #[tokio::test]
3309    #[should_panic]
3310    async fn single_atanh() {
3311        do_single_instant_function_call("atanh", "").await;
3312    }
3313
3314    #[tokio::test]
3315    async fn single_cos() {
3316        do_single_instant_function_call("cos", "cos").await;
3317    }
3318
3319    #[tokio::test]
3320    #[should_panic]
3321    async fn single_cosh() {
3322        do_single_instant_function_call("cosh", "").await;
3323    }
3324
3325    #[tokio::test]
3326    async fn single_sin() {
3327        do_single_instant_function_call("sin", "sin").await;
3328    }
3329
3330    #[tokio::test]
3331    #[should_panic]
3332    async fn single_sinh() {
3333        do_single_instant_function_call("sinh", "").await;
3334    }
3335
3336    #[tokio::test]
3337    async fn single_tan() {
3338        do_single_instant_function_call("tan", "tan").await;
3339    }
3340
3341    #[tokio::test]
3342    #[should_panic]
3343    async fn single_tanh() {
3344        do_single_instant_function_call("tanh", "").await;
3345    }
3346
3347    #[tokio::test]
3348    #[should_panic]
3349    async fn single_deg() {
3350        do_single_instant_function_call("deg", "").await;
3351    }
3352
3353    #[tokio::test]
3354    #[should_panic]
3355    async fn single_rad() {
3356        do_single_instant_function_call("rad", "").await;
3357    }
3358
3359    // {
3360    //     input: "avg by (foo)(some_metric)",
3361    //     expected: &AggregateExpr{
3362    //         Op: AVG,
3363    //         Expr: &VectorSelector{
3364    //             Name: "some_metric",
3365    //             LabelMatchers: []*labels.Matcher{
3366    //                 MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3367    //             },
3368    //             PosRange: PositionRange{
3369    //                 Start: 13,
3370    //                 End:   24,
3371    //             },
3372    //         },
3373    //         Grouping: []string{"foo"},
3374    //         PosRange: PositionRange{
3375    //             Start: 0,
3376    //             End:   25,
3377    //         },
3378    //     },
3379    // },
3380    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
3381        let prom_expr = parser::parse(&format!(
3382            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
3383        ))
3384        .unwrap();
3385        let mut eval_stmt = EvalStmt {
3386            expr: prom_expr,
3387            start: UNIX_EPOCH,
3388            end: UNIX_EPOCH
3389                .checked_add(Duration::from_secs(100_000))
3390                .unwrap(),
3391            interval: Duration::from_secs(5),
3392            lookback_delta: Duration::from_secs(1),
3393        };
3394
3395        // test group by
3396        let table_provider = build_test_table_provider(
3397            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3398            2,
3399            2,
3400        )
3401        .await;
3402        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3403            .await
3404            .unwrap();
3405        let expected_no_without = String::from(
3406            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3407            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3408            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3409            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3410            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3411            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3412            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3413        ).replace("TEMPLATE", plan_name);
3414        assert_eq!(
3415            plan.display_indent_schema().to_string(),
3416            expected_no_without
3417        );
3418
3419        // test group without
3420        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
3421            *modifier = Some(LabelModifier::Exclude(Labels {
3422                labels: vec![String::from("tag_1")].into_iter().collect(),
3423            }));
3424        }
3425        let table_provider = build_test_table_provider(
3426            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3427            2,
3428            2,
3429        )
3430        .await;
3431        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3432            .await
3433            .unwrap();
3434        let expected_without = String::from(
3435            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3436            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3437            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3438            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3439            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3440            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3441            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3442        ).replace("TEMPLATE", plan_name);
3443        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
3444    }
3445
3446    #[tokio::test]
3447    async fn aggregate_sum() {
3448        do_aggregate_expr_plan("sum", "sum").await;
3449    }
3450
3451    #[tokio::test]
3452    async fn aggregate_avg() {
3453        do_aggregate_expr_plan("avg", "avg").await;
3454    }
3455
3456    #[tokio::test]
3457    #[should_panic] // output type doesn't match
3458    async fn aggregate_count() {
3459        do_aggregate_expr_plan("count", "count").await;
3460    }
3461
3462    #[tokio::test]
3463    async fn aggregate_min() {
3464        do_aggregate_expr_plan("min", "min").await;
3465    }
3466
3467    #[tokio::test]
3468    async fn aggregate_max() {
3469        do_aggregate_expr_plan("max", "max").await;
3470    }
3471
3472    #[tokio::test]
3473    #[should_panic] // output type doesn't match
3474    async fn aggregate_group() {
3475        do_aggregate_expr_plan("grouping", "GROUPING").await;
3476    }
3477
3478    #[tokio::test]
3479    async fn aggregate_stddev() {
3480        do_aggregate_expr_plan("stddev", "stddev_pop").await;
3481    }
3482
3483    #[tokio::test]
3484    async fn aggregate_stdvar() {
3485        do_aggregate_expr_plan("stdvar", "var_pop").await;
3486    }
3487
3488    // TODO(ruihang): add range fn tests once exprs are ready.
3489
3490    // {
3491    //     input: "some_metric{tag_0="foo"} + some_metric{tag_0="bar"}",
3492    //     expected: &BinaryExpr{
3493    //         Op: ADD,
3494    //         LHS: &VectorSelector{
3495    //             Name: "some_metric",
3496    //             LabelMatchers: []*labels.Matcher{
3497    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "foo"),
3498    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3499    //             },
3500    //         },
3501    //         RHS: &VectorSelector{
3502    //             Name: "some_metric",
3503    //             LabelMatchers: []*labels.Matcher{
3504    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "bar"),
3505    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3506    //             },
3507    //         },
3508    //         VectorMatching: &VectorMatching{},
3509    //     },
3510    // },
3511    #[tokio::test]
3512    async fn binary_op_column_column() {
3513        let prom_expr =
3514            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
3515        let eval_stmt = EvalStmt {
3516            expr: prom_expr,
3517            start: UNIX_EPOCH,
3518            end: UNIX_EPOCH
3519                .checked_add(Duration::from_secs(100_000))
3520                .unwrap(),
3521            interval: Duration::from_secs(5),
3522            lookback_delta: Duration::from_secs(1),
3523        };
3524
3525        let table_provider = build_test_table_provider(
3526            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3527            1,
3528            1,
3529        )
3530        .await;
3531        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3532            .await
3533            .unwrap();
3534
3535        let expected = String::from(
3536            "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
3537            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3538            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3539            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3540            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3541            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3542            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3543            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3544            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3545            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3546            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3547            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3548            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3549            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3550        );
3551
3552        assert_eq!(plan.display_indent_schema().to_string(), expected);
3553    }
3554
3555    async fn indie_query_plan_compare(query: &str, expected: String) {
3556        let prom_expr = parser::parse(query).unwrap();
3557        let eval_stmt = EvalStmt {
3558            expr: prom_expr,
3559            start: UNIX_EPOCH,
3560            end: UNIX_EPOCH
3561                .checked_add(Duration::from_secs(100_000))
3562                .unwrap(),
3563            interval: Duration::from_secs(5),
3564            lookback_delta: Duration::from_secs(1),
3565        };
3566
3567        let table_provider = build_test_table_provider(
3568            &[
3569                (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
3570                (
3571                    "greptime_private".to_string(),
3572                    "some_alt_metric".to_string(),
3573                ),
3574            ],
3575            1,
3576            1,
3577        )
3578        .await;
3579        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3580            .await
3581            .unwrap();
3582
3583        assert_eq!(plan.display_indent_schema().to_string(), expected);
3584    }
3585
3586    #[tokio::test]
3587    async fn binary_op_literal_column() {
3588        let query = r#"1 + some_metric{tag_0="bar"}"#;
3589        let expected = String::from(
3590            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
3591            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3592            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3593            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3594            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3595            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3596        );
3597
3598        indie_query_plan_compare(query, expected).await;
3599    }
3600
3601    #[tokio::test]
3602    async fn binary_op_literal_literal() {
3603        let query = r#"1 + 1"#;
3604        let expected = String::from("EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]");
3605
3606        indie_query_plan_compare(query, expected).await;
3607    }
3608
3609    #[tokio::test]
3610    async fn simple_bool_grammar() {
3611        let query = "some_metric != bool 1.2345";
3612        let expected = String::from(
3613            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
3614            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3615            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3616            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3617            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3618            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3619        );
3620
3621        indie_query_plan_compare(query, expected).await;
3622    }
3623
3624    #[tokio::test]
3625    async fn bool_with_additional_arithmetic() {
3626        let query = "some_metric + (1 == bool 2)";
3627        let expected = String::from(
3628            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
3629            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3630            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3631            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3632            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3633            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3634        );
3635
3636        indie_query_plan_compare(query, expected).await;
3637    }
3638
3639    #[tokio::test]
3640    async fn simple_unary() {
3641        let query = "-some_metric";
3642        let expected = String::from(
3643            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
3644            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3645            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3646            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3647            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3648            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3649        );
3650
3651        indie_query_plan_compare(query, expected).await;
3652    }
3653
3654    #[tokio::test]
3655    async fn increase_aggr() {
3656        let query = "increase(some_metric[5m])";
3657        let expected = String::from(
3658            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
3659            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
3660            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
3661            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3662            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3663            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3664            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3665            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3666        );
3667
3668        indie_query_plan_compare(query, expected).await;
3669    }
3670
3671    #[tokio::test]
3672    async fn less_filter_on_value() {
3673        let query = "some_metric < 1.2345";
3674        let expected = String::from(
3675            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3676            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3677            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3678            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3679            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3680            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3681        );
3682
3683        indie_query_plan_compare(query, expected).await;
3684    }
3685
3686    #[tokio::test]
3687    async fn count_over_time() {
3688        let query = "count_over_time(some_metric[5m])";
3689        let expected = String::from(
3690            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
3691            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
3692            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
3693            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3694            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3695            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3696            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3697            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3698        );
3699
3700        indie_query_plan_compare(query, expected).await;
3701    }
3702
3703    #[tokio::test]
3704    async fn test_hash_join() {
3705        let mut eval_stmt = EvalStmt {
3706            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
3707            start: UNIX_EPOCH,
3708            end: UNIX_EPOCH
3709                .checked_add(Duration::from_secs(100_000))
3710                .unwrap(),
3711            interval: Duration::from_secs(5),
3712            lookback_delta: Duration::from_secs(1),
3713        };
3714
3715        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
3716
3717        let prom_expr = parser::parse(case).unwrap();
3718        eval_stmt.expr = prom_expr;
3719        let table_provider = build_test_table_provider_with_fields(
3720            &[
3721                (
3722                    DEFAULT_SCHEMA_NAME.to_string(),
3723                    "http_server_requests_seconds_sum".to_string(),
3724                ),
3725                (
3726                    DEFAULT_SCHEMA_NAME.to_string(),
3727                    "http_server_requests_seconds_count".to_string(),
3728                ),
3729            ],
3730            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
3731        )
3732        .await;
3733        // Should be ok
3734        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3735            .await
3736            .unwrap();
3737        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
3738            \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
3739            \n    SubqueryAlias: http_server_requests_seconds_sum\
3740            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
3741            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
3742            \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
3743            \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
3744            \n              TableScan: http_server_requests_seconds_sum\
3745            \n    SubqueryAlias: http_server_requests_seconds_count\
3746            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
3747            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
3748            \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
3749            \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
3750            \n              TableScan: http_server_requests_seconds_count";
3751        assert_eq!(plan.to_string(), expected);
3752    }
3753
    #[tokio::test]
    async fn test_nested_histogram_quantile() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
            )],
            &["pod", "le", "path", "code", "container"],
        )
        .await;
        // Planning should succeed without error
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

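    // "and"/"unless" set operators combined with comparisons and "or vector(0)" should plan without error.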
    #[tokio::test]
    async fn test_parse_and_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
        ];

        for case in cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider_with_fields(
                &[
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_used_bytes".to_string(),
                    ),
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_capacity_bytes".to_string(),
                    ),
                ],
                &["namespace", "persistentvolumeclaim"],
            )
            .await;
            // Planning should succeed without error
            let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
                .await
                .unwrap();
        }
    }

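    // A binary op whose right operand is a parenthesized "... or vector(0)" expression should plan without error.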
    #[tokio::test]
    async fn test_nested_binary_op() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        // Planning should succeed without error
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

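    // Chained "or" expressions over aggregations and arithmetic should plan without error.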
    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
            or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
            or
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
            or
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
            sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
            (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
            (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

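    // __field__ matchers (equal, not-equal, regex, and combinations) should select the expected
    // value columns; matching only nonexistent fields is a planning error.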
    #[tokio::test]
    async fn value_matcher() {
        // Template eval statement; `expr` is replaced per case below.
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            // single equal matcher
            (
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // two equal matchers
            (
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single not_eq matcher
            (
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // two not_eq matchers
            (
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // equal and not_eq matchers (no conflict)
            (
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // equal and not_eq matchers (conflict)
            (
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single regex eq matcher
            (
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single regex not_eq matcher
            (
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
                    .await
                    .unwrap();
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }

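    // The __schema__ and __database__ matchers should resolve the metric in the given schema,
    // including when it is mixed with a default-schema metric in a binary op.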
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from("Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
        \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]");

        indie_query_plan_compare(query, expected).await;
    }

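    // __schema__ and __database__ accept only the "=" matcher; "!=" and "=~" must be rejected.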
    #[tokio::test]
    async fn only_equals_is_supported_for_special_matcher() {
        let queries = &[
            "some_alt_metric{__schema__!=\"greptime_private\"}",
            "some_alt_metric{__schema__=~\"lalala\"}",
            "some_alt_metric{__database__!=\"greptime_private\"}",
            "some_alt_metric{__database__=~\"lalala\"}",
        ];

        for query in queries {
            let prom_expr = parser::parse(query).unwrap();
            let eval_stmt = EvalStmt {
                expr: prom_expr,
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            };

            let table_provider = build_test_table_provider(
                &[
                    (DEFAULT_CATALOG_NAME.to_string(), "some_metric".to_string()),
                    (
                        "greptime_private".to_string(),
                        "some_alt_metric".to_string(),
                    ),
                ],
                1,
                1,
            )
            .await;

            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
            assert!(plan.is_err(), "query: {:?}", query);
        }
    }

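    // A nanosecond time index should be cast to millisecond precision (via a projection)
    // before the Prom* plan nodes are applied.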
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(catalog_list
            .register_table_sync(RegisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: "metrics".to_string(),
                table_id: 1024,
                table,
            })
            .is_ok());

        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_session_state(),
        )
        .await
        .unwrap();
        assert_eq!(plan.display_indent_schema().to_string(),
        "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_session_state(),
        )
        .await
        .unwrap();
        assert_eq!(plan.display_indent_schema().to_string(),
        "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
        \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
        \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
        \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
    }

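    // A matcher on a label absent from the table schema should not fail planning.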
    #[tokio::test]
    async fn test_nonexistent_label() {
        // Template eval statement; `expr` is replaced below.
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"some_metric{nonexistent="hi"}"#;
        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            3,
            3,
        )
        .await;
        // Planning should succeed without error
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

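    // label_join should be planned as a concat_ws projection that appends the joined label column.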
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let expected = "Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]\
        \n  Projection: up.timestamp, up.field_0 AS field_0, concat_ws(Utf8(\",\"), up.tag_1, up.tag_2, up.tag_3) AS foo AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\", \"tag_3\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Filter: up.tag_0 = Utf8(\"api-server\") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

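    // label_replace should be planned as a regexp_replace projection that appends the replaced label column.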
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let expected = "Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]\
        \n  Projection: up.timestamp, up.field_0 AS field_0, regexp_replace(up.tag_0, Utf8(\"(.*):.*\"), Utf8(\"$1\")) AS foo AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Filter: up.tag_0 = Utf8(\"a:c\") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

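    // A regex label matcher should be lowered to an anchored "~" filter expression.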
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

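    // topk should be planned as a row_number window partitioned by timestamp, filtered to the first k rows.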
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

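    // count_values should group by the value column and re-expose it under the requested label name.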
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

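    // quantile should be planned with the quantile UDAF grouped by timestamp over the inner aggregation.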
    #[tokio::test]
    async fn test_quantile_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

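    // "or" with a metric whose table doesn't exist should union with an EmptyMetric plan instead of failing.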
    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "UnionDistinctOn: on col=[[\"job\"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
        \n  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
        \n    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]\
        \n      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]\
        \n        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]\
        \n          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            PromSeriesDivide: tags=[\"job\"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-1000, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100001000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n  SubqueryAlias:  [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]\
        \n    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]\
        \n      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]\
        \n        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]\
        \n          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
}