query/promql/planner.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_query::prelude::GREPTIME_VALUE;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, SortExpr};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use promql::extension_plan::{
    build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
    RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
use promql::functions::{
    quantile_udaf, AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters,
    IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    token, AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt,
    Expr as PromExpr, Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector,
    NumberLiteral, Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr,
    VectorMatchCardinality, VectorSelector,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidTimeRangeSnafu,
    MultiFieldsNotSupportedSnafu, MultipleMetricMatchersSnafu, MultipleVectorSnafu,
    NoMetricMatcherSnafu, PromqlPlanNodeSnafu, Result, TableNameNotFoundSnafu,
    TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
    UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu,
    ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
};

/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar()` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `histogram_quantile` function in PromQL.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector` function in PromQL.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// `le` column for conventional histogram.
const LE_COLUMN_NAME: &str = "le";

const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Default value column name for the empty metric.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special modifier to project field columns under multi-field mode.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special modifiers for cross-schema queries.
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";
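
// A minimal sketch of how the special modifiers above appear in PromQL
// selectors (hypothetical metric and column names):
//   metric{__field__="cpu_user"}  => project only the `cpu_user` field column
//   metric{__schema__="public"}   => resolve `metric` under the `public` schema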

/// Threshold for the scatter scan mode.
const MAX_SCATTER_POINTS: i64 = 400;

/// One-hour interval in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;

#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // query parameters
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // planner states
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    field_column_matcher: Option<Vec<Matcher>>,
    schema_name: Option<String>,
    /// The range, in milliseconds, of the range selector. `None` if there is no range selector.
    range: Option<Millisecond>,
}

impl PromPlannerContext {
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }

    /// Reset all planner states.
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.field_column_matcher = None;
        self.schema_name = None;
        self.range = None;
    }

    /// Reset the table name to empty and clear the schema.
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
    }

    /// Check if `le` is present in tag columns.
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}

pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}

/// Unescapes the value of the matcher.
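///
/// A minimal sketch of the intended behavior (hypothetical matcher value):
///
/// ```ignore
/// // The parser may hand over a value containing the two characters `\` and `n`;
/// // after normalization the value holds a real newline instead.
/// matcher.value = r"a\nb".to_string();
/// let matcher = normalize_matcher(matcher);
/// assert_eq!(matcher.value, "a\nb");
/// ```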
pub fn normalize_matcher(mut matcher: Matcher) -> Matcher {
    if let Ok(unescaped_value) = unescaper::unescape(&matcher.value) {
        matcher.value = unescaped_value;
    }
    matcher
}

impl PromPlanner {
    pub async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        planner.prom_expr_to_plan(&stmt.expr, session_state).await
    }

    #[async_recursion]
    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => self.prom_aggr_expr_to_plan(session_state, expr).await?,
            PromExpr::Unary(expr) => self.prom_unary_expr_to_plan(session_state, expr).await?,
            PromExpr::Binary(expr) => self.prom_binary_expr_to_plan(session_state, expr).await?,
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan(expr, session_state).await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(session_state, expr).await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector).await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => self.prom_call_expr_to_plan(session_state, expr).await?,
            PromExpr::Extension(expr) => self.prom_ext_expr_to_plan(session_state, expr).await?,
        };
        Ok(res)
    }

    async fn prom_subquery_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        // Extend the start backwards so the inner plan produces enough points
        // to cover the first window of the subquery range.
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, session_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_aggr_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let input = self.prom_expr_to_plan(expr, session_state).await?;
        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // Calculate columns to group by.
                // The time index column needs to be appended to the group-by columns.
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                // convert op and value columns to aggregate exprs
                let (aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // create plan
                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` must be grouped by fields,
                    // and project the fields to the new label.
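                    // e.g. `count_values("code", http_errors)` groups by the
                    // value column and re-exposes it as the `code` label
                    // (illustrative call).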
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }

    /// Create a logical plan for the PromQL `topk` and `bottomk` exprs.
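    ///
    /// A sketch of the resulting plan shape (illustrative): `topk(3, metric)`
    /// becomes a window plan computing a per-group rank over each value column,
    /// a filter keeping `rank <= 3`, a sort, and a projection back to the time
    /// index, tag, and field columns.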
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_value_as_f64(*op, param)?;

        // convert op and value columns to window exprs.
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Create the rank filter with `Operator::Or`.
        // Safety: there is at least one rank column.
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(lit(val)),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        // Order by ranks
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    async fn prom_unary_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        // A unary expr in PromQL implies the `-` operator.
        let input = self.prom_expr_to_plan(expr, session_state).await?;
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }

    async fn prom_binary_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // If set to true, a comparison operator will return 0/1 (for false/true)
        // instead of filtering on the result column.
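        // e.g. `a > b` keeps only the rows where the comparison holds, while
        // `a > bool b` keeps every row and evaluates to 1.0 or 0.0
        // (illustrative PromQL).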
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        // Build a filter plan if the op is a comparison op that doesn't need to
        // return 0/1. Otherwise, build a projection plan.
        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            // lhs is a literal, rhs is a column
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, session_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = Self::try_build_special_time_expr(
                    lhs,
                    self.ctx.time_index_column.as_ref().unwrap(),
                ) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // lhs is a column, rhs is a literal
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, session_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = Self::try_build_special_time_expr(
                    rhs,
                    self.ctx.time_index_column.as_ref().unwrap(),
                ) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // both are columns. join them on time index
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, session_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, session_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // TODO(ruihang): avoid join if left and right are the same table

                // set ops have "special" join semantics
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                // normal join
                if left_table_ref == right_table_ref {
                    // rename table references to avoid ambiguity
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    // `self.ctx` currently holds the right plan's context. If the
                    // right plan has no tag, use the left plan's context for
                    // subsequent calculations, to handle cases like
                    // `host + scalar(...)`: the tag columns of the `host` table
                    // must be preserved in the subsequent projection, and they
                    // only show up in the left plan's context.
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    // If either plan's tags are empty (cases like `scalar(...) + host`
                    // or `host + scalar(...)`), only join on the time index.
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }

    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.to_string());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        self.setup_context().await?;
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;
        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        self.setup_context().await?;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, true)
            .await?;
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            // TODO(ruihang): convert via Timestamp datatypes to support different time units
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_call_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // Some special functions are planned as a whole plan rather than as an expression.
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, session_state).await
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await,
            _ => {}
        }

        // transform function arguments
        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan(prom_expr, session_state).await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let mut func_exprs =
            self.create_function_expr(func, args.literals.clone(), session_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        builder.build().context(DataFusionPlanningSnafu)
    }

    async fn prom_ext_expr_to_plan(
        &mut self,
        session_state: &SessionState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        // let promql_parser::parser::ast::Extension { expr } = ext_expr;
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self.prom_expr_to_plan(&children[0], session_state).await?;
        // Wrap the existing plan for explanation/analysis:
        // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
        // If `analyze` is true, the actual plan is run and metrics about its
        // execution are produced.
        // If `verbose` is true, additional details are printed when the VERBOSE
        // keyword is specified.
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }

    /// Extract the metric name from the `__name__` matcher and set it into
    /// [PromPlannerContext]. Returns a new [Matchers] that doesn't contain
    /// the metric name matcher.
    ///
    /// Each call to this function means a new selector is started. Thus, the
    /// context is reset first.
    ///
    /// Name rule:
    /// - if `name` is some, then the matchers MUST NOT contain a `__name__` matcher.
    /// - if `name` is none, then the matchers MUST contain EXACTLY ONE `__name__`
    ///   matcher, and it must use the equal match op.
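    ///
    /// For example (illustrative selectors, not tested here):
    /// - `http_requests{job="a"}`: `name` is `Some("http_requests")`.
    /// - `{__name__="http_requests", job="a"}`: `name` is `None` and the single
    ///   `__name__` matcher supplies the metric name.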
    #[allow(clippy::mutable_key_type)]
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            // TODO(ruihang): support other metric match ops
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.to_string(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(
            matchers.into_iter().map(normalize_matcher).collect(),
        ))
    }

    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        // make table scan plan
        let table_ref = self.table_ref()?;
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        // make filter exprs
        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            .filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty.
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make a projection plan if there is any `__field__` matcher
        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            // opt-in set
            let mut result_set = HashSet::new();
            // opt-out set
            let mut reverse_set = HashSet::new();
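            // e.g. `{__field__="a"}` puts "a" into the opt-in set, while
            // `{__field__!="b"}` puts "b" into the opt-out set (illustrative
            // matchers).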
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            // merge the two sets
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }

            // mask the field columns in the context using the computed result set
            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            // reuse this variable for simplicity
            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // make sort plan
        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make divide plan
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                self.ctx.tag_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

        // make series_normalize plan
        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            self.ctx.tag_columns.clone(),
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }

    /// Convert a [LabelModifier] to [Column] exprs for aggregation.
    /// The timestamp column and tag columns will be included.
    ///
    /// # Side effect
    ///
    /// This method will also change the tag columns in the ctx if `update_ctx` is true.
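    ///
    /// For example (illustrative): `sum by (instance) (...)` keeps only the
    /// `instance` tag plus the time index, while `sum without (instance) (...)`
    /// keeps every input column except `instance`, the time index, and the
    /// value columns.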
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            None => {
                if update_ctx {
                    self.ctx.tag_columns = vec![];
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            Some(LabelModifier::Include(labels)) => {
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    // nonexistent labels will be ignored
                    if let Ok(field) = input_schema.field_with_unqualified_name(label) {
                        exprs.push(DfExpr::Column(Column::from(field.name())));
                    }
                }

                if update_ctx {
                    // change the tag columns in context
                    self.ctx.tag_columns.clone_from(&labels.labels);
                }

                // add timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                // remove "without"-ed fields
                // nonexistent labels will be ignored
                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // remove time index and value fields
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    // change the tag columns in context
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                // collect remaining fields and convert to col expr
                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                // add timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }

    // TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
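    /// Convert [Matchers] into DataFusion filter expressions.
    ///
    /// A rough mapping of each match op (illustrative matchers):
    /// - `job="a"`    becomes `job = 'a'`
    /// - `job!="a"`   becomes `job != 'a'`
    /// - `job=~"a.+"` becomes a `RegexMatch` against `'a.+'` (a bare `.*` is skipped)
    /// - `job!~"a.+"` becomes a `RegexNotMatch` against `'a.+'`
    ///
    /// A matcher naming a column that is absent from the schema is compared
    /// against an empty-string literal aliased to that name.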
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            let col = if table_schema
                .field_with_unqualified_name(&matcher.name)
                .is_err()
            {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new()))).alias(matcher.name)
            } else {
                DfExpr::Column(Column::from_name(matcher.name))
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)));
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                MatchOp::Re(re) => {
                    // TODO(ruihang): a more programmatic way to handle this in datafusion
                    if re.as_str() == ".*" {
                        continue;
                    }
                    DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(col),
                        op: Operator::RegexMatch,
                        right: Box::new(DfExpr::Literal(ScalarValue::Utf8(Some(
                            re.as_str().to_string(),
                        )))),
                    })
                }
                MatchOp::NotRe(re) => DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col),
                    op: Operator::RegexNotMatch,
                    right: Box::new(DfExpr::Literal(ScalarValue::Utf8(Some(
                        re.as_str().to_string(),
                    )))),
                }),
            };
            exprs.push(expr);
        }

        Ok(exprs)
    }

    fn table_ref(&self) -> Result<TableReference> {
        let table_name = self
            .ctx
            .table_name
            .clone()
            .context(TableNameNotFoundSnafu)?;

        // set the schema name if `__schema__` is given
        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
            TableReference::partial(schema_name.as_str(), table_name.as_str())
        } else {
            TableReference::bare(table_name.as_str())
        };

        Ok(table_ref)
    }

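    /// Build the time-index filter expression for the query range.
    ///
    /// A sketch of the decision below (hypothetical numbers): with `start = 0`,
    /// `end = 3_600_000` (one hour), and `interval = 15_000` (15s), the interval
    /// is at most `INTERVAL_1H`, so a single continuous range
    /// `[start - offset - lookback - range, end - offset + lookback]` is
    /// scanned. Only queries with an interval above one hour and at most
    /// `MAX_SCATTER_POINTS` points fall back to per-point scatter ranges
    /// OR-ed together.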
    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
        let start = self.ctx.start;
        let end = self.ctx.end;
        if end < start {
            return InvalidTimeRangeSnafu { start, end }.fail();
        }
        let lookback_delta = self.ctx.lookback_delta;
        let range = self.ctx.range.unwrap_or_default();
        let interval = self.ctx.interval;
        let time_index_expr = self.create_time_index_column_expr()?;
        let num_points = (end - start) / interval;

        // Scan a continuous time range
        if num_points > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
            let single_time_range = time_index_expr
                .clone()
                .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                    Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range),
                    None,
                )))
                .and(
                    time_index_expr.lt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                        Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
                        None,
                    ))),
                );
            return Ok(Some(single_time_range));
        }

        // Otherwise scan scatter ranges separately
        let mut filters = Vec::with_capacity(num_points as usize);
        for timestamp in (start..end).step_by(interval as usize) {
            filters.push(
                time_index_expr
                    .clone()
                    .gt_eq(DfExpr::Literal(ScalarValue::TimestampMillisecond(
                        Some(timestamp - offset_duration - lookback_delta - range),
                        None,
                    )))
                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp - offset_duration + lookback_delta),
                            None,
                        ),
                    ))),
            )
        }

        Ok(filters.into_iter().reduce(DfExpr::or))
    }
1267
1268    /// Create a table scan plan and a filter plan with given filter.
1269    ///
1270    /// # Panic
1271    /// If the filter is empty
1272    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
1273        let provider = self
1274            .table_provider
1275            .resolve_table(table_ref.clone())
1276            .await
1277            .context(CatalogSnafu)?;
1278
1279        let is_time_index_ms = provider
1280            .as_any()
1281            .downcast_ref::<DefaultTableSource>()
1282            .context(UnknownTableSnafu)?
1283            .table_provider
1284            .as_any()
1285            .downcast_ref::<DfTableProviderAdapter>()
1286            .context(UnknownTableSnafu)?
1287            .table()
1288            .schema()
1289            .timestamp_column()
1290            .with_context(|| TimeIndexNotFoundSnafu {
1291                table: table_ref.to_quoted_string(),
1292            })?
1293            .data_type
1294            == ConcreteDataType::timestamp_millisecond_datatype();
1295
1296        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
1297            .context(DataFusionPlanningSnafu)?
1298            .build()
1299            .context(DataFusionPlanningSnafu)?;
1300
1301        if !is_time_index_ms {
1302            // cast to ms if time_index not in Millisecond precision
1303            let expr: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(DfExpr::Alias(Alias {
                    expr: Box::new(DfExpr::Cast(Cast {
                        expr: Box::new(self.create_time_index_column_expr()?),
                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                    })),
                    relation: Some(table_ref.clone()),
                    name: self
                        .ctx
                        .time_index_column
                        .as_ref()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_quoted_string(),
                        })?
                        .clone(),
                })))
                .collect::<Vec<_>>();
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(expr)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        let result = LogicalPlanBuilder::from(scan_plan)
            .build()
            .context(DataFusionPlanningSnafu)?;
        Ok(result)
    }
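    // For a hypothetical table `http_requests` with a nanosecond time index `ts`,
    // tag `host`, and field `greptime_value`, the projection above is roughly:
    //
    //   SELECT greptime_value, host, CAST(ts AS Timestamp(Millisecond)) AS ts
    //   FROM http_requests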

    /// Set up [PromPlannerContext]'s state fields.
    async fn setup_context(&mut self) -> Result<()> {
        let table_ref = self.table_ref()?;
        let table = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table();

        // set time index column name
        let time_index = table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .name
            .clone();
        self.ctx.time_index_column = Some(time_index);

        // set value columns
        let values = table
            .table_info()
            .meta
            .field_column_names()
            .cloned()
            .collect();
        self.ctx.field_columns = values;

        // set primary key (tag) columns
        let tags = table
            .table_info()
            .meta
            .row_key_column_names()
            .filter(|col| {
                // remove metric engine's internal columns
                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
            })
            .cloned()
            .collect();
        self.ctx.tag_columns = tags;

        Ok(())
    }
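    // After `setup_context`, a hypothetical metric table with tags `host`/`job`,
    // one field `greptime_value`, and a timestamp column `ts` would leave the
    // context as:
    //
    //   time_index_column = Some("ts")
    //   field_columns     = ["greptime_value"]
    //   tag_columns       = ["host", "job"] // metric engine internals filtered out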
    // TODO(ruihang): insert column expr
    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
        let mut result = FunctionArgs::default();

        for arg in args {
            match *arg.clone() {
                PromExpr::Aggregate(_)
                | PromExpr::Unary(_)
                | PromExpr::Binary(_)
                | PromExpr::Paren(_)
                | PromExpr::Subquery(_)
                | PromExpr::VectorSelector(_)
                | PromExpr::MatrixSelector(_)
                | PromExpr::Extension(_)
                | PromExpr::Call(_) => {
                    if result.input.replace(*arg.clone()).is_some() {
                        MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
                    }
                }

                PromExpr::NumberLiteral(NumberLiteral { val, .. }) => {
                    let scalar_value = ScalarValue::Float64(Some(val));
                    result.literals.push(DfExpr::Literal(scalar_value));
                }
                PromExpr::StringLiteral(StringLiteral { val, .. }) => {
                    let scalar_value = ScalarValue::Utf8(Some(val));
                    result.literals.push(DfExpr::Literal(scalar_value));
                }
            }
        }

        Ok(result)
    }

    /// # Side Effects
    ///
    /// This method will update [PromPlannerContext]'s value fields.
    fn create_function_expr(
        &mut self,
        func: &Function,
        other_input_exprs: Vec<DfExpr>,
        session_state: &SessionState,
    ) -> Result<Vec<DfExpr>> {
        // TODO(ruihang): check function args list
        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();

        // TODO(ruihang): set this according to in-param list
        let field_column_pos = 0;
        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
        let scalar_func = match func.name {
            "increase" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Increase::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "rate" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Rate::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "delta" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Delta::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
            "predict_linear" => ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf())),
            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
            "time" => {
                exprs.push(build_special_time_expr(
                    self.ctx.time_index_column.as_ref().unwrap(),
                ));
                ScalarFunc::GeneratedExpr
            }
            "minute" => {
                // date_part('minute', time_index)
                let expr = self.date_part_on_time_index("minute")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "hour" => {
                // date_part('hour', time_index)
                let expr = self.date_part_on_time_index("hour")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "month" => {
                // date_part('month', time_index)
                let expr = self.date_part_on_time_index("month")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "year" => {
                // date_part('year', time_index)
                let expr = self.date_part_on_time_index("year")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_month" => {
                // date_part('day', time_index)
                let expr = self.date_part_on_time_index("day")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_week" => {
                // date_part('dow', time_index)
                let expr = self.date_part_on_time_index("dow")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_year" => {
                // date_part('doy', time_index)
                let expr = self.date_part_on_time_index("doy")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "days_in_month" => {
                // date_part(
                //     'days',
                //     (date_trunc('month', <TIME INDEX>::date) + interval '1 month - 1 day')
                // );
                let day_lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some("day".to_string())));
                let month_lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some("month".to_string())));
                let interval_1month_lit_expr =
                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)));
                let interval_1day_lit_expr = DfExpr::Literal(ScalarValue::IntervalDayTime(Some(
                    IntervalDayTime::new(1, 0),
                )));
                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(interval_1month_lit_expr),
                    op: Operator::Minus,
                    right: Box::new(interval_1day_lit_expr),
                });
                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_trunc(),
                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
                });
                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(date_trunc_expr),
                    op: Operator::Plus,
                    right: Box::new(the_1month_minus_1day_expr),
                });
                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_part(),
                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
                });

                exprs.push(date_part_expr);
                ScalarFunc::GeneratedExpr
            }

            "label_join" => {
                let (concat_expr, dst_label) =
                    Self::build_concat_labels_expr(&mut other_input_exprs, session_state)?;

                // Preserve the current field columns except the `dst_label`.
                for value in &self.ctx.field_columns {
                    if *value != dst_label {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                // Remove it from tag columns
                self.ctx.tag_columns.retain(|tag| *tag != dst_label);

                // Add the new label expr
                exprs.push(concat_expr);

                ScalarFunc::GeneratedExpr
            }
            "label_replace" => {
                let (replace_expr, dst_label) =
                    Self::build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?;

                // Preserve the current field columns except the `dst_label`.
                for value in &self.ctx.field_columns {
                    if *value != dst_label {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                // Remove it from tag columns
                self.ctx.tag_columns.retain(|tag| *tag != dst_label);

                // Add the new label expr
                exprs.push(replace_expr);

                ScalarFunc::GeneratedExpr
            }
            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" => {
                // These functions are not expressions but part of the plan;
                // they are processed by `prom_call_expr_to_plan`.
                for value in &self.ctx.field_columns {
                    let expr = DfExpr::Column(Column::from_name(value));
                    exprs.push(expr);
                }

                ScalarFunc::GeneratedExpr
            }
            "round" => {
                if other_input_exprs.is_empty() {
                    other_input_exprs.push_front(DfExpr::Literal(ScalarValue::Float64(Some(0.0))));
                }
                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
            }

            _ => {
                if let Some(f) = session_state.scalar_functions().get(func.name) {
                    ScalarFunc::DataFusionBuiltin(f.clone())
                } else if let Some(f) = datafusion_functions::math::functions()
                    .iter()
                    .find(|f| f.name() == func.name)
                {
                    ScalarFunc::DataFusionUdf(f.clone())
                } else {
                    return UnsupportedExprSnafu {
                        name: func.name.to_string(),
                    }
                    .fail();
                }
            }
        };

        for value in &self.ctx.field_columns {
            let col_expr = DfExpr::Column(Column::from_name(value));

            match scalar_func.clone() {
                ScalarFunc::DataFusionBuiltin(func) => {
                    other_input_exprs.insert(field_column_pos, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::DataFusionUdf(func) => {
                    let args = itertools::chain!(
                        other_input_exprs.iter().take(field_column_pos).cloned(),
                        std::iter::once(col_expr),
                        other_input_exprs.iter().skip(field_column_pos).cloned()
                    )
                    .collect_vec();
                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
                }
                ScalarFunc::Udf(func) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::ExtrapolateUdf(func, range_length) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    other_input_exprs
                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
                    other_input_exprs.push_back(lit(range_length));
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.pop_back();
                    let _ = other_input_exprs.remove(field_column_pos + 2);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::GeneratedExpr => {}
            }
        }

        // update value columns' names, and alias them to remove qualifiers
        let mut new_field_columns = Vec::with_capacity(exprs.len());

        exprs = exprs
            .into_iter()
            .map(|expr| {
                let display_name = expr.schema_name().to_string();
                new_field_columns.push(display_name.clone());
                Ok(expr.alias(display_name))
            })
            .collect::<std::result::Result<Vec<_>, _>>()
            .context(DataFusionPlanningSnafu)?;

        self.ctx.field_columns = new_field_columns;

        Ok(exprs)
    }
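    // As a concrete case, `rate(metric[5m])` takes the `ExtrapolateUdf` branch
    // above. For a single field column the call is assembled as (sketch):
    //
    //   <rate udf>(ts_range, <field>, <time index>, 300000)
    //
    // where `ts_range` is the timestamp-range column produced by `RangeManipulate`
    // and 300000 is the range length in milliseconds.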

    /// Build expr for `label_replace` function
    fn build_regexp_replace_label_expr(
        other_input_exprs: &mut VecDeque<DfExpr>,
        session_state: &SessionState,
    ) -> Result<(DfExpr, String)> {
        // label_replace(vector, dst_label, replacement, src_label, regex)
        let dst_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected dst_label string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let replacement = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected replacement string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let src_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)))) => s,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected src_label string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let regex = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected regex string literal, but found {:?}", other),
            }
            .fail()?,
        };

        let func = session_state
            .scalar_functions()
            .get("regexp_replace")
            .context(UnsupportedExprSnafu {
                name: "regexp_replace",
            })?;

        // regexp_replace(src_label, regex, replacement)
        let args = vec![
            if src_label.is_empty() {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())))
            } else {
                DfExpr::Column(Column::from_name(src_label))
            },
            DfExpr::Literal(ScalarValue::Utf8(Some(regex))),
            DfExpr::Literal(ScalarValue::Utf8(Some(replacement))),
        ];

        Ok((
            DfExpr::ScalarFunction(ScalarFunction {
                func: func.clone(),
                args,
            })
            .alias(&dst_label),
            dst_label,
        ))
    }
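    // A hypothetical `label_replace(up, "host", "$1", "instance", "(.*):.*")` thus
    // plans to roughly `regexp_replace(instance, '(.*):.*', '$1') AS host`; the
    // replacement string is handed to DataFusion unchanged.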

    /// Build expr for `label_join` function
    fn build_concat_labels_expr(
        other_input_exprs: &mut VecDeque<DfExpr>,
        session_state: &SessionState,
    ) -> Result<(DfExpr, String)> {
        // label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)

        let dst_label = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected dst_label string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let separator = match other_input_exprs.pop_front() {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
            other => UnexpectedPlanExprSnafu {
                desc: format!("expected separator string literal, but found {:?}", other),
            }
            .fail()?,
        };
        let src_labels = other_input_exprs
            .clone()
            .into_iter()
            .map(|expr| {
                // Cast each source label into a column
                match expr {
                    DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
                        if label.is_empty() {
                            Ok(DfExpr::Literal(ScalarValue::Null))
                        } else {
                            Ok(DfExpr::Column(Column::from_name(label)))
                        }
                    }
                    other => UnexpectedPlanExprSnafu {
                        desc: format!(
                            "expected source label string literal, but found {:?}",
                            other
                        ),
                    }
                    .fail(),
                }
            })
            .collect::<Result<Vec<_>>>()?;
        ensure!(
            !src_labels.is_empty(),
            FunctionInvalidArgumentSnafu {
                fn_name: "label_join"
            }
        );

        let func = session_state
            .scalar_functions()
            .get("concat_ws")
            .context(UnsupportedExprSnafu { name: "concat_ws" })?;

        // concat_ws(separator, src_label_1, src_label_2, ...) as dst_label
        let mut args = Vec::with_capacity(1 + src_labels.len());
        args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator))));
        args.extend(src_labels);

        Ok((
            DfExpr::ScalarFunction(ScalarFunction {
                func: func.clone(),
                args,
            })
            .alias(&dst_label),
            dst_label,
        ))
    }
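    // E.g. a hypothetical `label_join(up, "endpoint", "/", "host", "port")` becomes
    // roughly `concat_ws('/', host, port) AS endpoint`; empty source labels are
    // planned as NULL literals instead of column references.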

    fn create_time_index_column_expr(&self) -> Result<DfExpr> {
        Ok(DfExpr::Column(Column::from_name(
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
        )))
    }

    fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
        let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
        for tag in &self.ctx.tag_columns {
            let expr = DfExpr::Column(Column::from_name(tag));
            result.push(expr);
        }
        Ok(result)
    }

    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
        for field in &self.ctx.field_columns {
            let expr = DfExpr::Column(Column::from_name(field));
            result.push(expr);
        }
        Ok(result)
    }

    fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
        let mut result = self
            .ctx
            .tag_columns
            .iter()
            .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
            .collect::<Vec<_>>();
        result.push(self.create_time_index_column_expr()?.sort(true, true));
        Ok(result)
    }

    fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
        self.ctx
            .field_columns
            .iter()
            .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
            .collect::<Vec<_>>()
    }

    fn create_sort_exprs_by_tags(
        func: &str,
        tags: Vec<DfExpr>,
        asc: bool,
    ) -> Result<Vec<SortExpr>> {
        ensure!(
            !tags.is_empty(),
            FunctionInvalidArgumentSnafu { fn_name: func }
        );

        tags.iter()
            .map(|col| match col {
                DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
                    Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
                }
                other => UnexpectedPlanExprSnafu {
                    desc: format!("expected label string literal, but found {:?}", other),
                }
                .fail(),
            })
            .collect::<Result<Vec<_>>>()
    }
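    // A hypothetical `sort_by_label(metric, "host")` thus sorts on the `host`
    // column; note `sort(asc, false)` places nulls last, unlike the other sort
    // helpers above which use nulls-first.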

    fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
        for value in &self.ctx.field_columns {
            let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
            exprs.push(expr);
        }

        conjunction(exprs).context(ValueNotFoundSnafu {
            table: self.table_ref()?.to_quoted_string(),
        })
    }

    /// Creates a set of DataFusion `DfExpr::AggregateFunction` expressions for each value column using the specified aggregate function.
    ///
    /// # Side Effects
    ///
    /// This method modifies the value columns in the context by replacing them with the new columns
    /// created by the aggregate function application.
    ///
    /// # Returns
    ///
    /// Returns a tuple of `(aggregate_expressions, previous_field_expressions)` where:
    /// - `aggregate_expressions`: Expressions that apply the aggregate function to the original fields
    /// - `previous_field_expressions`: Original field expressions before aggregation. This is non-empty
    ///   only when the operation is `count_values`, as this operation requires preserving the original
    ///   values for grouping.
    ///
    fn create_aggregate_exprs(
        &mut self,
        op: TokenType,
        param: &Option<Box<PromExpr>>,
        input_plan: &LogicalPlan,
    ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
        let mut non_col_args = Vec::new();
        let aggr = match op.id() {
            token::T_SUM => sum_udaf(),
            token::T_QUANTILE => {
                let q = Self::get_param_value_as_f64(op, param)?;
                non_col_args.push(lit(q));
                quantile_udaf()
            }
            token::T_AVG => avg_udaf(),
            token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
            token::T_MIN => min_udaf(),
            token::T_MAX => max_udaf(),
            token::T_GROUP => grouping_udaf(),
            token::T_STDDEV => stddev_pop_udaf(),
            token::T_STDVAR => var_pop_udaf(),
            token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
                name: format!("{op:?}"),
            }
            .fail()?,
            _ => UnexpectedTokenSnafu { token: op }.fail()?,
        };

        // perform the aggregate operation on each value column
        let exprs: Vec<DfExpr> = self
            .ctx
            .field_columns
            .iter()
            .map(|col| {
                non_col_args.push(DfExpr::Column(Column::from_name(col)));
                let expr = aggr.call(non_col_args.clone());
                non_col_args.pop();
                expr
            })
            .collect::<Vec<_>>();

        // if the aggregator is `count_values`, the result must be grouped by the current fields.
        let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
            let prev_field_exprs: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::from_name(col)))
                .collect();

            ensure!(
                self.ctx.field_columns.len() == 1,
                UnsupportedExprSnafu {
                    name: "count_values on multi-value input"
                }
            );

            prev_field_exprs
        } else {
            vec![]
        };

        // update value column names according to the aggregators
        let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());

        let normalized_exprs =
            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
        for expr in normalized_exprs {
            new_field_columns.push(expr.schema_name().to_string());
        }
        self.ctx.field_columns = new_field_columns;

        Ok((exprs, prev_field_exprs))
    }
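    // For example, `sum(metric)` maps to a single `sum(<field>)` expression here,
    // while `quantile(0.9, metric)` becomes `quantile(0.9, <field>)` via the
    // prepended non-column argument.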

    fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
        let param = param
            .as_deref()
            .with_context(|| FunctionInvalidArgumentSnafu {
                fn_name: op.to_string(),
            })?;
        let PromExpr::StringLiteral(StringLiteral { val }) = param else {
            return FunctionInvalidArgumentSnafu {
                fn_name: op.to_string(),
            }
            .fail();
        };

        Ok(val)
    }

    fn get_param_value_as_f64(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<f64> {
        let param = param
            .as_deref()
            .with_context(|| FunctionInvalidArgumentSnafu {
                fn_name: op.to_string(),
            })?;
        let PromExpr::NumberLiteral(NumberLiteral { val }) = param else {
            return FunctionInvalidArgumentSnafu {
                fn_name: op.to_string(),
            }
            .fail();
        };

        Ok(*val)
    }

    /// Create [DfExpr::WindowFunction] expr for each value column with the given window function.
    ///
    fn create_window_exprs(
        &mut self,
        op: TokenType,
        group_exprs: Vec<DfExpr>,
        input_plan: &LogicalPlan,
    ) -> Result<Vec<DfExpr>> {
        ensure!(
            self.ctx.field_columns.len() == 1,
            UnsupportedExprSnafu {
                name: "topk or bottomk on multi-value input"
            }
        );

        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));

        let asc = matches!(op.id(), token::T_BOTTOMK);

        let tag_sort_exprs = self
            .create_tag_column_exprs()?
            .into_iter()
            .map(|expr| expr.sort(asc, true));

        // perform the window operation on each value column
        let exprs: Vec<DfExpr> = self
            .ctx
            .field_columns
            .iter()
            .map(|col| {
                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
                // Order by value in the specified order
                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
                // then by tags when the values are equal, to keep the output
                // relatively stable.
                sort_exprs.extend(tag_sort_exprs.clone());

                DfExpr::WindowFunction(WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
                    args: vec![],
                    partition_by: group_exprs.clone(),
                    order_by: sort_exprs,
                    window_frame: WindowFrame::new(Some(true)),
                    null_treatment: None,
                })
            })
            .collect();

        let normalized_exprs =
            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
        Ok(normalized_exprs)
    }
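    // Sketch: `topk(3, metric)` is planned as `row_number()` partitioned by the
    // grouping exprs and ordered by value DESC (tags as tiebreakers); a later
    // filter on the row number is expected to keep the first 3 rows per partition.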

    /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
    async fn create_histogram_plan(
        &mut self,
        args: &PromFunctionArgs,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        if args.args.len() != 2 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
            .fail();
        }
        let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
            FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            }
        })?;
        let input = args.args[1].as_ref().clone();
        let input_plan = self.prom_expr_to_plan(&input, session_state).await?;

        if !self.ctx.has_le_tag() {
            return ColumnNotFoundSnafu {
                col: LE_COLUMN_NAME.to_string(),
            }
            .fail();
        }
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: self.ctx.table_name.clone().unwrap_or_default(),
                })?;
        // FIXME(ruihang): support multi fields
        let field_column = self
            .ctx
            .field_columns
            .first()
            .with_context(|| FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
            })?
            .clone();
        // remove the `le` column from tag columns
        self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(
                HistogramFold::new(
                    LE_COLUMN_NAME.to_string(),
                    field_column,
                    time_index_column,
                    phi,
                    input_plan,
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        }))
    }
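    // E.g. `histogram_quantile(0.9, rate(http_request_duration_seconds_bucket[5m]))`
    // lands here with `le` as a tag column; `HistogramFold` collapses the
    // per-bucket rows into a single 0.9-quantile value per series and timestamp.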

    /// Create a [SPECIAL_VECTOR_FUNCTION] plan
    async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
        if args.args.len() != 1 {
            return FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
            }
            .fail();
        }
        let lit = Self::try_build_float_literal(&args.args[0]).with_context(|| {
            FunctionInvalidArgumentSnafu {
                fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
            }
        })?;

        // reuse `SPECIAL_TIME_FUNCTION` as the name of the time index column
        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
        self.ctx.reset_table_name_and_schema();
        self.ctx.tag_columns = vec![];
        self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()];
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    GREPTIME_VALUE.to_string(),
                    Some(DfExpr::Literal(ScalarValue::Float64(Some(lit)))),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        }))
    }

    /// Create a [SCALAR_FUNCTION] plan
    async fn create_scalar_plan(
        &mut self,
        args: &PromFunctionArgs,
        session_state: &SessionState,
    ) -> Result<LogicalPlan> {
        ensure!(
            args.len() == 1,
            FunctionInvalidArgumentSnafu {
                fn_name: SCALAR_FUNCTION
            }
        );
        let input = self.prom_expr_to_plan(&args.args[0], session_state).await?;
        ensure!(
            self.ctx.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: SCALAR_FUNCTION
            },
        );
        let scalar_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                ScalarCalculate::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    input,
                    self.ctx.time_index_column.as_ref().unwrap(),
                    &self.ctx.tag_columns,
                    &self.ctx.field_columns[0],
                    self.ctx.table_name.as_deref(),
                )
                .context(PromqlPlanNodeSnafu)?,
            ),
        });
        // the scalar plan has no tag columns
        self.ctx.tag_columns.clear();
        self.ctx.field_columns.clear();
        self.ctx
            .field_columns
            .push(scalar_plan.schema().field(1).name().clone());
        Ok(scalar_plan)
    }

    /// Try to build a DataFusion literal expression from a PromQL expr, returning
    /// `None` if the input is not a literal expression.
    fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
        match expr {
            PromExpr::NumberLiteral(NumberLiteral { val }) => {
                let scalar_value = ScalarValue::Float64(Some(*val));
                Some(DfExpr::Literal(scalar_value))
            }
            PromExpr::StringLiteral(StringLiteral { val }) => {
                let scalar_value = ScalarValue::Utf8(Some(val.to_string()));
                Some(DfExpr::Literal(scalar_value))
            }
            PromExpr::VectorSelector(_)
            | PromExpr::MatrixSelector(_)
            | PromExpr::Extension(_)
            | PromExpr::Aggregate(_)
            | PromExpr::Subquery(_) => None,
            PromExpr::Call(Call { func, .. }) => {
                if func.name == SPECIAL_TIME_FUNCTION {
                    Some(build_special_time_expr(SPECIAL_TIME_FUNCTION))
                } else {
                    None
                }
            }
            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
            // TODO(ruihang): support Unary operator
            PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
            PromExpr::Binary(PromBinaryExpr {
                lhs,
                rhs,
                op,
                modifier,
            }) => {
                let lhs = Self::try_build_literal_expr(lhs)?;
                let rhs = Self::try_build_literal_expr(rhs)?;
                let is_comparison_op = Self::is_token_a_comparison_op(*op);
                let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
                let expr = expr_builder(lhs, rhs).ok()?;

                let should_return_bool = if let Some(m) = modifier {
                    m.return_bool
                } else {
                    false
                };
                if is_comparison_op && should_return_bool {
                    Some(DfExpr::Cast(Cast {
                        expr: Box::new(expr),
                        data_type: ArrowDataType::Float64,
                    }))
                } else {
                    Some(expr)
                }
            }
        }
    }

    fn try_build_special_time_expr(expr: &PromExpr, time_index_col: &str) -> Option<DfExpr> {
        match expr {
            PromExpr::Call(Call { func, .. }) => {
                if func.name == SPECIAL_TIME_FUNCTION {
                    Some(build_special_time_expr(time_index_col))
                } else {
                    None
                }
            }
            _ => None,
        }
    }

    /// Try to build an [f64] from a [PromExpr].
    fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
        match expr {
            PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
            PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
            PromExpr::Unary(UnaryExpr { expr, .. }) => {
                Self::try_build_float_literal(expr).map(|f| -f)
            }
            PromExpr::StringLiteral(_)
            | PromExpr::Binary(_)
            | PromExpr::VectorSelector(_)
            | PromExpr::MatrixSelector(_)
            | PromExpr::Call(_)
            | PromExpr::Extension(_)
            | PromExpr::Aggregate(_)
            | PromExpr::Subquery(_) => None,
        }
    }

    /// Return a lambda that builds a binary expression from a token,
    /// because some binary operators are functions in DataFusion, like `atan2` or `^`.
    #[allow(clippy::type_complexity)]
    fn prom_token_to_binary_expr_builder(
        token: TokenType,
    ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
        match token.id() {
            token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
            token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
            token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
            token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
            token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
            token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
            token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
            token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
            token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
            token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
            token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
            token::T_POW => Ok(Box::new(|lhs, rhs| {
                Ok(DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::math::power(),
                    args: vec![lhs, rhs],
                }))
            })),
            token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
                Ok(DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::math::atan2(),
                    args: vec![lhs, rhs],
                }))
            })),
            _ => UnexpectedTokenSnafu { token }.fail(),
        }
    }
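    // For instance, PromQL `a ^ b` cannot map to a native DataFusion operator, so
    // the returned builder emits `power(a, b)`; arithmetic tokens like `+` map
    // directly through `Expr`'s operator overloads.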

    /// Check if the given op is a [comparison operator](https://prometheus.io/docs/prometheus/latest/querying/operators/#comparison-binary-operators).
    fn is_token_a_comparison_op(token: TokenType) -> bool {
        matches!(
            token.id(),
            token::T_EQLC
                | token::T_NEQ
                | token::T_GTR
                | token::T_LSS
                | token::T_GTE
                | token::T_LTE
        )
    }

    /// Check if the given op is a set operator (UNION, INTERSECT and EXCEPT in SQL).
    fn is_token_a_set_op(token: TokenType) -> bool {
        matches!(
            token.id(),
            token::T_LAND // INTERSECT
                | token::T_LOR // UNION
                | token::T_LUNLESS // EXCEPT
        )
    }

    /// Build an inner join on the time index column and tag columns to concat two logical plans.
    /// When `only_join_time_index == true`, join only on the time index, because the two plans
    /// may not have the same tag columns.
    #[allow(clippy::too_many_arguments)]
    fn join_on_non_field_columns(
        &self,
        left: LogicalPlan,
        right: LogicalPlan,
        left_table_ref: TableReference,
        right_table_ref: TableReference,
        left_time_index_column: Option<String>,
        right_time_index_column: Option<String>,
        only_join_time_index: bool,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        let mut left_tag_columns = if only_join_time_index {
            BTreeSet::new()
        } else {
            self.ctx
                .tag_columns
                .iter()
                .cloned()
                .collect::<BTreeSet<_>>()
        };
        let mut right_tag_columns = left_tag_columns.clone();

        // apply modifier
        if let Some(modifier) = modifier {
            // apply label modifier
            if let Some(matching) = &modifier.matching {
                match matching {
                    // keeps columns mentioned in `on`
                    LabelModifier::Include(on) => {
                        let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
                        left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
                        right_tag_columns =
                            right_tag_columns.intersection(&mask).cloned().collect();
                    }
                    // removes columns mentioned in `ignoring`
                    LabelModifier::Exclude(ignoring) => {
                        // doesn't check existence of label
                        for label in &ignoring.labels {
                            let _ = left_tag_columns.remove(label);
                            let _ = right_tag_columns.remove(label);
                        }
                    }
                }
            }
        }

        // push time index column if it exists
        if let (Some(left_time_index_column), Some(right_time_index_column)) =
            (left_time_index_column, right_time_index_column)
        {
            left_tag_columns.insert(left_time_index_column);
            right_tag_columns.insert(right_time_index_column);
        }

        let right = LogicalPlanBuilder::from(right)
            .alias(right_table_ref)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // Inner join on the time index column to concat the two operators
        LogicalPlanBuilder::from(left)
            .alias(left_table_ref)
            .context(DataFusionPlanningSnafu)?
            .join(
                right,
                JoinType::Inner,
                (
                    left_tag_columns
                        .into_iter()
                        .map(Column::from_name)
                        .collect::<Vec<_>>(),
                    right_tag_columns
                        .into_iter()
                        .map(Column::from_name)
                        .collect::<Vec<_>>(),
                ),
                None,
            )
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }
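    // E.g. for `a * on (host) b`, only `host` plus the time index survive as join
    // keys, so the resulting plan is roughly:
    //
    //   Inner Join: a.host = b.host, a.<time index> = b.<time index>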

    /// Build a set operator (AND/OR/UNLESS)
    fn set_op_on_non_field_columns(
        &mut self,
        left: LogicalPlan,
        mut right: LogicalPlan,
        left_context: PromPlannerContext,
        right_context: PromPlannerContext,
        op: TokenType,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        let mut left_tag_col_set = left_context
            .tag_columns
            .iter()
            .cloned()
            .collect::<HashSet<_>>();
        let mut right_tag_col_set = right_context
            .tag_columns
            .iter()
            .cloned()
            .collect::<HashSet<_>>();

        if matches!(op.id(), token::T_LOR) {
            return self.or_operator(
                left,
                right,
                left_tag_col_set,
                right_tag_col_set,
                left_context,
                right_context,
                modifier,
            );
        }

        // apply modifier
        if let Some(modifier) = modifier {
            // one-to-many and many-to-one are not supported
            ensure!(
                matches!(
                    modifier.card,
                    VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
                ),
                UnsupportedVectorMatchSnafu {
                    name: modifier.card.clone(),
                },
            );
            // apply label modifier
            if let Some(matching) = &modifier.matching {
                match matching {
                    // keeps columns mentioned in `on`
                    LabelModifier::Include(on) => {
                        let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
                        left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
                        right_tag_col_set =
                            right_tag_col_set.intersection(&mask).cloned().collect();
                    }
                    // removes columns mentioned in `ignoring`
                    LabelModifier::Exclude(ignoring) => {
                        // doesn't check existence of label
                        for label in &ignoring.labels {
                            let _ = left_tag_col_set.remove(label);
                            let _ = right_tag_col_set.remove(label);
                        }
                    }
                }
            }
        }
        // ensure both sides have the same tag columns
        if !matches!(op.id(), token::T_LOR) {
            ensure!(
                left_tag_col_set == right_tag_col_set,
                CombineTableColumnMismatchSnafu {
                    left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
                    right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
                }
            )
        };
        let left_time_index = left_context.time_index_column.clone().unwrap();
        let right_time_index = right_context.time_index_column.clone().unwrap();
        let join_keys = left_tag_col_set
            .iter()
            .cloned()
            .chain([left_time_index.clone()])
            .collect::<Vec<_>>();
        self.ctx.time_index_column = Some(left_time_index.clone());

        // alias the right time index column if necessary
        if left_context.time_index_column != right_context.time_index_column {
            let right_project_exprs = right
                .schema()
                .fields()
                .iter()
                .map(|field| {
                    if field.name() == &right_time_index {
                        DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
                    } else {
                        DfExpr::Column(Column::from_name(field.name()))
                    }
                })
                .collect::<Vec<_>>();

            right = LogicalPlanBuilder::from(right)
                .project(right_project_exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        ensure!(
            left_context.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: "AND operator"
            }
        );
        // Update the field column in the context.
        // The AND/UNLESS operator only keeps the field column of the left input.
        let left_field_col = left_context.field_columns.first().unwrap();
        self.ctx.field_columns = vec![left_field_col.clone()];

        // Generate join plan.
        // All set operations in PromQL are "distinct"
        match op.id() {
            token::T_LAND => LogicalPlanBuilder::from(left)
                .distinct()
                .context(DataFusionPlanningSnafu)?
                .join_detailed(
                    right,
                    JoinType::LeftSemi,
                    (join_keys.clone(), join_keys),
                    None,
                    true,
                )
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu),
            token::T_LUNLESS => LogicalPlanBuilder::from(left)
                .distinct()
                .context(DataFusionPlanningSnafu)?
                .join_detailed(
                    right,
                    JoinType::LeftAnti,
                    (join_keys.clone(), join_keys),
                    None,
                    true,
                )
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu),
            token::T_LOR => {
                // OR is handled at the beginning of this function, as it cannot
                // be expressed using JOIN like AND and UNLESS.
                unreachable!()
            }
            _ => UnexpectedTokenSnafu { token: op }.fail(),
        }
    }
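    // In PromQL terms: `a and b` keeps the series of `a` that match in `b`
    // (LeftSemi), while `a unless b` keeps those with no match in `b` (LeftAnti).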

    // TODO(ruihang): change function name
    #[allow(clippy::too_many_arguments)]
    fn or_operator(
        &mut self,
        left: LogicalPlan,
        right: LogicalPlan,
        left_tag_cols_set: HashSet<String>,
        right_tag_cols_set: HashSet<String>,
        left_context: PromPlannerContext,
        right_context: PromPlannerContext,
        modifier: &Option<BinModifier>,
    ) -> Result<LogicalPlan> {
        // checks
        ensure!(
            left_context.field_columns.len() == right_context.field_columns.len(),
            CombineTableColumnMismatchSnafu {
                left: left_context.field_columns.clone(),
                right: right_context.field_columns.clone()
            }
        );
        ensure!(
            left_context.field_columns.len() == 1,
            MultiFieldsNotSupportedSnafu {
                operator: "OR operator"
            }
        );

        // prepare hash sets
        let all_tags = left_tag_cols_set
            .union(&right_tag_cols_set)
            .cloned()
            .collect::<HashSet<_>>();
        let tags_not_in_left = all_tags
            .difference(&left_tag_cols_set)
            .cloned()
            .collect::<Vec<_>>();
        let tags_not_in_right = all_tags
            .difference(&right_tag_cols_set)
            .cloned()
            .collect::<Vec<_>>();
        let left_qualifier = left.schema().qualified_field(0).0.cloned();
        let right_qualifier = right.schema().qualified_field(0).0.cloned();
        let left_qualifier_string = left_qualifier
            .as_ref()
            .map(|l| l.to_string())
            .unwrap_or_default();
        let right_qualifier_string = right_qualifier
            .as_ref()
            .map(|r| r.to_string())
            .unwrap_or_default();
        let left_time_index_column =
            left_context
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: left_qualifier_string.clone(),
                })?;
        let right_time_index_column =
            right_context
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: right_qualifier_string.clone(),
                })?;
2687        // Take the name of the first field column. The length is checked above.
2688        let left_field_col = left_context.field_columns.first().unwrap();
2689        let right_field_col = right_context.field_columns.first().unwrap();
2690
2691        // step 0: collect all columns of the output schema
2692        let mut all_columns_set = left
2693            .schema()
2694            .fields()
2695            .iter()
2696            .chain(right.schema().fields().iter())
2697            .map(|field| field.name().clone())
2698            .collect::<HashSet<_>>();
2699        // remove time index column
2700        all_columns_set.remove(&left_time_index_column);
2701        all_columns_set.remove(&right_time_index_column);
2702        // remove the field column of the right side
2703        if left_field_col != right_field_col {
2704            all_columns_set.remove(right_field_col);
2705        }
2706        let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
2707        // sort to make the generated schema deterministic
2708        all_columns.sort_unstable();
2709        // use left time index column name as the result time index column name
2710        all_columns.insert(0, left_time_index_column.clone());
2711
2712        // step 1: align schemas using projections, filling non-existent columns with null
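        // E.g. (illustrative): if only the right input has a tag `idc`, the left
        // projection emits `NULL AS idc` so both sides expose the same column set.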
2713        let left_proj_exprs = all_columns.iter().map(|col| {
2714            if tags_not_in_left.contains(col) {
2715                DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
2716            } else {
2717                DfExpr::Column(Column::new(None::<String>, col))
2718            }
2719        });
2720        let right_time_index_expr = DfExpr::Column(Column::new(
2721            right_qualifier.clone(),
2722            right_time_index_column,
2723        ))
2724        .alias(left_time_index_column.clone());
2725        // The field column on the right side may not have a qualifier (it may have been
2726        // removed by a join operation), so we need to look it up from the schema.
2727        let right_qualifier_for_field = right
2728            .schema()
2729            .iter()
2730            .find(|(_, f)| f.name() == right_field_col)
2731            .map(|(q, _)| q)
2732            .context(ColumnNotFoundSnafu {
2733                col: right_field_col.to_string(),
2734            })?
2735            .cloned();
2736        // `skip(1)` to skip the time index column
2737        let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
2738            if col == left_field_col && left_field_col != right_field_col {
2739                // project the right side's field column in the left field's position
2740                // when the two field names differ
2741                DfExpr::Column(Column::new(
2742                    right_qualifier_for_field.clone(),
2743                    right_field_col,
2744                ))
2745            } else if tags_not_in_right.contains(col) {
2746                DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
2747            } else {
2748                DfExpr::Column(Column::new(None::<String>, col))
2749            }
2750        });
2751        let right_proj_exprs = [right_time_index_expr]
2752            .into_iter()
2753            .chain(right_proj_exprs_without_time_index);
2754
2755        let left_projected = LogicalPlanBuilder::from(left)
2756            .project(left_proj_exprs)
2757            .context(DataFusionPlanningSnafu)?
2758            .alias(left_qualifier_string.clone())
2759            .context(DataFusionPlanningSnafu)?
2760            .build()
2761            .context(DataFusionPlanningSnafu)?;
2762        let right_projected = LogicalPlanBuilder::from(right)
2763            .project(right_proj_exprs)
2764            .context(DataFusionPlanningSnafu)?
2765            .alias(right_qualifier_string.clone())
2766            .context(DataFusionPlanningSnafu)?
2767            .build()
2768            .context(DataFusionPlanningSnafu)?;
2769
2770        // step 2: compute match columns
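        // E.g. (illustrative): `or on(job)` matches only on `job`, while
        // `or ignoring(idc)` matches on every tag except `idc`.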
2771        let mut match_columns = if let Some(modifier) = modifier
2772            && let Some(matching) = &modifier.matching
2773        {
2774            match matching {
2775                // keeps columns mentioned in `on`
2776                LabelModifier::Include(on) => on.labels.clone(),
2777                // removes columns mentioned in `ignoring`
2778                LabelModifier::Exclude(ignoring) => {
2779                    let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
2780                    all_tags.difference(&ignoring).cloned().collect()
2781                }
2782            }
2783        } else {
2784            all_tags.iter().cloned().collect()
2785        };
2786        // sort to make the generated plan deterministic
2787        match_columns.sort_unstable();
2788        // step 3: build `UnionDistinctOn` plan
2789        let schema = left_projected.schema().clone();
2790        let union_distinct_on = UnionDistinctOn::new(
2791            left_projected,
2792            right_projected,
2793            match_columns,
2794            left_time_index_column.clone(),
2795            schema,
2796        );
2797        let result = LogicalPlan::Extension(Extension {
2798            node: Arc::new(union_distinct_on),
2799        });
2800
2801        // step 4: update context
2802        self.ctx.time_index_column = Some(left_time_index_column);
2803        self.ctx.tag_columns = all_tags.into_iter().collect();
2804        self.ctx.field_columns = vec![left_field_col.to_string()];
2805
2806        Ok(result)
2807    }
2808
2809    /// Build a projection that applies an operation expr to every value column.
2810    /// Non-value columns (tags and timestamp) are preserved in the projection.
2811    ///
2812    /// # Side effect
2813    ///
2814    /// This function updates the value columns in the context. The new column names
2815    /// don't contain a qualifier.
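    ///
    /// For example (illustrative), with a single field `field_0` and a `name_to_expr`
    /// that produces `abs(field_0)`, the projection emits `abs(field_0)` aliased as
    /// `"abs(field_0)"` alongside the untouched tag and time index columns.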
2816    fn projection_for_each_field_column<F>(
2817        &mut self,
2818        input: LogicalPlan,
2819        name_to_expr: F,
2820    ) -> Result<LogicalPlan>
2821    where
2822        F: FnMut(&String) -> Result<DfExpr>,
2823    {
2824        let non_field_columns_iter = self
2825            .ctx
2826            .tag_columns
2827            .iter()
2828            .chain(self.ctx.time_index_column.iter())
2829            .map(|col| {
2830                Ok(DfExpr::Column(Column::new(
2831                    self.ctx.table_name.clone().map(TableReference::bare),
2832                    col,
2833                )))
2834            });
2835
2836        // build computation exprs
2837        let result_field_columns = self
2838            .ctx
2839            .field_columns
2840            .iter()
2841            .map(name_to_expr)
2842            .collect::<Result<Vec<_>>>()?;
2843
2844        // alias the computation exprs to remove qualifier
2845        self.ctx.field_columns = result_field_columns
2846            .iter()
2847            .map(|expr| expr.schema_name().to_string())
2848            .collect();
2849        let field_columns_iter = result_field_columns
2850            .into_iter()
2851            .zip(self.ctx.field_columns.iter())
2852            .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
2853
2854        // chain non-field columns (unchanged) and field columns (computation applied, then aliased)
2855        let project_fields = non_field_columns_iter
2856            .chain(field_columns_iter)
2857            .collect::<Result<Vec<_>>>()?;
2858
2859        LogicalPlanBuilder::from(input)
2860            .project(project_fields)
2861            .context(DataFusionPlanningSnafu)?
2862            .build()
2863            .context(DataFusionPlanningSnafu)
2864    }
2865
2866    /// Build a filter plan that filters on the value column. Note that exactly one
2867    /// value column is expected.
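    ///
    /// For example (illustrative), a value comparison such as `some_metric < 1.2345`
    /// is planned as a `Filter: field_0 < Float64(1.2345)` on top of the input.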
2868    fn filter_on_field_column<F>(
2869        &self,
2870        input: LogicalPlan,
2871        mut name_to_expr: F,
2872    ) -> Result<LogicalPlan>
2873    where
2874        F: FnMut(&String) -> Result<DfExpr>,
2875    {
2876        ensure!(
2877            self.ctx.field_columns.len() == 1,
2878            UnsupportedExprSnafu {
2879                name: "filter on multi-value input"
2880            }
2881        );
2882
2883        let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
2884
2885        LogicalPlanBuilder::from(input)
2886            .filter(field_column_filter)
2887            .context(DataFusionPlanningSnafu)?
2888            .build()
2889            .context(DataFusionPlanningSnafu)
2890    }
2891
2892    /// Generate an expr like `date_part("hour", <TIME_INDEX>)`. The caller should
2893    /// ensure the time index column in the context is set.
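    /// PromQL time functions such as `hour()` are assumed to be planned through
    /// this helper.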
2894    fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
2895        let lit_expr = DfExpr::Literal(ScalarValue::Utf8(Some(date_part.to_string())));
2896        let input_expr = datafusion::logical_expr::col(
2897            self.ctx
2898                .time_index_column
2899                .as_ref()
2900                // the table name doesn't matter here
2901                .with_context(|| TimeIndexNotFoundSnafu {
2902                    table: "<doesn't matter>",
2903                })?,
2904        );
2905        let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2906            func: datafusion_functions::datetime::date_part(),
2907            args: vec![lit_expr, input_expr],
2908        });
2909        Ok(fn_expr)
2910    }
2911}
2912
2913#[derive(Default, Debug)]
2914struct FunctionArgs {
2915    input: Option<PromExpr>,
2916    literals: Vec<DfExpr>,
2917}
2918
2919#[derive(Debug, Clone)]
2920enum ScalarFunc {
2921    DataFusionBuiltin(Arc<ScalarUdfDef>),
2922    /// A UDF that is defined by DataFusion itself.
2923    DataFusionUdf(Arc<ScalarUdfDef>),
2924    Udf(Arc<ScalarUdfDef>),
2925    // todo(ruihang): maybe merge with Udf later
2926    /// A UDF that requires extra information, such as the range length, to be evaluated.
2927    /// The second argument is the range length.
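    /// Illustratively, `increase` is planned this way: the `increase_aggr` test below
    /// expects `prom_increase(..., Int64(300000))`, i.e. a 5-minute range length.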
2928    ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
2929    /// A function that doesn't require input, like `time()`.
2930    GeneratedExpr,
2931}
2932
2933#[cfg(test)]
2934mod test {
2935    use std::time::{Duration, UNIX_EPOCH};
2936
2937    use catalog::memory::MemoryCatalogManager;
2938    use catalog::RegisterTableRequest;
2939    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
2940    use common_query::test_util::DummyDecoder;
2941    use datafusion::execution::SessionStateBuilder;
2942    use datatypes::prelude::ConcreteDataType;
2943    use datatypes::schema::{ColumnSchema, Schema};
2944    use promql_parser::label::Labels;
2945    use promql_parser::parser;
2946    use session::context::QueryContext;
2947    use table::metadata::{TableInfoBuilder, TableMetaBuilder};
2948    use table::test_util::EmptyTable;
2949
2950    use super::*;
2951
2952    fn build_session_state() -> SessionState {
2953        SessionStateBuilder::new().with_default_features().build()
2954    }
2955
2956    async fn build_test_table_provider(
2957        table_name_tuples: &[(String, String)],
2958        num_tag: usize,
2959        num_field: usize,
2960    ) -> DfTableSourceProvider {
2961        let catalog_list = MemoryCatalogManager::with_default_setup();
2962        for (schema_name, table_name) in table_name_tuples {
2963            let mut columns = vec![];
2964            for i in 0..num_tag {
2965                columns.push(ColumnSchema::new(
2966                    format!("tag_{i}"),
2967                    ConcreteDataType::string_datatype(),
2968                    false,
2969                ));
2970            }
2971            columns.push(
2972                ColumnSchema::new(
2973                    "timestamp".to_string(),
2974                    ConcreteDataType::timestamp_millisecond_datatype(),
2975                    false,
2976                )
2977                .with_time_index(true),
2978            );
2979            for i in 0..num_field {
2980                columns.push(ColumnSchema::new(
2981                    format!("field_{i}"),
2982                    ConcreteDataType::float64_datatype(),
2983                    true,
2984                ));
2985            }
2986            let schema = Arc::new(Schema::new(columns));
2987            let table_meta = TableMetaBuilder::empty()
2988                .schema(schema)
2989                .primary_key_indices((0..num_tag).collect())
2990                .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
2991                .next_column_id(1024)
2992                .build()
2993                .unwrap();
2994            let table_info = TableInfoBuilder::default()
2995                .name(table_name.to_string())
2996                .meta(table_meta)
2997                .build()
2998                .unwrap();
2999            let table = EmptyTable::from_table_info(&table_info);
3000
3001            assert!(catalog_list
3002                .register_table_sync(RegisterTableRequest {
3003                    catalog: DEFAULT_CATALOG_NAME.to_string(),
3004                    schema: schema_name.to_string(),
3005                    table_name: table_name.to_string(),
3006                    table_id: 1024,
3007                    table,
3008                })
3009                .is_ok());
3010        }
3011
3012        DfTableSourceProvider::new(
3013            catalog_list,
3014            false,
3015            QueryContext::arc(),
3016            DummyDecoder::arc(),
3017            false,
3018        )
3019    }
3020
3021    async fn build_test_table_provider_with_fields(
3022        table_name_tuples: &[(String, String)],
3023        tags: &[&str],
3024    ) -> DfTableSourceProvider {
3025        let catalog_list = MemoryCatalogManager::with_default_setup();
3026        for (schema_name, table_name) in table_name_tuples {
3027            let mut columns = vec![];
3028            let num_tag = tags.len();
3029            for tag in tags {
3030                columns.push(ColumnSchema::new(
3031                    tag.to_string(),
3032                    ConcreteDataType::string_datatype(),
3033                    false,
3034                ));
3035            }
3036            columns.push(
3037                ColumnSchema::new(
3038                    "greptime_timestamp".to_string(),
3039                    ConcreteDataType::timestamp_millisecond_datatype(),
3040                    false,
3041                )
3042                .with_time_index(true),
3043            );
3044            columns.push(ColumnSchema::new(
3045                "greptime_value".to_string(),
3046                ConcreteDataType::float64_datatype(),
3047                true,
3048            ));
3049            let schema = Arc::new(Schema::new(columns));
3050            let table_meta = TableMetaBuilder::empty()
3051                .schema(schema)
3052                .primary_key_indices((0..num_tag).collect())
3053                .next_column_id(1024)
3054                .build()
3055                .unwrap();
3056            let table_info = TableInfoBuilder::default()
3057                .name(table_name.to_string())
3058                .meta(table_meta)
3059                .build()
3060                .unwrap();
3061            let table = EmptyTable::from_table_info(&table_info);
3062
3063            assert!(catalog_list
3064                .register_table_sync(RegisterTableRequest {
3065                    catalog: DEFAULT_CATALOG_NAME.to_string(),
3066                    schema: schema_name.to_string(),
3067                    table_name: table_name.to_string(),
3068                    table_id: 1024,
3069                    table,
3070                })
3071                .is_ok());
3072        }
3073
3074        DfTableSourceProvider::new(
3075            catalog_list,
3076            false,
3077            QueryContext::arc(),
3078            DummyDecoder::arc(),
3079            false,
3080        )
3081    }
3082
3083    // {
3084    //     input: `abs(some_metric{foo!="bar"})`,
3085    //     expected: &Call{
3086    //         Func: MustGetFunction("abs"),
3087    //         Args: Expressions{
3088    //             &VectorSelector{
3089    //                 Name: "some_metric",
3090    //                 LabelMatchers: []*labels.Matcher{
3091    //                     MustLabelMatcher(labels.MatchNotEqual, "foo", "bar"),
3092    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3093    //                 },
3094    //             },
3095    //         },
3096    //     },
3097    // },
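    // Plans `{fn_name}(some_metric{tag_0!="bar"})` and compares the rendered plan
    // against `expected`, in which the `TEMPLATE` placeholder is replaced with
    // `plan_name`.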
3098    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
3099        let prom_expr =
3100            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
3101        let eval_stmt = EvalStmt {
3102            expr: prom_expr,
3103            start: UNIX_EPOCH,
3104            end: UNIX_EPOCH
3105                .checked_add(Duration::from_secs(100_000))
3106                .unwrap(),
3107            interval: Duration::from_secs(5),
3108            lookback_delta: Duration::from_secs(1),
3109        };
3110
3111        let table_provider = build_test_table_provider(
3112            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3113            1,
3114            1,
3115        )
3116        .await;
3117        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3118            .await
3119            .unwrap();
3120
3121        let expected = String::from(
3122            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3123            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3124            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3125            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3126            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3127            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3128            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3129        ).replace("TEMPLATE", plan_name);
3130
3131        assert_eq!(plan.display_indent_schema().to_string(), expected);
3132    }
3133
3134    #[tokio::test]
3135    async fn single_abs() {
3136        do_single_instant_function_call("abs", "abs").await;
3137    }
3138
3139    #[tokio::test]
3140    #[should_panic]
3141    async fn single_absent() {
3142        do_single_instant_function_call("absent", "").await;
3143    }
3144
3145    #[tokio::test]
3146    async fn single_ceil() {
3147        do_single_instant_function_call("ceil", "ceil").await;
3148    }
3149
3150    #[tokio::test]
3151    async fn single_exp() {
3152        do_single_instant_function_call("exp", "exp").await;
3153    }
3154
3155    #[tokio::test]
3156    async fn single_ln() {
3157        do_single_instant_function_call("ln", "ln").await;
3158    }
3159
3160    #[tokio::test]
3161    async fn single_log2() {
3162        do_single_instant_function_call("log2", "log2").await;
3163    }
3164
3165    #[tokio::test]
3166    async fn single_log10() {
3167        do_single_instant_function_call("log10", "log10").await;
3168    }
3169
3170    #[tokio::test]
3171    #[should_panic]
3172    async fn single_scalar() {
3173        do_single_instant_function_call("scalar", "").await;
3174    }
3175
3176    #[tokio::test]
3177    #[should_panic]
3178    async fn single_sgn() {
3179        do_single_instant_function_call("sgn", "").await;
3180    }
3181
3182    #[tokio::test]
3183    #[should_panic]
3184    async fn single_sort() {
3185        do_single_instant_function_call("sort", "").await;
3186    }
3187
3188    #[tokio::test]
3189    #[should_panic]
3190    async fn single_sort_desc() {
3191        do_single_instant_function_call("sort_desc", "").await;
3192    }
3193
3194    #[tokio::test]
3195    async fn single_sqrt() {
3196        do_single_instant_function_call("sqrt", "sqrt").await;
3197    }
3198
3199    #[tokio::test]
3200    #[should_panic]
3201    async fn single_timestamp() {
3202        do_single_instant_function_call("timestamp", "").await;
3203    }
3204
3205    #[tokio::test]
3206    async fn single_acos() {
3207        do_single_instant_function_call("acos", "acos").await;
3208    }
3209
3210    #[tokio::test]
3211    #[should_panic]
3212    async fn single_acosh() {
3213        do_single_instant_function_call("acosh", "").await;
3214    }
3215
3216    #[tokio::test]
3217    async fn single_asin() {
3218        do_single_instant_function_call("asin", "asin").await;
3219    }
3220
3221    #[tokio::test]
3222    #[should_panic]
3223    async fn single_asinh() {
3224        do_single_instant_function_call("asinh", "").await;
3225    }
3226
3227    #[tokio::test]
3228    async fn single_atan() {
3229        do_single_instant_function_call("atan", "atan").await;
3230    }
3231
3232    #[tokio::test]
3233    #[should_panic]
3234    async fn single_atanh() {
3235        do_single_instant_function_call("atanh", "").await;
3236    }
3237
3238    #[tokio::test]
3239    async fn single_cos() {
3240        do_single_instant_function_call("cos", "cos").await;
3241    }
3242
3243    #[tokio::test]
3244    #[should_panic]
3245    async fn single_cosh() {
3246        do_single_instant_function_call("cosh", "").await;
3247    }
3248
3249    #[tokio::test]
3250    async fn single_sin() {
3251        do_single_instant_function_call("sin", "sin").await;
3252    }
3253
3254    #[tokio::test]
3255    #[should_panic]
3256    async fn single_sinh() {
3257        do_single_instant_function_call("sinh", "").await;
3258    }
3259
3260    #[tokio::test]
3261    async fn single_tan() {
3262        do_single_instant_function_call("tan", "tan").await;
3263    }
3264
3265    #[tokio::test]
3266    #[should_panic]
3267    async fn single_tanh() {
3268        do_single_instant_function_call("tanh", "").await;
3269    }
3270
3271    #[tokio::test]
3272    #[should_panic]
3273    async fn single_deg() {
3274        do_single_instant_function_call("deg", "").await;
3275    }
3276
3277    #[tokio::test]
3278    #[should_panic]
3279    async fn single_rad() {
3280        do_single_instant_function_call("rad", "").await;
3281    }
3282
3283    // {
3284    //     input: "avg by (foo)(some_metric)",
3285    //     expected: &AggregateExpr{
3286    //         Op: AVG,
3287    //         Expr: &VectorSelector{
3288    //             Name: "some_metric",
3289    //             LabelMatchers: []*labels.Matcher{
3290    //                 MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3291    //             },
3292    //             PosRange: PositionRange{
3293    //                 Start: 13,
3294    //                 End:   24,
3295    //             },
3296    //         },
3297    //         Grouping: []string{"foo"},
3298    //         PosRange: PositionRange{
3299    //             Start: 0,
3300    //             End:   25,
3301    //         },
3302    //     },
3303    // },
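    // Plans `{fn_name} by (tag_1)(some_metric{tag_0!="bar"})` with both `by` and
    // `without` grouping, comparing each plan against a TEMPLATE-substituted string.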
3304    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
3305        let prom_expr = parser::parse(&format!(
3306            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
3307        ))
3308        .unwrap();
3309        let mut eval_stmt = EvalStmt {
3310            expr: prom_expr,
3311            start: UNIX_EPOCH,
3312            end: UNIX_EPOCH
3313                .checked_add(Duration::from_secs(100_000))
3314                .unwrap(),
3315            interval: Duration::from_secs(5),
3316            lookback_delta: Duration::from_secs(1),
3317        };
3318
3319        // test group by
3320        let table_provider = build_test_table_provider(
3321            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3322            2,
3323            2,
3324        )
3325        .await;
3326        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3327            .await
3328            .unwrap();
3329        let expected_no_without = String::from(
3330            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3331            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3332            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3333            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3334            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3335            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3336            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3337        ).replace("TEMPLATE", plan_name);
3338        assert_eq!(
3339            plan.display_indent_schema().to_string(),
3340            expected_no_without
3341        );
3342
3343        // test group without
3344        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
3345            *modifier = Some(LabelModifier::Exclude(Labels {
3346                labels: vec![String::from("tag_1")].into_iter().collect(),
3347            }));
3348        }
3349        let table_provider = build_test_table_provider(
3350            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3351            2,
3352            2,
3353        )
3354        .await;
3355        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3356            .await
3357            .unwrap();
3358        let expected_without = String::from(
3359            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3360            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3361            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3362            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3363            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3364            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3365            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3366        ).replace("TEMPLATE", plan_name);
3367        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
3368    }
3369
3370    #[tokio::test]
3371    async fn aggregate_sum() {
3372        do_aggregate_expr_plan("sum", "sum").await;
3373    }
3374
3375    #[tokio::test]
3376    async fn aggregate_avg() {
3377        do_aggregate_expr_plan("avg", "avg").await;
3378    }
3379
3380    #[tokio::test]
3381    #[should_panic] // output type doesn't match
3382    async fn aggregate_count() {
3383        do_aggregate_expr_plan("count", "count").await;
3384    }
3385
3386    #[tokio::test]
3387    async fn aggregate_min() {
3388        do_aggregate_expr_plan("min", "min").await;
3389    }
3390
3391    #[tokio::test]
3392    async fn aggregate_max() {
3393        do_aggregate_expr_plan("max", "max").await;
3394    }
3395
3396    #[tokio::test]
3397    #[should_panic] // output type doesn't match
3398    async fn aggregate_group() {
3399        do_aggregate_expr_plan("grouping", "GROUPING").await;
3400    }
3401
3402    #[tokio::test]
3403    async fn aggregate_stddev() {
3404        do_aggregate_expr_plan("stddev", "stddev_pop").await;
3405    }
3406
3407    #[tokio::test]
3408    async fn aggregate_stdvar() {
3409        do_aggregate_expr_plan("stdvar", "var_pop").await;
3410    }
3411
3412    // TODO(ruihang): add range fn tests once exprs are ready.
3413
3414    // {
3415    //     input: "some_metric{tag_0="foo"} + some_metric{tag_0="bar"}",
3416    //     expected: &BinaryExpr{
3417    //         Op: ADD,
3418    //         LHS: &VectorSelector{
3419    //             Name: "some_metric",
3420    //             LabelMatchers: []*labels.Matcher{
3421    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "foo"),
3422    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3423    //             },
3424    //         },
3425    //         RHS: &VectorSelector{
3426    //             Name: "some_metric",
3427    //             LabelMatchers: []*labels.Matcher{
3428    //                     MustLabelMatcher(labels.MatchEqual, "tag_0", "bar"),
3429    //                     MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
3430    //             },
3431    //         },
3432    //         VectorMatching: &VectorMatching{},
3433    //     },
3434    // },
3435    #[tokio::test]
3436    async fn binary_op_column_column() {
3437        let prom_expr =
3438            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
3439        let eval_stmt = EvalStmt {
3440            expr: prom_expr,
3441            start: UNIX_EPOCH,
3442            end: UNIX_EPOCH
3443                .checked_add(Duration::from_secs(100_000))
3444                .unwrap(),
3445            interval: Duration::from_secs(5),
3446            lookback_delta: Duration::from_secs(1),
3447        };
3448
3449        let table_provider = build_test_table_provider(
3450            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3451            1,
3452            1,
3453        )
3454        .await;
3455        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3456            .await
3457            .unwrap();
3458
3459        let expected = String::from(
3460            "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
3461            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3462            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3463            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3464            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3465            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3466            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3467            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3468            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3469            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3470            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3471            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3472            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3473            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3474        );
3475
3476        assert_eq!(plan.display_indent_schema().to_string(), expected);
3477    }
3478
3479    async fn indie_query_plan_compare(query: &str, expected: String) {
3480        let prom_expr = parser::parse(query).unwrap();
3481        let eval_stmt = EvalStmt {
3482            expr: prom_expr,
3483            start: UNIX_EPOCH,
3484            end: UNIX_EPOCH
3485                .checked_add(Duration::from_secs(100_000))
3486                .unwrap(),
3487            interval: Duration::from_secs(5),
3488            lookback_delta: Duration::from_secs(1),
3489        };
3490
3491        let table_provider = build_test_table_provider(
3492            &[
3493                (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
3494                (
3495                    "greptime_private".to_string(),
3496                    "some_alt_metric".to_string(),
3497                ),
3498            ],
3499            1,
3500            1,
3501        )
3502        .await;
3503        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3504            .await
3505            .unwrap();
3506
3507        assert_eq!(plan.display_indent_schema().to_string(), expected);
3508    }
3509
3510    #[tokio::test]
3511    async fn binary_op_literal_column() {
3512        let query = r#"1 + some_metric{tag_0="bar"}"#;
3513        let expected = String::from(
3514            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
3515            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3516            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3517            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3518            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3519            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3520        );
3521
3522        indie_query_plan_compare(query, expected).await;
3523    }
3524
3525    #[tokio::test]
3526    async fn binary_op_literal_literal() {
3527        let query = r#"1 + 1"#;
3528        let expected = String::from("EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]");
3529
3530        indie_query_plan_compare(query, expected).await;
3531    }
3532
3533    #[tokio::test]
3534    async fn simple_bool_grammar() {
3535        let query = "some_metric != bool 1.2345";
3536        let expected = String::from(
3537            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
3538            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3539            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3540            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3541            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3542            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3543        );
3544
3545        indie_query_plan_compare(query, expected).await;
3546    }
3547
3548    #[tokio::test]
3549    async fn bool_with_additional_arithmetic() {
3550        let query = "some_metric + (1 == bool 2)";
3551        let expected = String::from(
3552            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
3553            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3554            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3555            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3556            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3557            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3558        );
3559
3560        indie_query_plan_compare(query, expected).await;
3561    }
3562
3563    #[tokio::test]
3564    async fn simple_unary() {
3565        let query = "-some_metric";
3566        let expected = String::from(
3567            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
3568            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3569            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3570            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3571            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3572            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3573        );
3574
3575        indie_query_plan_compare(query, expected).await;
3576    }
3577
3578    #[tokio::test]
3579    async fn increase_aggr() {
3580        let query = "increase(some_metric[5m])";
3581        let expected = String::from(
3582            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
3583            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
3584            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
3585            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3586            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3587            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3588            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3589            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3590        );
3591
3592        indie_query_plan_compare(query, expected).await;
3593    }
3594
3595    #[tokio::test]
3596    async fn less_filter_on_value() {
3597        let query = "some_metric < 1.2345";
3598        let expected = String::from(
3599            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3600            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3601            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3602            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3603            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3604            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3605        );
3606
3607        indie_query_plan_compare(query, expected).await;
3608    }
3609
3610    #[tokio::test]
3611    async fn count_over_time() {
3612        let query = "count_over_time(some_metric[5m])";
3613        let expected = String::from(
3614            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
3615            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
3616            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
3617            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3618            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3619            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3620            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3621            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3622        );
3623
3624        indie_query_plan_compare(query, expected).await;
3625    }
3626
3627    #[tokio::test]
3628    async fn test_hash_join() {
3629        let mut eval_stmt = EvalStmt {
3630            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
3631            start: UNIX_EPOCH,
3632            end: UNIX_EPOCH
3633                .checked_add(Duration::from_secs(100_000))
3634                .unwrap(),
3635            interval: Duration::from_secs(5),
3636            lookback_delta: Duration::from_secs(1),
3637        };
3638
3639        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;
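        // With `ignoring(kubernetes_pod_name, kubernetes_namespace)`, only `uri`
        // (plus the time index) remains as a join key, as the expected plan asserts.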
3640
3641        let prom_expr = parser::parse(case).unwrap();
3642        eval_stmt.expr = prom_expr;
3643        let table_provider = build_test_table_provider_with_fields(
3644            &[
3645                (
3646                    DEFAULT_SCHEMA_NAME.to_string(),
3647                    "http_server_requests_seconds_sum".to_string(),
3648                ),
3649                (
3650                    DEFAULT_SCHEMA_NAME.to_string(),
3651                    "http_server_requests_seconds_count".to_string(),
3652                ),
3653            ],
3654            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
3655        )
3656        .await;
3657        // Should be ok
3658        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3659            .await
3660            .unwrap();
3661        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
3662            \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
3663            \n    SubqueryAlias: http_server_requests_seconds_sum\
3664            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
3665            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
3666            \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
3667            \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
3668            \n              TableScan: http_server_requests_seconds_sum\
3669            \n    SubqueryAlias: http_server_requests_seconds_count\
3670            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
3671            \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
3672            \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
3673            \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
3674            \n              TableScan: http_server_requests_seconds_count";
3675        assert_eq!(plan.to_string(), expected);
3676    }
3677
3678    #[tokio::test]
3679    async fn test_nested_histogram_quantile() {
3680        let mut eval_stmt = EvalStmt {
3681            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
3682            start: UNIX_EPOCH,
3683            end: UNIX_EPOCH
3684                .checked_add(Duration::from_secs(100_000))
3685                .unwrap(),
3686            interval: Duration::from_secs(5),
3687            lookback_delta: Duration::from_secs(1),
3688        };
3689
3690        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;
3691
3692        let prom_expr = parser::parse(case).unwrap();
3693        eval_stmt.expr = prom_expr;
3694        let table_provider = build_test_table_provider_with_fields(
3695            &[(
3696                DEFAULT_SCHEMA_NAME.to_string(),
3697                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
3698            )],
3699            &["pod", "le", "path", "code", "container"],
3700        )
3701        .await;
3702        // Should be ok
3703        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3704            .await
3705            .unwrap();
3706    }
3707
3708    #[tokio::test]
3709    async fn test_parse_and_operator() {
3710        let mut eval_stmt = EvalStmt {
3711            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
3712            start: UNIX_EPOCH,
3713            end: UNIX_EPOCH
3714                .checked_add(Duration::from_secs(100_000))
3715                .unwrap(),
3716            interval: Duration::from_secs(5),
3717            lookback_delta: Duration::from_secs(1),
3718        };
3719
3720        let cases = [
3721            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
3722            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
3723        ];
3724
3725        for case in cases {
3726            let prom_expr = parser::parse(case).unwrap();
3727            eval_stmt.expr = prom_expr;
3728            let table_provider = build_test_table_provider_with_fields(
3729                &[
3730                    (
3731                        DEFAULT_SCHEMA_NAME.to_string(),
3732                        "kubelet_volume_stats_used_bytes".to_string(),
3733                    ),
3734                    (
3735                        DEFAULT_SCHEMA_NAME.to_string(),
3736                        "kubelet_volume_stats_capacity_bytes".to_string(),
3737                    ),
3738                ],
3739                &["namespace", "persistentvolumeclaim"],
3740            )
3741            .await;
3742            // Should be ok
3743            let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
3744                .await
3745                .unwrap();
3746        }
3747    }
3748
    #[tokio::test]
    async fn test_nested_binary_op() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        // Planning should succeed.
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

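    /// Chained `or` branches over arithmetic combinations of aggregated
    /// vectors, including comparison operators, should plan without error.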
    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
            or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
            or
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
            or
            sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
            (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
            sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
            (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
            (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

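    /// `__field__` matchers select value columns. Each case lists the
    /// columns expected in the output schema; matchers referencing
    /// nonexistent fields must fail to plan.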
    #[tokio::test]
    async fn value_matcher() {
        // Template statement; each case below overwrites `expr`.
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            // single equal matcher
            (
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // two equal matchers
            (
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single not_eq matcher
            (
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // two not_eq matchers
            (
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // equal and not_eq matchers (no conflict)
            (
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // equal and not_eq matchers (conflict)
            (
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single regex eq matcher
            (
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            // single regex not_eq matcher
            (
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
                    .await
                    .unwrap();
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }

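    /// `__schema__` and `__database__` matchers resolve the metric from the
    /// given schema, both standalone and on one side of a binary operation.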
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from("Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
        \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]");

        indie_query_plan_compare(query, expected).await;
    }

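    /// `__schema__` / `__database__` only accept the `=` matcher; `!=` and
    /// regex matchers must produce a planning error.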
    #[tokio::test]
    async fn only_equals_is_supported_for_special_matcher() {
        let queries = &[
            "some_alt_metric{__schema__!=\"greptime_private\"}",
            "some_alt_metric{__schema__=~\"lalala\"}",
            "some_alt_metric{__database__!=\"greptime_private\"}",
            "some_alt_metric{__database__=~\"lalala\"}",
        ];

        for query in queries {
            let prom_expr = parser::parse(query).unwrap();
            let eval_stmt = EvalStmt {
                expr: prom_expr,
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            };

            let table_provider = build_test_table_provider(
                &[
                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                    (
                        "greptime_private".to_string(),
                        "some_alt_metric".to_string(),
                    ),
                ],
                1,
                1,
            )
            .await;

            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
            assert!(plan.is_err(), "query: {:?}", query);
        }
    }

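    /// A table with a nanosecond time index gets an extra projection that
    /// casts the timestamp to millisecond precision before the PromQL
    /// extension plans are applied.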
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(catalog_list
            .register_table_sync(RegisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: "metrics".to_string(),
                table_id: 1024,
                table,
            })
            .is_ok());

        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_session_state(),
        )
        .await
        .unwrap();
        assert_eq!(plan.display_indent_schema().to_string(),
        "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_session_state(),
        )
        .await
        .unwrap();
        assert_eq!(plan.display_indent_schema().to_string(),
        "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
        \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
        \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
        \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
        \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
    }

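    /// A matcher on a label absent from the table schema is tolerated:
    /// planning still succeeds instead of returning an error.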
    #[tokio::test]
    async fn test_nonexistent_label() {
        // Template statement; the expression is overwritten below.
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"some_metric{nonexistent="hi"}"#;
        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            3,
            3,
        )
        .await;
        // Planning should succeed.
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
    }

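    /// `label_join` lowers to a `concat_ws` projection that produces the
    /// new label, followed by an IS NOT NULL filter on it.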
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let expected = "Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]\
        \n  Projection: up.timestamp, up.field_0 AS field_0, concat_ws(Utf8(\",\"), up.tag_1, up.tag_2, up.tag_3) AS foo AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\", \"tag_3\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Filter: up.tag_0 = Utf8(\"api-server\") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

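    /// `label_replace` lowers to a `regexp_replace` projection that
    /// produces the new label, followed by an IS NOT NULL filter on it.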
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

        let expected = "Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]\
        \n  Projection: up.timestamp, up.field_0 AS field_0, regexp_replace(up.tag_0, Utf8(\"(.*):.*\"), Utf8(\"$1\")) AS foo AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n          Filter: up.tag_0 = Utf8(\"a:c\") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
        \n            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

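    /// A regex label matcher compiles to an anchored `~` filter
    /// (`^(?:...)$`) above the table scan.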
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

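    /// `topk` lowers to a `row_number()` window partitioned by timestamp,
    /// filtered against the requested k.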
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

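    /// `count_values` groups by the value column and re-exposes it under
    /// the user-provided label (`series`).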
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

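    /// `quantile` lowers to the `quantile` UDAF applied on top of the inner
    /// `sum by (ip)` aggregation.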
    #[tokio::test]
    async fn test_quantile_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
        \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
}