use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::function::FunctionContext;
use common_query::prelude::greptime_value;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::{DFSchema, NullEquality};
use datafusion_expr::expr::WindowFunctionParams;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{ExprSchemable, Literal, SortExpr, col, lit};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use once_cell::sync::Lazy;
use promql::extension_plan::{
    Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
    ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
};
use promql::functions::{
    AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
    Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
    quantile_udaf,
};
use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
    Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
    Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
    VectorSelector, token,
};
use regex::{self, Regex};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
    InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
    MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
    Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
    UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
    UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
    ZeroRangeSelectorSnafu,
};
use crate::query_engine::QueryEngineState;

/// Special function name: `time()`.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// Special function name: `scalar()`.
const SCALAR_FUNCTION: &str = "scalar";
/// Special function name: `absent()`.
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// Special function name: `histogram_quantile()`.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// Special function name: `vector()`.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// The column name reserved for histogram bucket boundaries.
const LE_COLUMN_NAME: &str = "le";

/// Pattern that valid PromQL label names must match.
static LABEL_NAME_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());

/// Default time index column name for plans generated without a table context.
const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Default value column name for plans generated without a table context.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special matcher for selecting field columns.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special matchers for selecting the schema (database).
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

/// Maximum number of scatter points before falling back to a single time-range filter.
const MAX_SCATTER_POINTS: i64 = 400;

/// Interval of 1 hour in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;

#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // query parameters
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // planner states
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    field_column_matcher: Option<Vec<Matcher>>,
    selector_matcher: Vec<Matcher>,
    schema_name: Option<String>,
    /// The range in milliseconds of the range selector, if any.
    range: Option<Millisecond>,
}

impl PromPlannerContext {
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }

    /// Reset all planner states.
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.field_column_matcher = None;
        self.selector_matcher.clear();
        self.schema_name = None;
        self.range = None;
    }

    /// Reset the table name and schema to a known empty state.
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
    }

    /// Check whether the `le` column is present in the tag columns.
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}

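/// Planner that converts a PromQL [`EvalStmt`] into a DataFusion [`LogicalPlan`].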
pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}

impl PromPlanner {
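    /// Convert an [`EvalStmt`] to a logical plan. If `alias` is given, the resulting
    /// value column is renamed through an extra projection.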
    pub async fn stmt_to_plan_with_alias(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        alias: Option<String>,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        let plan = planner
            .prom_expr_to_plan(&stmt.expr, query_engine_state)
            .await?;

        if let Some(alias_name) = alias {
            planner.apply_alias_projection(plan, alias_name)
        } else {
            Ok(plan)
        }
    }

    #[cfg(test)]
    async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        Self::stmt_to_plan_with_alias(table_provider, stmt, None, query_engine_state).await
    }

    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
            .await
    }

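    /// Inner planning entry. `timestamp_fn` is true when the expression is the direct
    /// argument of a `timestamp()` call, which changes how vector selectors are planned.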
    #[async_recursion]
    async fn prom_expr_to_plan_inner(
        &mut self,
        prom_expr: &PromExpr,
        timestamp_fn: bool,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => {
                self.prom_aggr_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Unary(expr) => {
                self.prom_unary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Binary(expr) => {
                self.prom_binary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
                    .await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector, timestamp_fn)
                    .await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => {
                self.prom_call_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Extension(expr) => {
                self.prom_ext_expr_to_plan(query_engine_state, expr).await?
            }
        };

        Ok(res)
    }

    async fn prom_subquery_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_aggr_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // calculate columns to group by
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                // convert the operator and value columns to aggregate exprs
                let (aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // create the aggregate plan
                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` also groups by the value column itself
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }

    /// Plan `topk(k, input)` or `bottomk(k, input)` via rank window functions.
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // convert the operator and value columns to window exprs
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Combine the rank predicates with `Operator::Or`.
        // Safety: window_exprs produces at least one rank column, so the fold is never None.
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        // also order by the rank columns
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    async fn prom_unary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        // A unary expr in PromQL implies the `-` operator
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }

    async fn prom_binary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // With the `bool` modifier, a comparison returns 0/1 instead of filtering rows.
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        // Build a filter plan for comparison operators without the `bool` modifier;
        // otherwise build a projection plan.
        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            // lhs is a literal, rhs is a vector
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // lhs is a vector, rhs is a literal
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                // check if the literal is a special time expr
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // both sides are vectors: join them on non-field columns
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // set operators have their own join semantics
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                if left_table_ref == right_table_ref {
                    // rename the table references to avoid ambiguity
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    // Use the left context if the right side (currently in `self.ctx`)
                    // has no tag columns; otherwise keep the right context.
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    // an empty tag set on either side means a case like `scalar(...) + host`
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }

    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.clone());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
        timestamp_fn: bool,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;

        // apply the `timestamp()` function if this selector is its argument
        let normalize = if timestamp_fn {
            self.create_timestamp_func_plan(normalize)?
        } else {
            normalize
        };

        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    /// Create a projection for the `timestamp()` function: the field column is replaced
    /// by a value derived from the time index column, while the tag columns and the time
    /// index itself are kept.
    fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
        let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
            .alias(DEFAULT_FIELD_COLUMN);
        self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
        let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
        project_exprs.push(self.create_time_index_column_expr()?);
        project_exprs.push(time_expr);
        project_exprs.extend(self.create_tag_column_exprs()?);

        LogicalPlanBuilder::from(normalize)
            .project(project_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        // A missing table yields an empty-metric plan instead of an error, so functions
        // like `absent_over_time` still work on nonexistent metrics.
        let normalize = match self.setup_context().await? {
            Some(empty_plan) => empty_plan,
            None => {
                self.selector_to_series_normalize_plan(offset, matchers, true)
                    .await?
            }
        };
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_call_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // some special functions are planned as whole plans rather than expressions
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, query_engine_state).await;
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
            SPECIAL_ABSENT_FUNCTION => {
                return self.create_absent_plan(args, query_engine_state).await;
            }
            _ => {}
        }

        // transform the function arguments
        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
                .await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let (mut func_exprs, new_tags) =
            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        // update the context tags after building the plan
        for tag in new_tags {
            self.ctx.tag_columns.push(tag);
        }

        let plan = builder.build().context(DataFusionPlanningSnafu)?;
        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");

        Ok(plan)
    }

    async fn prom_ext_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self
            .prom_expr_to_plan(&children[0], query_engine_state)
            .await?;
        // `explain(verbose, analyze)`: ANALYZE executes the plan and reports runtime
        // metrics; VERBOSE adds extra detail to the output.
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }

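    /// Extract the metric name from `name` or the `__name__` matcher, and dispatch
    /// special matchers (`__field__`, `__schema__`, `__database__`) into the planner
    /// context. Returns the remaining ordinary label matchers.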
    #[allow(clippy::mutable_key_type)]
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.clone(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                self.ctx.selector_matcher.push(matcher.clone());
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(matchers.into_iter().collect()))
    }

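    /// Build a plan that scans the table, filters by matchers and time range, sorts by
    /// tags and time index, and divides the data into per-series groups. An offset or
    /// range selector additionally wraps the plan in a `SeriesNormalize` node.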
    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        let table_ref = self.table_ref()?;
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        // make filter exprs
        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            .filter(conjunction(scan_filters).unwrap())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make a projection plan if there is any `__field__` matcher
        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            // opt-in set
            let mut result_set = HashSet::new();
            // opt-out set
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            // if the opt-in set is empty, start from all field columns
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            // remove opt-out columns
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }

            // mask the field columns in the context with the computed result set
            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // make the sort plan
        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(self.create_tag_and_time_index_column_sort_exprs()?)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // make the series divide plan
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                self.ctx.tag_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

        // make the series normalize plan only when needed
        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            self.ctx.tag_columns.clone(),
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }

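    /// Convert the aggregate modifier (`by` / `without`) into a list of group-by column
    /// expressions. The time index column is always part of the result. When `update_ctx`
    /// is true, the planner's tag columns are updated accordingly.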
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            None => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            Some(LabelModifier::Include(labels)) => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    // nonexistent labels are ignored
                    if let Some(column_name) = Self::find_case_sensitive_column(input_schema, label)
                    {
                        exprs.push(DfExpr::Column(Column::from_name(column_name.clone())));

                        if update_ctx {
                            // update the tag columns in the context
                            self.ctx.tag_columns.push(column_name);
                        }
                    }
                }
                // add the timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                // remove the "without"-ed fields; nonexistent labels are ignored
                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // remove the time index and value fields
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    // change the tag columns in the context
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                // collect the remaining fields as column exprs
                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                // add the timestamp column
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }

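    /// Convert label matchers to DataFusion filter expressions. Matchers on `__schema__`,
    /// `__database__` and `__field__` are skipped here; a matcher on a column that does
    /// not exist is evaluated against an empty string literal instead.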
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            if matcher.name == SCHEMA_COLUMN_MATCHER
                || matcher.name == DB_COLUMN_MATCHER
                || matcher.name == FIELD_COLUMN_MATCHER
            {
                continue;
            }

            let column_name = Self::find_case_sensitive_column(table_schema, matcher.name.as_str());
            let col = if let Some(column_name) = column_name {
                DfExpr::Column(Column::from_name(column_name))
            } else {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
                    .alias(matcher.name.clone())
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                // The parser anchors regexes as `^(?:<pattern>)$`, so these special
                // cases detect the plain `.*` and `.+` patterns.
                MatchOp::Re(re) => {
                    // `=~ ".*"` matches everything; no filter is needed
                    if re.as_str() == "^(?:.*)$" {
                        continue;
                    }
                    // `=~ ".+"` matches any non-empty value
                    if re.as_str() == "^(?:.+)$" {
                        col.not_eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
                MatchOp::NotRe(re) => {
                    if re.as_str() == "^(?:.*)$" {
                        // `!~ ".*"` matches nothing
                        DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
                    } else if re.as_str() == "^(?:.+)$" {
                        // `!~ ".+"` only matches empty values
                        col.eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexNotMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
            };
            exprs.push(expr);
        }

        Ok(exprs)
    }

    fn find_case_sensitive_column(schema: &DFSchemaRef, column: &str) -> Option<String> {
        schema
            .fields()
            .iter()
            .find(|field| field.name() == column)
            .map(|field| field.name().clone())
    }

    fn table_ref(&self) -> Result<TableReference> {
        let table_name = self
            .ctx
            .table_name
            .clone()
            .context(TableNameNotFoundSnafu)?;

        // include the schema name if a `__schema__` matcher was given
        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
            TableReference::partial(schema_name.as_str(), table_name.as_str())
        } else {
            TableReference::bare(table_name.as_str())
        };

        Ok(table_ref)
    }

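    /// Build a filter expression over the time index. For dense queries (more than
    /// [`MAX_SCATTER_POINTS`] points, or a step within one hour) a single time-range
    /// filter is used; otherwise one small window per evaluation timestamp is OR-ed
    /// together so the gaps between sparse points can be skipped.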
    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
        let start = self.ctx.start;
        let end = self.ctx.end;
        if end < start {
            return InvalidTimeRangeSnafu { start, end }.fail();
        }
        let lookback_delta = self.ctx.lookback_delta;
        let range = self.ctx.range.unwrap_or_default();
        let interval = self.ctx.interval;
        let time_index_expr = self.create_time_index_column_expr()?;
        let num_points = (end - start) / interval;

        // Use a single time-range filter unless the query scans only a few points
        // (at most MAX_SCATTER_POINTS) at a large (more than 1 hour) interval.
        if num_points > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
            let single_time_range = time_index_expr
                .clone()
                .gt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(
                        Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range),
                        None,
                    ),
                    None,
                ))
                .and(time_index_expr.lt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(
                        Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
                        None,
                    ),
                    None,
                )));
            return Ok(Some(single_time_range));
        }

        // generate a time filter for each evaluation step
        let mut filters = Vec::with_capacity(num_points as usize);
        for timestamp in (start..end).step_by(interval as usize) {
            filters.push(
                time_index_expr
                    .clone()
                    .gt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp - offset_duration - lookback_delta - range),
                            None,
                        ),
                        None,
                    ))
                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp - offset_duration + lookback_delta),
                            None,
                        ),
                        None,
                    ))),
            )
        }

        Ok(filters.into_iter().reduce(DfExpr::or))
    }

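    /// Create a table scan plan over the resolved table. When the table's time index is
    /// not millisecond-precision, an extra projection casts it to millisecond timestamps
    /// while keeping the original qualified column names.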
    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
        let provider = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?;

        let is_time_index_ms = provider
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table()
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .data_type
            == ConcreteDataType::timestamp_millisecond_datatype();

        let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        if !is_time_index_ms {
            // cast the time index to millisecond precision
            let expr: Vec<_> = self
                .ctx
                .field_columns
                .iter()
                .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
                .chain(self.create_tag_column_exprs()?)
                .chain(Some(DfExpr::Alias(Alias {
                    expr: Box::new(DfExpr::Cast(Cast {
                        expr: Box::new(self.create_time_index_column_expr()?),
                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                    })),
                    relation: Some(table_ref.clone()),
                    name: self
                        .ctx
                        .time_index_column
                        .as_ref()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_quoted_string(),
                        })?
                        .clone(),
                    metadata: None,
                })))
                .collect::<Vec<_>>();
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(expr)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        let result = LogicalPlanBuilder::from(scan_plan)
            .build()
            .context(DataFusionPlanningSnafu)?;
        Ok(result)
    }

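    /// Setup the planner context (time index, field and tag columns) from the table
    /// referenced by the selector. Returns an empty-metric plan when the table does
    /// not exist, so selectors over missing metrics evaluate to an empty result.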
    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
        let table_ref = self.table_ref()?;
        let table = match self.table_provider.resolve_table(table_ref.clone()).await {
            Err(e) if e.status_code() == StatusCode::TableNotFound => {
                let plan = self.setup_context_for_empty_metric()?;
                return Ok(Some(plan));
            }
            res => res.context(CatalogSnafu)?,
        };
        let table = table
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table();

        // set the time index column name
        let time_index = table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .name
            .clone();
        self.ctx.time_index_column = Some(time_index);

        // set the value (field) columns
        let values = table
            .table_info()
            .meta
            .field_column_names()
            .cloned()
            .collect();
        self.ctx.field_columns = values;

        // set the tag columns, excluding the metric engine's internal columns
        let tags = table
            .table_info()
            .meta
            .row_key_column_names()
            .filter(|col| {
                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
            })
            .cloned()
            .collect();
        self.ctx.tag_columns = tags;

        Ok(None)
    }

    /// Setup the context for a metric that doesn't exist, producing a plan that
    /// evaluates to an empty result.
    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
        self.ctx.reset_table_name_and_schema();
        self.ctx.tag_columns = vec![];
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];

        // Use an inverted time range (start 0, end -1) so the node produces no rows.
        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    0,
                    -1,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(lit(0.0f64)),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

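    /// Split the function call arguments into at most one vector input expression and a
    /// list of literal arguments.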
    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
        let mut result = FunctionArgs::default();

        for arg in args {
            // first try to extract a literal expression
            if let Some(expr) = Self::try_build_literal_expr(arg) {
                result.literals.push(expr);
            } else {
                // otherwise this should be the (single) vector input of the function
                match arg.as_ref() {
                    PromExpr::Subquery(_)
                    | PromExpr::VectorSelector(_)
                    | PromExpr::MatrixSelector(_)
                    | PromExpr::Extension(_)
                    | PromExpr::Aggregate(_)
                    | PromExpr::Paren(_)
                    | PromExpr::Call(_)
                    | PromExpr::Binary(_)
                    | PromExpr::Unary(_) => {
                        if result.input.replace(*arg.clone()).is_some() {
                            MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
                        }
                    }

                    _ => {
                        let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
                        result.literals.push(expr);
                    }
                }
            }
        }

        Ok(result)
    }

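    /// Create DataFusion expressions (and any newly created tag columns) for the given
    /// PromQL function applied to every field column. `other_input_exprs` holds the
    /// literal arguments; the field column is spliced in at the position PromQL expects.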
    fn create_function_expr(
        &mut self,
        func: &Function,
        other_input_exprs: Vec<DfExpr>,
        query_engine_state: &QueryEngineState,
    ) -> Result<(Vec<DfExpr>, Vec<String>)> {
        let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();

        // position at which the field column is inserted into the argument list
        let field_column_pos = 0;
        let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
        // tag columns newly created by the function, e.g. by `label_join`/`label_replace`
        let mut new_tags = vec![];
        let scalar_func = match func.name {
            "increase" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Increase::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "rate" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Rate::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "delta" => ScalarFunc::ExtrapolateUdf(
                Arc::new(Delta::scalar_udf()),
                self.ctx.range.context(ExpectRangeSelectorSnafu)?,
            ),
            "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
            "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
            "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
            "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
            "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
            "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
            "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
            "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
            "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
            "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
            "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
            "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
            "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
            "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
            "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
            "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
            "predict_linear" => {
                other_input_exprs[0] = DfExpr::Cast(Cast {
                    expr: Box::new(other_input_exprs[0].clone()),
                    data_type: ArrowDataType::Int64,
                });
                ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
            }
            "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
            "time" => {
                exprs.push(build_special_time_expr(
                    self.ctx.time_index_column.as_ref().unwrap(),
                ));
                ScalarFunc::GeneratedExpr
            }
            "minute" => {
                // date_part('minute', time_index)
                let expr = self.date_part_on_time_index("minute")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "hour" => {
                // date_part('hour', time_index)
                let expr = self.date_part_on_time_index("hour")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "month" => {
                // date_part('month', time_index)
                let expr = self.date_part_on_time_index("month")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "year" => {
                // date_part('year', time_index)
                let expr = self.date_part_on_time_index("year")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_month" => {
                // date_part('day', time_index)
                let expr = self.date_part_on_time_index("day")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_week" => {
                // date_part('dow', time_index)
                let expr = self.date_part_on_time_index("dow")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "day_of_year" => {
                // date_part('doy', time_index)
                let expr = self.date_part_on_time_index("doy")?;
                exprs.push(expr);
                ScalarFunc::GeneratedExpr
            }
            "days_in_month" => {
                // date_part('day', date_trunc('month', time_index)
                //     + interval '1 month' - interval '1 day')
                let day_lit_expr = "day".lit();
                let month_lit_expr = "month".lit();
                let interval_1month_lit_expr =
                    DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
                let interval_1day_lit_expr = DfExpr::Literal(
                    ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
                    None,
                );
                let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(interval_1month_lit_expr),
                    op: Operator::Minus,
                    right: Box::new(interval_1day_lit_expr),
                });
                let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_trunc(),
                    args: vec![month_lit_expr, self.create_time_index_column_expr()?],
                });
                let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(date_trunc_expr),
                    op: Operator::Plus,
                    right: Box::new(the_1month_minus_1day_expr),
                });
                let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion_functions::datetime::date_part(),
                    args: vec![day_lit_expr, date_trunc_plus_interval_expr],
                });

                exprs.push(date_part_expr);
                ScalarFunc::GeneratedExpr
            }

            "label_join" => {
                let (concat_expr, dst_label) = Self::build_concat_labels_expr(
                    &mut other_input_exprs,
                    &self.ctx,
                    query_engine_state,
                )?;

                // keep the current field columns, except the destination label
                for value in &self.ctx.field_columns {
                    if *value != dst_label {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                // remove it from the tag columns, if exists, to avoid duplicated columns
                self.ctx.tag_columns.retain(|tag| *tag != dst_label);
                new_tags.push(dst_label);
                // add the new label expr to evaluate
                exprs.push(concat_expr);

                ScalarFunc::GeneratedExpr
            }
            "label_replace" => {
                if let Some((replace_expr, dst_label)) = self
                    .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
                {
                    // keep the current field columns, except the destination label
                    for value in &self.ctx.field_columns {
                        if *value != dst_label {
                            let expr = DfExpr::Column(Column::from_name(value));
                            exprs.push(expr);
                        }
                    }

                    ensure!(
                        !self.ctx.tag_columns.contains(&dst_label),
                        SameLabelSetSnafu
                    );
                    new_tags.push(dst_label);
                    // add the new label expr to evaluate
                    exprs.push(replace_expr);
                } else {
                    // nothing to replace; keep the field columns unchanged
                    for value in &self.ctx.field_columns {
                        let expr = DfExpr::Column(Column::from_name(value));
                        exprs.push(expr);
                    }
                }

                ScalarFunc::GeneratedExpr
            }
            "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
                // sorting and `timestamp()` are handled elsewhere;
                // here the field columns pass through unchanged
                for value in &self.ctx.field_columns {
                    let expr = DfExpr::Column(Column::from_name(value));
                    exprs.push(expr);
                }

                ScalarFunc::GeneratedExpr
            }
            "round" => {
                if other_input_exprs.is_empty() {
                    // the default precision is 0
                    other_input_exprs.push_front(0.0f64.lit());
                }
                ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
            }
            "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
            "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
            "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
            "pi" => {
                // `pi()` takes no argument, so it is generated directly
                let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                    func: datafusion::functions::math::pi(),
                    args: vec![],
                });
                exprs.push(fn_expr);

                ScalarFunc::GeneratedExpr
            }
            _ => {
                if let Some(f) = query_engine_state
                    .session_state()
                    .scalar_functions()
                    .get(func.name)
                {
                    ScalarFunc::DataFusionBuiltin(f.clone())
                } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
                    let func_state = query_engine_state.function_state();
                    let query_ctx = self.table_provider.query_ctx();

                    ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
                        state: func_state,
                        query_ctx: query_ctx.clone(),
                    })))
                } else if let Some(f) = datafusion_functions::math::functions()
                    .iter()
                    .find(|f| f.name() == func.name)
                {
                    ScalarFunc::DataFusionUdf(f.clone())
                } else {
                    return UnsupportedExprSnafu {
                        name: func.name.to_string(),
                    }
                    .fail();
                }
            }
        };

        // build the function expression for every field column
        for value in &self.ctx.field_columns {
            let col_expr = DfExpr::Column(Column::from_name(value));

            match scalar_func.clone() {
                ScalarFunc::DataFusionBuiltin(func) => {
                    other_input_exprs.insert(field_column_pos, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::DataFusionUdf(func) => {
                    let args = itertools::chain!(
                        other_input_exprs.iter().take(field_column_pos).cloned(),
                        std::iter::once(col_expr),
                        other_input_exprs.iter().skip(field_column_pos).cloned()
                    )
                    .collect_vec();
                    exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
                }
                ScalarFunc::Udf(func) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::ExtrapolateUdf(func, range_length) => {
                    let ts_range_expr = DfExpr::Column(Column::from_name(
                        RangeManipulate::build_timestamp_range_name(
                            self.ctx.time_index_column.as_ref().unwrap(),
                        ),
                    ));
                    other_input_exprs.insert(field_column_pos, ts_range_expr);
                    other_input_exprs.insert(field_column_pos + 1, col_expr);
                    other_input_exprs
                        .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
                    other_input_exprs.push_back(lit(range_length));
                    let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
                        func,
                        args: other_input_exprs.clone().into(),
                    });
                    exprs.push(fn_expr);
                    let _ = other_input_exprs.pop_back();
                    let _ = other_input_exprs.remove(field_column_pos + 2);
                    let _ = other_input_exprs.remove(field_column_pos + 1);
                    let _ = other_input_exprs.remove(field_column_pos);
                }
                ScalarFunc::GeneratedExpr => {}
            }
        }

        // Rename the value columns and alias them to plain names.
        // `label_join` and `label_replace` maintain their field columns themselves.
        if !matches!(func.name, "label_join" | "label_replace") {
            let mut new_field_columns = Vec::with_capacity(exprs.len());

            exprs = exprs
                .into_iter()
                .map(|expr| {
                    let display_name = expr.schema_name().to_string();
                    new_field_columns.push(display_name.clone());
                    Ok(expr.alias(display_name))
                })
                .collect::<std::result::Result<Vec<_>, _>>()
                .context(DataFusionPlanningSnafu)?;

            self.ctx.field_columns = new_field_columns;
        }

        Ok((exprs, new_tags))
    }

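    /// Check that `label_name` is a valid PromQL label name that does not use the
    /// reserved `__` prefix.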
    fn validate_label_name(label_name: &str) -> Result<()> {
        // reserved (double-underscore-prefixed) label names are rejected
        if label_name.starts_with("__") {
            return InvalidDestinationLabelNameSnafu { label_name }.fail();
        }
        if !LABEL_NAME_REGEX.is_match(label_name) {
            return InvalidDestinationLabelNameSnafu { label_name }.fail();
        }

        Ok(())
    }

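    /// Build a `regexp_replace` expression for `label_replace`. Returns `None` when the
    /// call cannot change anything (empty regex on an existing source label, or a missing
    /// source label with an empty replacement).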
1989 fn build_regexp_replace_label_expr(
1991 &self,
1992 other_input_exprs: &mut VecDeque<DfExpr>,
1993 query_engine_state: &QueryEngineState,
1994 ) -> Result<Option<(DfExpr, String)>> {
1995 let dst_label = match other_input_exprs.pop_front() {
1997 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
1998 other => UnexpectedPlanExprSnafu {
1999 desc: format!("expected dst_label string literal, but found {:?}", other),
2000 }
2001 .fail()?,
2002 };
2003
2004 Self::validate_label_name(&dst_label)?;
2006 let replacement = match other_input_exprs.pop_front() {
2007 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
2008 other => UnexpectedPlanExprSnafu {
2009 desc: format!("expected replacement string literal, but found {:?}", other),
2010 }
2011 .fail()?,
2012 };
2013 let src_label = match other_input_exprs.pop_front() {
2014 Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
2015 other => UnexpectedPlanExprSnafu {
2016 desc: format!("expected src_label string literal, but found {:?}", other),
2017 }
2018 .fail()?,
2019 };
2020
2021 let regex = match other_input_exprs.pop_front() {
2022 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2023 other => UnexpectedPlanExprSnafu {
2024 desc: format!("expected regex string literal, but found {:?}", other),
2025 }
2026 .fail()?,
2027 };
2028
2029 regex::Regex::new(®ex).map_err(|_| {
2032 InvalidRegularExpressionSnafu {
2033 regex: regex.clone(),
2034 }
2035 .build()
2036 })?;
2037
2038 if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2040 return Ok(None);
2041 }
2042
2043 if !self.ctx.tag_columns.contains(&src_label) {
2045 if replacement.is_empty() {
2046 return Ok(None);
2048 } else {
2049 return Ok(Some((
2051 lit(replacement).alias(&dst_label),
2053 dst_label,
2054 )));
2055 }
2056 }
2057
2058 let regex = format!("^(?s:{regex})$");
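2059 // Prometheus anchors the pattern against the full label value; `(?s:)`
2060 // additionally lets `.` match newlines.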
2061
2062 let session_state = query_engine_state.session_state();
2063 let func = session_state
2064 .scalar_functions()
2065 .get("regexp_replace")
2066 .context(UnsupportedExprSnafu {
2067 name: "regexp_replace",
2068 })?;
2069
2070 let args = vec![
2072 if src_label.is_empty() {
2073 DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2074 } else {
2075 DfExpr::Column(Column::from_name(src_label))
2076 },
2077 DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2078 DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2079 ];
2080
2081 Ok(Some((
2082 DfExpr::ScalarFunction(ScalarFunction {
2083 func: func.clone(),
2084 args,
2085 })
2086 .alias(&dst_label),
2087 dst_label,
2088 )))
2089 }
2090
2091 /// Build the `label_join` expression on top of DataFusion's `concat_ws` UDF.
2092 fn build_concat_labels_expr(
2093 other_input_exprs: &mut VecDeque<DfExpr>,
2094 ctx: &PromPlannerContext,
2095 query_engine_state: &QueryEngineState,
2096 ) -> Result<(DfExpr, String)> {
2097 let dst_label = match other_input_exprs.pop_front() {
2100 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2101 other => UnexpectedPlanExprSnafu {
2102 desc: format!("expected dst_label string literal, but found {:?}", other),
2103 }
2104 .fail()?,
2105 };
2106 let separator = match other_input_exprs.pop_front() {
2107 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2108 other => UnexpectedPlanExprSnafu {
2109 desc: format!("expected separator string literal, but found {:?}", other),
2110 }
2111 .fail()?,
2112 };
2113
2114 let available_columns: HashSet<&str> = ctx
2116 .tag_columns
2117 .iter()
2118 .chain(ctx.field_columns.iter())
2119 .chain(ctx.time_index_column.as_ref())
2120 .map(|s| s.as_str())
2121 .collect();
2122
2123 let src_labels = other_input_exprs
2124 .iter()
2125 .map(|expr| {
2126 match expr {
2128 DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2129 if label.is_empty() {
2130 Ok(DfExpr::Literal(ScalarValue::Null, None))
2131 } else if available_columns.contains(label.as_str()) {
2132 Ok(DfExpr::Column(Column::from_name(label)))
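2133 // Source labels missing from the input are passed as NULL, which `concat_ws` skips.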
2134 } else {
2135 Ok(DfExpr::Literal(ScalarValue::Null, None))
2137 }
2138 }
2139 other => UnexpectedPlanExprSnafu {
2140 desc: format!(
2141 "expected source label string literal, but found {:?}",
2142 other
2143 ),
2144 }
2145 .fail(),
2146 }
2147 })
2148 .collect::<Result<Vec<_>>>()?;
2149 ensure!(
2150 !src_labels.is_empty(),
2151 FunctionInvalidArgumentSnafu {
2152 fn_name: "label_join"
2153 }
2154 );
2155
2156 let session_state = query_engine_state.session_state();
2157 let func = session_state
2158 .scalar_functions()
2159 .get("concat_ws")
2160 .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2161
2162 let mut args = Vec::with_capacity(1 + src_labels.len());
2164 args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2165 args.extend(src_labels);
2166
2167 Ok((
2168 DfExpr::ScalarFunction(ScalarFunction {
2169 func: func.clone(),
2170 args,
2171 })
2172 .alias(&dst_label),
2173 dst_label,
2174 ))
2175 }
2176
2177 fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2178 Ok(DfExpr::Column(Column::from_name(
2179 self.ctx
2180 .time_index_column
2181 .clone()
2182 .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2183 )))
2184 }
2185
2186 fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2187 let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2188 for tag in &self.ctx.tag_columns {
2189 let expr = DfExpr::Column(Column::from_name(tag));
2190 result.push(expr);
2191 }
2192 Ok(result)
2193 }
2194
2195 fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2196 let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2197 for field in &self.ctx.field_columns {
2198 let expr = DfExpr::Column(Column::from_name(field));
2199 result.push(expr);
2200 }
2201 Ok(result)
2202 }
2203
2204 fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2205 let mut result = self
2206 .ctx
2207 .tag_columns
2208 .iter()
2209 .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2210 .collect::<Vec<_>>();
2211 result.push(self.create_time_index_column_expr()?.sort(true, true));
2212 Ok(result)
2213 }
2214
2215 fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2216 self.ctx
2217 .field_columns
2218 .iter()
2219 .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2220 .collect::<Vec<_>>()
2221 }
2222
2223 fn create_sort_exprs_by_tags(
2224 func: &str,
2225 tags: Vec<DfExpr>,
2226 asc: bool,
2227 ) -> Result<Vec<SortExpr>> {
2228 ensure!(
2229 !tags.is_empty(),
2230 FunctionInvalidArgumentSnafu { fn_name: func }
2231 );
2232
2233 tags.iter()
2234 .map(|col| match col {
2235 DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2236 Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2237 }
2238 other => UnexpectedPlanExprSnafu {
2239 desc: format!("expected label string literal, but found {:?}", other),
2240 }
2241 .fail(),
2242 })
2243 .collect::<Result<Vec<_>>>()
2244 }
2245
2246 fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2247 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2248 for value in &self.ctx.field_columns {
2249 let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2250 exprs.push(expr);
2251 }
2252
2253 conjunction(exprs).context(ValueNotFoundSnafu {
2254 table: self.table_ref()?.to_quoted_string(),
2255 })
2256 }
2257
2258 /// Create an aggregate expression for each field column, using the
2259 /// aggregate function named by `op`. `param` carries the extra argument
2260 /// of parameterized aggregators such as `quantile`.
2261 ///
2262 /// # Side effect
2263 ///
2264 /// Resets the context's field columns to the schema names of the new
2265 /// aggregate expressions.
2273 fn create_aggregate_exprs(
2274 &mut self,
2275 op: TokenType,
2276 param: &Option<Box<PromExpr>>,
2277 input_plan: &LogicalPlan,
2278 ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
2279 let mut non_col_args = Vec::new();
2280 let aggr = match op.id() {
2281 token::T_SUM => sum_udaf(),
2282 token::T_QUANTILE => {
2283 let q =
2284 Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
2285 non_col_args.push(q);
2286 quantile_udaf()
2287 }
2288 token::T_AVG => avg_udaf(),
2289 token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
2290 token::T_MIN => min_udaf(),
2291 token::T_MAX => max_udaf(),
2292 token::T_GROUP => grouping_udaf(),
2293 token::T_STDDEV => stddev_pop_udaf(),
2294 token::T_STDVAR => var_pop_udaf(),
2295 token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2296 name: format!("{op:?}"),
2297 }
2298 .fail()?,
2299 _ => UnexpectedTokenSnafu { token: op }.fail()?,
2300 };
2301
2302 let exprs: Vec<DfExpr> = self
2304 .ctx
2305 .field_columns
2306 .iter()
2307 .map(|col| {
2308 non_col_args.push(DfExpr::Column(Column::from_name(col)));
2309 let expr = aggr.call(non_col_args.clone());
2310 non_col_args.pop();
2311 expr
2312 })
2313 .collect::<Vec<_>>();
2314
2315 let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
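2316 // `count_values` also groups by the original field value, so keep those exprs.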
2317 let prev_field_exprs: Vec<_> = self
2318 .ctx
2319 .field_columns
2320 .iter()
2321 .map(|col| DfExpr::Column(Column::from_name(col)))
2322 .collect();
2323
2324 ensure!(
2325 self.ctx.field_columns.len() == 1,
2326 UnsupportedExprSnafu {
2327 name: "count_values on multi-value input"
2328 }
2329 );
2330
2331 prev_field_exprs
2332 } else {
2333 vec![]
2334 };
2335
2336 let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2338
2339 let normalized_exprs =
2340 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2341 for expr in normalized_exprs {
2342 new_field_columns.push(expr.schema_name().to_string());
2343 }
2344 self.ctx.field_columns = new_field_columns;
2345
2346 Ok((exprs, prev_field_exprs))
2347 }
2348
2349 fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2350 let param = param
2351 .as_deref()
2352 .with_context(|| FunctionInvalidArgumentSnafu {
2353 fn_name: op.to_string(),
2354 })?;
2355 let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2356 return FunctionInvalidArgumentSnafu {
2357 fn_name: op.to_string(),
2358 }
2359 .fail();
2360 };
2361
2362 Ok(val)
2363 }
2364
2365 fn get_param_as_literal_expr(
2366 param: &Option<Box<PromExpr>>,
2367 op: Option<TokenType>,
2368 expected_type: Option<ArrowDataType>,
2369 ) -> Result<DfExpr> {
2370 let prom_param = param.as_deref().with_context(|| {
2371 if let Some(op) = op {
2372 FunctionInvalidArgumentSnafu {
2373 fn_name: op.to_string(),
2374 }
2375 } else {
2376 FunctionInvalidArgumentSnafu {
2377 fn_name: "unknown".to_string(),
2378 }
2379 }
2380 })?;
2381
2382 let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2383 if let Some(op) = op {
2384 FunctionInvalidArgumentSnafu {
2385 fn_name: op.to_string(),
2386 }
2387 } else {
2388 FunctionInvalidArgumentSnafu {
2389 fn_name: "unknown".to_string(),
2390 }
2391 }
2392 })?;
2393
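2394 // Optionally check the literal against the expected Arrow type, e.g.
2395 // the `quantile` parameter must be a Float64.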
2394 if let Some(expected_type) = expected_type {
2396 let expr_type = expr
2398 .get_type(&DFSchema::empty())
2399 .context(DataFusionPlanningSnafu)?;
2400 if expected_type != expr_type {
2401 return FunctionInvalidArgumentSnafu {
2402 fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2403 }
2404 .fail();
2405 }
2406 }
2407
2408 Ok(expr)
2409 }
2410
2411 /// Create a `row_number()` window expression per field column to plan
2412 /// `topk` and `bottomk`, partitioned by `group_exprs`.
2413 fn create_window_exprs(
2414 &mut self,
2415 op: TokenType,
2416 group_exprs: Vec<DfExpr>,
2417 input_plan: &LogicalPlan,
2418 ) -> Result<Vec<DfExpr>> {
2419 ensure!(
2420 self.ctx.field_columns.len() == 1,
2421 UnsupportedExprSnafu {
2422 name: "topk or bottomk on multi-value input"
2423 }
2424 );
2425
2426 assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2427
2428 let asc = matches!(op.id(), token::T_BOTTOMK);
2429
2430 let tag_sort_exprs = self
2431 .create_tag_column_exprs()?
2432 .into_iter()
2433 .map(|expr| expr.sort(asc, true));
2434
2435 let exprs: Vec<DfExpr> = self
2437 .ctx
2438 .field_columns
2439 .iter()
2440 .map(|col| {
2441 let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2442 sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
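2443 // Tie-break by the tag columns for a deterministic ordering.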
2444 sort_exprs.extend(tag_sort_exprs.clone());
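2445 // Number rows within each group so the caller can filter on the rank
2446 // to keep the top or bottom K series.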
2447
2448 DfExpr::WindowFunction(Box::new(WindowFunction {
2449 fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
2450 params: WindowFunctionParams {
2451 args: vec![],
2452 partition_by: group_exprs.clone(),
2453 order_by: sort_exprs,
2454 window_frame: WindowFrame::new(Some(true)),
2455 null_treatment: None,
2456 distinct: false,
2457 filter: None,
2458 },
2459 }))
2460 })
2461 .collect();
2462
2463 let normalized_exprs =
2464 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2465 Ok(normalized_exprs)
2466 }
2467
2468 #[deprecated(
2470 note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
2471 )]
2472 fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
2473 match expr {
2474 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
2475 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
2476 PromExpr::Unary(UnaryExpr { expr, .. }) => {
2477 Self::try_build_float_literal(expr).map(|f| -f)
2478 }
2479 PromExpr::StringLiteral(_)
2480 | PromExpr::Binary(_)
2481 | PromExpr::VectorSelector(_)
2482 | PromExpr::MatrixSelector(_)
2483 | PromExpr::Call(_)
2484 | PromExpr::Extension(_)
2485 | PromExpr::Aggregate(_)
2486 | PromExpr::Subquery(_) => None,
2487 }
2488 }
2489
2490 /// Plan `histogram_quantile(phi, buckets)` by folding the `le` buckets with [HistogramFold].
2491 async fn create_histogram_plan(
2492 &mut self,
2493 args: &PromFunctionArgs,
2494 query_engine_state: &QueryEngineState,
2495 ) -> Result<LogicalPlan> {
2496 if args.args.len() != 2 {
2497 return FunctionInvalidArgumentSnafu {
2498 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2499 }
2500 .fail();
2501 }
2502 #[allow(deprecated)]
2503 let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
2504 FunctionInvalidArgumentSnafu {
2505 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2506 }
2507 })?;
2508
2509 let input = args.args[1].as_ref().clone();
2510 let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
2511
2512 if !self.ctx.has_le_tag() {
2513 return Ok(LogicalPlan::EmptyRelation(
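2514 // Without an `le` tag there are no histogram buckets to fold, so the
2515 // result is empty.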
2516 datafusion::logical_expr::EmptyRelation {
2517 produce_one_row: false,
2518 schema: Arc::new(DFSchema::empty()),
2519 },
2520 ));
2521 }
2522 let time_index_column =
2523 self.ctx
2524 .time_index_column
2525 .clone()
2526 .with_context(|| TimeIndexNotFoundSnafu {
2527 table: self.ctx.table_name.clone().unwrap_or_default(),
2528 })?;
2529 let field_column = self
2531 .ctx
2532 .field_columns
2533 .first()
2534 .with_context(|| FunctionInvalidArgumentSnafu {
2535 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2536 })?
2537 .clone();
2538 // `le` is consumed by the fold and is no longer a tag afterwards.
2539 self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
2540
2541 Ok(LogicalPlan::Extension(Extension {
2542 node: Arc::new(
2543 HistogramFold::new(
2544 LE_COLUMN_NAME.to_string(),
2545 field_column,
2546 time_index_column,
2547 phi,
2548 input_plan,
2549 )
2550 .context(DataFusionPlanningSnafu)?,
2551 ),
2552 }))
2553 }
2554
2555 /// Plan `vector(literal)` as an [EmptyMetric] that emits the literal at every time step.
2556 async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
2557 if args.args.len() != 1 {
2558 return FunctionInvalidArgumentSnafu {
2559 fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
2560 }
2561 .fail();
2562 }
2563 let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
2564
2565 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
2567 self.ctx.reset_table_name_and_schema();
2568 self.ctx.tag_columns = vec![];
2569 self.ctx.field_columns = vec![greptime_value().to_string()];
2570 Ok(LogicalPlan::Extension(Extension {
2571 node: Arc::new(
2572 EmptyMetric::new(
2573 self.ctx.start,
2574 self.ctx.end,
2575 self.ctx.interval,
2576 SPECIAL_TIME_FUNCTION.to_string(),
2577 greptime_value().to_string(),
2578 Some(lit),
2579 )
2580 .context(DataFusionPlanningSnafu)?,
2581 ),
2582 }))
2583 }
2584
2585 /// Plan `scalar(vector)` with the [ScalarCalculate] extension node; the input must have exactly one field column.
2586 async fn create_scalar_plan(
2587 &mut self,
2588 args: &PromFunctionArgs,
2589 query_engine_state: &QueryEngineState,
2590 ) -> Result<LogicalPlan> {
2591 ensure!(
2592 args.len() == 1,
2593 FunctionInvalidArgumentSnafu {
2594 fn_name: SCALAR_FUNCTION
2595 }
2596 );
2597 let input = self
2598 .prom_expr_to_plan(&args.args[0], query_engine_state)
2599 .await?;
2600 ensure!(
2601 self.ctx.field_columns.len() == 1,
2602 MultiFieldsNotSupportedSnafu {
2603 operator: SCALAR_FUNCTION
2604 },
2605 );
2606 let scalar_plan = LogicalPlan::Extension(Extension {
2607 node: Arc::new(
2608 ScalarCalculate::new(
2609 self.ctx.start,
2610 self.ctx.end,
2611 self.ctx.interval,
2612 input,
2613 self.ctx.time_index_column.as_ref().unwrap(),
2614 &self.ctx.tag_columns,
2615 &self.ctx.field_columns[0],
2616 self.ctx.table_name.as_deref(),
2617 )
2618 .context(PromqlPlanNodeSnafu)?,
2619 ),
2620 });
2621 self.ctx.tag_columns.clear();
2623 self.ctx.field_columns.clear();
2624 self.ctx
2625 .field_columns
2626 .push(scalar_plan.schema().field(1).name().clone());
2627 Ok(scalar_plan)
2628 }
2629
2630 /// Plan `absent(vector)` with the [Absent] extension node over a per-step first-value aggregation.
2631 async fn create_absent_plan(
2632 &mut self,
2633 args: &PromFunctionArgs,
2634 query_engine_state: &QueryEngineState,
2635 ) -> Result<LogicalPlan> {
2636 if args.args.len() != 1 {
2637 return FunctionInvalidArgumentSnafu {
2638 fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
2639 }
2640 .fail();
2641 }
2642 let input = self
2643 .prom_expr_to_plan(&args.args[0], query_engine_state)
2644 .await?;
2645
2646 let time_index_expr = self.create_time_index_column_expr()?;
2647 let first_field_expr =
2648 self.create_field_column_exprs()?
2649 .pop()
2650 .with_context(|| ValueNotFoundSnafu {
2651 table: self.ctx.table_name.clone().unwrap_or_default(),
2652 })?;
2653 let first_value_expr = first_value(first_field_expr, vec![]);
2654
2655 let ordered_aggregated_input = LogicalPlanBuilder::from(input)
2656 .aggregate(
2657 vec![time_index_expr.clone()],
2658 vec![first_value_expr.clone()],
2659 )
2660 .context(DataFusionPlanningSnafu)?
2661 .sort(vec![time_index_expr.sort(true, false)])
2662 .context(DataFusionPlanningSnafu)?
2663 .build()
2664 .context(DataFusionPlanningSnafu)?;
2665
2666 let fake_labels = self
2667 .ctx
2668 .selector_matcher
2669 .iter()
2670 .filter_map(|matcher| match matcher.op {
2671 MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
2672 _ => None,
2673 })
2674 .collect::<Vec<_>>();
2675
2676 let absent_plan = LogicalPlan::Extension(Extension {
2678 node: Arc::new(
2679 Absent::try_new(
2680 self.ctx.start,
2681 self.ctx.end,
2682 self.ctx.interval,
2683 self.ctx.time_index_column.as_ref().unwrap().clone(),
2684 self.ctx.field_columns[0].clone(),
2685 fake_labels,
2686 ordered_aggregated_input,
2687 )
2688 .context(DataFusionPlanningSnafu)?,
2689 ),
2690 });
2691
2692 Ok(absent_plan)
2693 }
2694
2695 /// Try to fold a PromQL expression into a DataFusion literal expression.
2696 /// Returns `None` when the expression is not a constant.
2697 fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
2698 match expr {
2699 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
2700 PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
2701 PromExpr::VectorSelector(_)
2702 | PromExpr::MatrixSelector(_)
2703 | PromExpr::Extension(_)
2704 | PromExpr::Aggregate(_)
2705 | PromExpr::Subquery(_) => None,
2706 PromExpr::Call(Call { func, .. }) => {
2707 if func.name == SPECIAL_TIME_FUNCTION {
2708 None
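2709 // `time()` needs the time index column, which is unavailable in this
2710 // static context; see `try_build_special_time_expr_with_context` below.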
2711 } else {
2712 None
2713 }
2714 }
2715 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
2716 PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
2718 PromExpr::Binary(PromBinaryExpr {
2719 lhs,
2720 rhs,
2721 op,
2722 modifier,
2723 }) => {
2724 let lhs = Self::try_build_literal_expr(lhs)?;
2725 let rhs = Self::try_build_literal_expr(rhs)?;
2726 let is_comparison_op = Self::is_token_a_comparison_op(*op);
2727 let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
2728 let expr = expr_builder(lhs, rhs).ok()?;
2729
2730 let should_return_bool = if let Some(m) = modifier {
2731 m.return_bool
2732 } else {
2733 false
2734 };
2735 if is_comparison_op && should_return_bool {
2736 Some(DfExpr::Cast(Cast {
2737 expr: Box::new(expr),
2738 data_type: ArrowDataType::Float64,
2739 }))
2740 } else {
2741 Some(expr)
2742 }
2743 }
2744 }
2745 }
2746
2747 fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
2748 match expr {
2749 PromExpr::Call(Call { func, .. }) => {
2750 if func.name == SPECIAL_TIME_FUNCTION
2751 && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
2752 {
2753 Some(build_special_time_expr(time_index_col))
2754 } else {
2755 None
2756 }
2757 }
2758 _ => None,
2759 }
2760 }
2761
2762 #[allow(clippy::type_complexity)]
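2763 /// Convert a PromQL binary-op token into a closure that builds the
2764 /// matching DataFusion expression, e.g. `T_ADD` yields `lhs + rhs`.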
2765 fn prom_token_to_binary_expr_builder(
2766 token: TokenType,
2767 ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
2768 match token.id() {
2769 token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
2770 token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
2771 token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
2772 token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
2773 token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
2774 token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
2775 token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
2776 token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
2777 token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
2778 token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
2779 token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
2780 token::T_POW => Ok(Box::new(|lhs, rhs| {
2781 Ok(DfExpr::ScalarFunction(ScalarFunction {
2782 func: datafusion_functions::math::power(),
2783 args: vec![lhs, rhs],
2784 }))
2785 })),
2786 token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
2787 Ok(DfExpr::ScalarFunction(ScalarFunction {
2788 func: datafusion_functions::math::atan2(),
2789 args: vec![lhs, rhs],
2790 }))
2791 })),
2792 _ => UnexpectedTokenSnafu { token }.fail(),
2793 }
2794 }
2795
2796 /// Check if the given token is a comparison operator (`==`, `!=`, `>`, `<`, `>=`, `<=`).
2797 fn is_token_a_comparison_op(token: TokenType) -> bool {
2798 matches!(
2799 token.id(),
2800 token::T_EQLC
2801 | token::T_NEQ
2802 | token::T_GTR
2803 | token::T_LSS
2804 | token::T_GTE
2805 | token::T_LTE
2806 )
2807 }
2808
2809 /// Check if the given token is a set operator (`and`, `or`, `unless`).
2810 fn is_token_a_set_op(token: TokenType) -> bool {
2811 matches!(
2812 token.id(),
2813 token::T_LAND | token::T_LOR | token::T_LUNLESS
2816 )
2817 }
2818
2819 #[allow(clippy::too_many_arguments)]
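2820 /// Inner-join two plans on their tag columns and time index, honoring the
2821 /// `on`/`ignoring` modifier; e.g. `a + on(host) b` joins on `host` plus the time index.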
2822 fn join_on_non_field_columns(
2823 &self,
2824 left: LogicalPlan,
2825 right: LogicalPlan,
2826 left_table_ref: TableReference,
2827 right_table_ref: TableReference,
2828 left_time_index_column: Option<String>,
2829 right_time_index_column: Option<String>,
2830 only_join_time_index: bool,
2831 modifier: &Option<BinModifier>,
2832 ) -> Result<LogicalPlan> {
2833 let mut left_tag_columns = if only_join_time_index {
2834 BTreeSet::new()
2835 } else {
2836 self.ctx
2837 .tag_columns
2838 .iter()
2839 .cloned()
2840 .collect::<BTreeSet<_>>()
2841 };
2842 let mut right_tag_columns = left_tag_columns.clone();
2843
2844 if let Some(modifier) = modifier {
2846 if let Some(matching) = &modifier.matching {
2848 match matching {
2849 LabelModifier::Include(on) => {
2851 let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
2852 left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
2853 right_tag_columns =
2854 right_tag_columns.intersection(&mask).cloned().collect();
2855 }
2856 LabelModifier::Exclude(ignoring) => {
2858 for label in &ignoring.labels {
2860 let _ = left_tag_columns.remove(label);
2861 let _ = right_tag_columns.remove(label);
2862 }
2863 }
2864 }
2865 }
2866 }
2867
2868 if let (Some(left_time_index_column), Some(right_time_index_column)) =
2870 (left_time_index_column, right_time_index_column)
2871 {
2872 left_tag_columns.insert(left_time_index_column);
2873 right_tag_columns.insert(right_time_index_column);
2874 }
2875
2876 let right = LogicalPlanBuilder::from(right)
2877 .alias(right_table_ref)
2878 .context(DataFusionPlanningSnafu)?
2879 .build()
2880 .context(DataFusionPlanningSnafu)?;
2881
2882 LogicalPlanBuilder::from(left)
2884 .alias(left_table_ref)
2885 .context(DataFusionPlanningSnafu)?
2886 .join_detailed(
2887 right,
2888 JoinType::Inner,
2889 (
2890 left_tag_columns
2891 .into_iter()
2892 .map(Column::from_name)
2893 .collect::<Vec<_>>(),
2894 right_tag_columns
2895 .into_iter()
2896 .map(Column::from_name)
2897 .collect::<Vec<_>>(),
2898 ),
2899 None,
2900 NullEquality::NullEqualsNull,
2901 )
2902 .context(DataFusionPlanningSnafu)?
2903 .build()
2904 .context(DataFusionPlanningSnafu)
2905 }
2906
2907 /// Plan the set operators `and` and `unless`; `or` is delegated to [Self::or_operator].
2908 fn set_op_on_non_field_columns(
2909 &mut self,
2910 left: LogicalPlan,
2911 mut right: LogicalPlan,
2912 left_context: PromPlannerContext,
2913 right_context: PromPlannerContext,
2914 op: TokenType,
2915 modifier: &Option<BinModifier>,
2916 ) -> Result<LogicalPlan> {
2917 let mut left_tag_col_set = left_context
2918 .tag_columns
2919 .iter()
2920 .cloned()
2921 .collect::<HashSet<_>>();
2922 let mut right_tag_col_set = right_context
2923 .tag_columns
2924 .iter()
2925 .cloned()
2926 .collect::<HashSet<_>>();
2927
2928 if matches!(op.id(), token::T_LOR) {
2929 return self.or_operator(
2930 left,
2931 right,
2932 left_tag_col_set,
2933 right_tag_col_set,
2934 left_context,
2935 right_context,
2936 modifier,
2937 );
2938 }
2939
2940 if let Some(modifier) = modifier {
2942 ensure!(
2944 matches!(
2945 modifier.card,
2946 VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
2947 ),
2948 UnsupportedVectorMatchSnafu {
2949 name: modifier.card.clone(),
2950 },
2951 );
2952 if let Some(matching) = &modifier.matching {
2954 match matching {
2955 LabelModifier::Include(on) => {
2957 let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
2958 left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
2959 right_tag_col_set =
2960 right_tag_col_set.intersection(&mask).cloned().collect();
2961 }
2962 LabelModifier::Exclude(ignoring) => {
2964 for label in &ignoring.labels {
2966 let _ = left_tag_col_set.remove(label);
2967 let _ = right_tag_col_set.remove(label);
2968 }
2969 }
2970 }
2971 }
2972 }
2973 if !matches!(op.id(), token::T_LOR) {
2975 ensure!(
2976 left_tag_col_set == right_tag_col_set,
2977 CombineTableColumnMismatchSnafu {
2978 left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
2979 right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
2980 }
2981 )
2982 };
2983 let left_time_index = left_context.time_index_column.clone().unwrap();
2984 let right_time_index = right_context.time_index_column.clone().unwrap();
2985 let join_keys = left_tag_col_set
2986 .iter()
2987 .cloned()
2988 .chain([left_time_index.clone()])
2989 .collect::<Vec<_>>();
2990 self.ctx.time_index_column = Some(left_time_index.clone());
2991
2992 if left_context.time_index_column != right_context.time_index_column {
2994 let right_project_exprs = right
2995 .schema()
2996 .fields()
2997 .iter()
2998 .map(|field| {
2999 if field.name() == &right_time_index {
3000 DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
3001 } else {
3002 DfExpr::Column(Column::from_name(field.name()))
3003 }
3004 })
3005 .collect::<Vec<_>>();
3006
3007 right = LogicalPlanBuilder::from(right)
3008 .project(right_project_exprs)
3009 .context(DataFusionPlanningSnafu)?
3010 .build()
3011 .context(DataFusionPlanningSnafu)?;
3012 }
3013
3014 ensure!(
3015 left_context.field_columns.len() == 1,
3016 MultiFieldsNotSupportedSnafu {
3017 operator: "AND operator"
3018 }
3019 );
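3020 // With exactly one field column per side, the left side's field column
3021 // becomes the output value column.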
3020 let left_field_col = left_context.field_columns.first().unwrap();
3023 self.ctx.field_columns = vec![left_field_col.clone()];
3024
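3025 // `and` keeps left rows that have a match on the right (semi join);
3026 // `unless` keeps left rows without one (anti join). The left side is
3027 // deduplicated first.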
3025 match op.id() {
3028 token::T_LAND => LogicalPlanBuilder::from(left)
3029 .distinct()
3030 .context(DataFusionPlanningSnafu)?
3031 .join_detailed(
3032 right,
3033 JoinType::LeftSemi,
3034 (join_keys.clone(), join_keys),
3035 None,
3036 NullEquality::NullEqualsNull,
3037 )
3038 .context(DataFusionPlanningSnafu)?
3039 .build()
3040 .context(DataFusionPlanningSnafu),
3041 token::T_LUNLESS => LogicalPlanBuilder::from(left)
3042 .distinct()
3043 .context(DataFusionPlanningSnafu)?
3044 .join_detailed(
3045 right,
3046 JoinType::LeftAnti,
3047 (join_keys.clone(), join_keys),
3048 None,
3049 NullEquality::NullEqualsNull,
3050 )
3051 .context(DataFusionPlanningSnafu)?
3052 .build()
3053 .context(DataFusionPlanningSnafu),
3054 token::T_LOR => {
3055 unreachable!()
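3056 // `or` was dispatched to `or_operator` at the beginning of this
3057 // function, so it cannot reach this match.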
3058 }
3059 _ => UnexpectedTokenSnafu { token: op }.fail(),
3060 }
3061 }
3062
3063 #[allow(clippy::too_many_arguments)]
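3064 /// Implement the `or` set operator as a distinct union over the matching columns.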
3065 fn or_operator(
3066 &mut self,
3067 left: LogicalPlan,
3068 right: LogicalPlan,
3069 left_tag_cols_set: HashSet<String>,
3070 right_tag_cols_set: HashSet<String>,
3071 left_context: PromPlannerContext,
3072 right_context: PromPlannerContext,
3073 modifier: &Option<BinModifier>,
3074 ) -> Result<LogicalPlan> {
3075 ensure!(
3077 left_context.field_columns.len() == right_context.field_columns.len(),
3078 CombineTableColumnMismatchSnafu {
3079 left: left_context.field_columns.clone(),
3080 right: right_context.field_columns.clone()
3081 }
3082 );
3083 ensure!(
3084 left_context.field_columns.len() == 1,
3085 MultiFieldsNotSupportedSnafu {
3086 operator: "OR operator"
3087 }
3088 );
3089
3090 let all_tags = left_tag_cols_set
3092 .union(&right_tag_cols_set)
3093 .cloned()
3094 .collect::<HashSet<_>>();
3095 let tags_not_in_left = all_tags
3096 .difference(&left_tag_cols_set)
3097 .cloned()
3098 .collect::<Vec<_>>();
3099 let tags_not_in_right = all_tags
3100 .difference(&right_tag_cols_set)
3101 .cloned()
3102 .collect::<Vec<_>>();
3103 let left_qualifier = left.schema().qualified_field(0).0.cloned();
3104 let right_qualifier = right.schema().qualified_field(0).0.cloned();
3105 let left_qualifier_string = left_qualifier
3106 .as_ref()
3107 .map(|l| l.to_string())
3108 .unwrap_or_default();
3109 let right_qualifier_string = right_qualifier
3110 .as_ref()
3111 .map(|r| r.to_string())
3112 .unwrap_or_default();
3113 let left_time_index_column =
3114 left_context
3115 .time_index_column
3116 .clone()
3117 .with_context(|| TimeIndexNotFoundSnafu {
3118 table: left_qualifier_string.clone(),
3119 })?;
3120 let right_time_index_column =
3121 right_context
3122 .time_index_column
3123 .clone()
3124 .with_context(|| TimeIndexNotFoundSnafu {
3125 table: right_qualifier_string.clone(),
3126 })?;
3127 let left_field_col = left_context.field_columns.first().unwrap();
3129 let right_field_col = right_context.field_columns.first().unwrap();
3130
3131 let mut all_columns_set = left
3133 .schema()
3134 .fields()
3135 .iter()
3136 .chain(right.schema().fields().iter())
3137 .map(|field| field.name().clone())
3138 .collect::<HashSet<_>>();
3139 all_columns_set.remove(&left_time_index_column);
3141 all_columns_set.remove(&right_time_index_column);
3142 if left_field_col != right_field_col {
3144 all_columns_set.remove(right_field_col);
3145 }
3146 let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
3147 all_columns.sort_unstable();
3148 // Put the time index first so both projections share the same layout.
3149 all_columns.insert(0, left_time_index_column.clone());
3151
3152 let left_proj_exprs = all_columns.iter().map(|col| {
3154 if tags_not_in_left.contains(col) {
3155 DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3156 } else {
3157 DfExpr::Column(Column::new(None::<String>, col))
3158 }
3159 });
3160 let right_time_index_expr = DfExpr::Column(Column::new(
3161 right_qualifier.clone(),
3162 right_time_index_column,
3163 ))
3164 .alias(left_time_index_column.clone());
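3165 // Resolve the qualifier of the right side's field column so the
3166 // projection below can reference it unambiguously.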
3165 let right_qualifier_for_field = right
3168 .schema()
3169 .iter()
3170 .find(|(_, f)| f.name() == right_field_col)
3171 .map(|(q, _)| q)
3172 .with_context(|| ColumnNotFoundSnafu {
3173 col: right_field_col.clone(),
3174 })?
3175 .cloned();
3176
3177 let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
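3178 // Substitute the right field column where the left field name is expected.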
3179 if col == left_field_col && left_field_col != right_field_col {
3181 DfExpr::Column(Column::new(
3183 right_qualifier_for_field.clone(),
3184 right_field_col,
3185 ))
3186 } else if tags_not_in_right.contains(col) {
3187 DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3188 } else {
3189 DfExpr::Column(Column::new(None::<String>, col))
3190 }
3191 });
3192 let right_proj_exprs = [right_time_index_expr]
3193 .into_iter()
3194 .chain(right_proj_exprs_without_time_index);
3195
3196 let left_projected = LogicalPlanBuilder::from(left)
3197 .project(left_proj_exprs)
3198 .context(DataFusionPlanningSnafu)?
3199 .alias(left_qualifier_string.clone())
3200 .context(DataFusionPlanningSnafu)?
3201 .build()
3202 .context(DataFusionPlanningSnafu)?;
3203 let right_projected = LogicalPlanBuilder::from(right)
3204 .project(right_proj_exprs)
3205 .context(DataFusionPlanningSnafu)?
3206 .alias(right_qualifier_string.clone())
3207 .context(DataFusionPlanningSnafu)?
3208 .build()
3209 .context(DataFusionPlanningSnafu)?;
3210
3211 let mut match_columns = if let Some(modifier) = modifier
3213 && let Some(matching) = &modifier.matching
3214 {
3215 match matching {
3216 LabelModifier::Include(on) => on.labels.clone(),
3218 LabelModifier::Exclude(ignoring) => {
3220 let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
3221 all_tags.difference(&ignoring).cloned().collect()
3222 }
3223 }
3224 } else {
3225 all_tags.iter().cloned().collect()
3226 };
3227 match_columns.sort_unstable();
3229 let schema = left_projected.schema().clone();
3231 let union_distinct_on = UnionDistinctOn::new(
3232 left_projected,
3233 right_projected,
3234 match_columns,
3235 left_time_index_column.clone(),
3236 schema,
3237 );
3238 let result = LogicalPlan::Extension(Extension {
3239 node: Arc::new(union_distinct_on),
3240 });
3241
3242 self.ctx.time_index_column = Some(left_time_index_column);
3244 self.ctx.tag_columns = all_tags.into_iter().collect();
3245 self.ctx.field_columns = vec![left_field_col.clone()];
3246
3247 Ok(result)
3248 }
3249
3250 /// Build a projection that applies `name_to_expr` to every field column.
3251 /// Non-field columns (tags and the time index) are preserved unchanged.
3252 ///
3253 /// # Side effect
3254 ///
3255 /// Updates the context's field columns to the new, unqualified column
3256 /// names produced by the projection.
3257 fn projection_for_each_field_column<F>(
3258 &mut self,
3259 input: LogicalPlan,
3260 name_to_expr: F,
3261 ) -> Result<LogicalPlan>
3262 where
3263 F: FnMut(&String) -> Result<DfExpr>,
3264 {
3265 let non_field_columns_iter = self
3266 .ctx
3267 .tag_columns
3268 .iter()
3269 .chain(self.ctx.time_index_column.iter())
3270 .map(|col| {
3271 Ok(DfExpr::Column(Column::new(
3272 self.ctx.table_name.clone().map(TableReference::bare),
3273 col,
3274 )))
3275 });
3276
3277 let result_field_columns = self
3279 .ctx
3280 .field_columns
3281 .iter()
3282 .map(name_to_expr)
3283 .collect::<Result<Vec<_>>>()?;
3284
3285 self.ctx.field_columns = result_field_columns
3287 .iter()
3288 .map(|expr| expr.schema_name().to_string())
3289 .collect();
3290 let field_columns_iter = result_field_columns
3291 .into_iter()
3292 .zip(self.ctx.field_columns.iter())
3293 .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3294
3295 let project_fields = non_field_columns_iter
3297 .chain(field_columns_iter)
3298 .collect::<Result<Vec<_>>>()?;
3299
3300 LogicalPlanBuilder::from(input)
3301 .project(project_fields)
3302 .context(DataFusionPlanningSnafu)?
3303 .build()
3304 .context(DataFusionPlanningSnafu)
3305 }
3306
3307 /// Build a filter over the single field column using `name_to_expr`;
3308 /// errors if the input carries more than one field column.
3309 fn filter_on_field_column<F>(
3310 &self,
3311 input: LogicalPlan,
3312 mut name_to_expr: F,
3313 ) -> Result<LogicalPlan>
3314 where
3315 F: FnMut(&String) -> Result<DfExpr>,
3316 {
3317 ensure!(
3318 self.ctx.field_columns.len() == 1,
3319 UnsupportedExprSnafu {
3320 name: "filter on multi-value input"
3321 }
3322 );
3323
3324 let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3325
3326 LogicalPlanBuilder::from(input)
3327 .filter(field_column_filter)
3328 .context(DataFusionPlanningSnafu)?
3329 .build()
3330 .context(DataFusionPlanningSnafu)
3331 }
3332
3333 /// Build a `date_part` expression over the time index column, backing
3334 /// PromQL time functions such as `hour()` and `minute()`.
3335 fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
3336 let input_expr = datafusion::logical_expr::col(
3337 self.ctx
3338 .time_index_column
3339 .as_ref()
3340 .with_context(|| TimeIndexNotFoundSnafu {
3342 table: "<doesn't matter>",
3343 })?,
3344 );
3345 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3346 func: datafusion_functions::datetime::date_part(),
3347 args: vec![date_part.lit(), input_expr],
3348 });
3349 Ok(fn_expr)
3350 }
3351
3352 /// Re-project the single field column under `alias_name`, keeping tags and the time index.
3353 fn apply_alias_projection(
3354 &mut self,
3355 plan: LogicalPlan,
3356 alias_name: String,
3357 ) -> Result<LogicalPlan> {
3358 let fields_expr = self.create_field_column_exprs()?;
3359
3360 ensure!(
3362 fields_expr.len() == 1,
3363 UnsupportedExprSnafu {
3364 name: "alias on multi-value result"
3365 }
3366 );
3367
3368 let project_fields = fields_expr
3369 .into_iter()
3370 .map(|expr| expr.alias(&alias_name))
3371 .chain(self.create_tag_column_exprs()?)
3372 .chain(Some(self.create_time_index_column_expr()?));
3373
3374 LogicalPlanBuilder::from(plan)
3375 .project(project_fields)
3376 .context(DataFusionPlanningSnafu)?
3377 .build()
3378 .context(DataFusionPlanningSnafu)
3379 }
3380}
3381
3382#[derive(Default, Debug)]
3383struct FunctionArgs {
3384 input: Option<PromExpr>,
3385 literals: Vec<DfExpr>,
3386}
3387
3388 #[derive(Debug, Clone)]
3389 /// How the planner assembles arguments for a scalar function call; each
3390 /// variant corresponds to one arm of the argument-arrangement match above.
3391 enum ScalarFunc {
3392 DataFusionBuiltin(Arc<ScalarUdfDef>),
3394 /// A UDF looked up in DataFusion's function registry.
3396 DataFusionUdf(Arc<ScalarUdfDef>),
3398 /// A UDF implementing a PromQL function (from the `promql` crate's
3399 /// function set imported above).
3400 Udf(Arc<ScalarUdfDef>),
3402 /// Like `Udf`, but additionally receives the range length in
3403 /// milliseconds as its last argument, for extrapolating functions
3404 /// such as `increase` and `rate`.
3405 ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
3407 /// The result is an expression generated elsewhere during planning
3408 /// (e.g. `time()`); the matching arm above emits nothing.
3412 GeneratedExpr,
3416 }
3417
3418#[cfg(test)]
3419mod test {
3420 use std::time::{Duration, UNIX_EPOCH};
3421
3422 use catalog::RegisterTableRequest;
3423 use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
3424 use common_base::Plugins;
3425 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
3426 use common_query::prelude::greptime_timestamp;
3427 use common_query::test_util::DummyDecoder;
3428 use datatypes::prelude::ConcreteDataType;
3429 use datatypes::schema::{ColumnSchema, Schema};
3430 use promql_parser::label::Labels;
3431 use promql_parser::parser;
3432 use session::context::QueryContext;
3433 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
3434 use table::test_util::EmptyTable;
3435
3436 use super::*;
3437 use crate::options::QueryOptions;
3438
3439 fn build_query_engine_state() -> QueryEngineState {
3440 QueryEngineState::new(
3441 new_memory_catalog_manager().unwrap(),
3442 None,
3443 None,
3444 None,
3445 None,
3446 None,
3447 false,
3448 Plugins::default(),
3449 QueryOptions::default(),
3450 )
3451 }
3452
3453 async fn build_test_table_provider(
3454 table_name_tuples: &[(String, String)],
3455 num_tag: usize,
3456 num_field: usize,
3457 ) -> DfTableSourceProvider {
3458 let catalog_list = MemoryCatalogManager::with_default_setup();
3459 for (schema_name, table_name) in table_name_tuples {
3460 let mut columns = vec![];
3461 for i in 0..num_tag {
3462 columns.push(ColumnSchema::new(
3463 format!("tag_{i}"),
3464 ConcreteDataType::string_datatype(),
3465 false,
3466 ));
3467 }
3468 columns.push(
3469 ColumnSchema::new(
3470 "timestamp".to_string(),
3471 ConcreteDataType::timestamp_millisecond_datatype(),
3472 false,
3473 )
3474 .with_time_index(true),
3475 );
3476 for i in 0..num_field {
3477 columns.push(ColumnSchema::new(
3478 format!("field_{i}"),
3479 ConcreteDataType::float64_datatype(),
3480 true,
3481 ));
3482 }
3483 let schema = Arc::new(Schema::new(columns));
3484 let table_meta = TableMetaBuilder::empty()
3485 .schema(schema)
3486 .primary_key_indices((0..num_tag).collect())
3487 .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
3488 .next_column_id(1024)
3489 .build()
3490 .unwrap();
3491 let table_info = TableInfoBuilder::default()
3492 .name(table_name.clone())
3493 .meta(table_meta)
3494 .build()
3495 .unwrap();
3496 let table = EmptyTable::from_table_info(&table_info);
3497
3498 assert!(
3499 catalog_list
3500 .register_table_sync(RegisterTableRequest {
3501 catalog: DEFAULT_CATALOG_NAME.to_string(),
3502 schema: schema_name.clone(),
3503 table_name: table_name.clone(),
3504 table_id: 1024,
3505 table,
3506 })
3507 .is_ok()
3508 );
3509 }
3510
3511 DfTableSourceProvider::new(
3512 catalog_list,
3513 false,
3514 QueryContext::arc(),
3515 DummyDecoder::arc(),
3516 false,
3517 )
3518 }
3519
3520 async fn build_test_table_provider_with_fields(
3521 table_name_tuples: &[(String, String)],
3522 tags: &[&str],
3523 ) -> DfTableSourceProvider {
3524 let catalog_list = MemoryCatalogManager::with_default_setup();
3525 for (schema_name, table_name) in table_name_tuples {
3526 let mut columns = vec![];
3527 let num_tag = tags.len();
3528 for tag in tags {
3529 columns.push(ColumnSchema::new(
3530 tag.to_string(),
3531 ConcreteDataType::string_datatype(),
3532 false,
3533 ));
3534 }
3535 columns.push(
3536 ColumnSchema::new(
3537 greptime_timestamp().to_string(),
3538 ConcreteDataType::timestamp_millisecond_datatype(),
3539 false,
3540 )
3541 .with_time_index(true),
3542 );
3543 columns.push(ColumnSchema::new(
3544 greptime_value().to_string(),
3545 ConcreteDataType::float64_datatype(),
3546 true,
3547 ));
3548 let schema = Arc::new(Schema::new(columns));
3549 let table_meta = TableMetaBuilder::empty()
3550 .schema(schema)
3551 .primary_key_indices((0..num_tag).collect())
3552 .next_column_id(1024)
3553 .build()
3554 .unwrap();
3555 let table_info = TableInfoBuilder::default()
3556 .name(table_name.clone())
3557 .meta(table_meta)
3558 .build()
3559 .unwrap();
3560 let table = EmptyTable::from_table_info(&table_info);
3561
3562 assert!(
3563 catalog_list
3564 .register_table_sync(RegisterTableRequest {
3565 catalog: DEFAULT_CATALOG_NAME.to_string(),
3566 schema: schema_name.clone(),
3567 table_name: table_name.clone(),
3568 table_id: 1024,
3569 table,
3570 })
3571 .is_ok()
3572 );
3573 }
3574
3575 DfTableSourceProvider::new(
3576 catalog_list,
3577 false,
3578 QueryContext::arc(),
3579 DummyDecoder::arc(),
3580 false,
3581 )
3582 }
3583
3584 // Plan `{fn_name}(some_metric{tag_0!="bar"})` over a one-tag, one-field
3585 // test table and assert the logical plan matches the expected template,
3586 // with `TEMPLATE` replaced by `plan_name`.
3599 async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
3600 let prom_expr =
3601 parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
3602 let eval_stmt = EvalStmt {
3603 expr: prom_expr,
3604 start: UNIX_EPOCH,
3605 end: UNIX_EPOCH
3606 .checked_add(Duration::from_secs(100_000))
3607 .unwrap(),
3608 interval: Duration::from_secs(5),
3609 lookback_delta: Duration::from_secs(1),
3610 };
3611
3612 let table_provider = build_test_table_provider(
3613 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3614 1,
3615 1,
3616 )
3617 .await;
3618 let plan =
3619 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3620 .await
3621 .unwrap();
3622
3623 let expected = String::from(
3624 "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3625 \n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
3626 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3627 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3628 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3629 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3630 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
3631 ).replace("TEMPLATE", plan_name);
3632
3633 assert_eq!(plan.display_indent_schema().to_string(), expected);
3634 }
3635
3636 #[tokio::test]
3637 async fn single_abs() {
3638 do_single_instant_function_call("abs", "abs").await;
3639 }
3640
3641 #[tokio::test]
3642 #[should_panic]
3643 async fn single_absent() {
3644 do_single_instant_function_call("absent", "").await;
3645 }
3646
3647 #[tokio::test]
3648 async fn single_ceil() {
3649 do_single_instant_function_call("ceil", "ceil").await;
3650 }
3651
3652 #[tokio::test]
3653 async fn single_exp() {
3654 do_single_instant_function_call("exp", "exp").await;
3655 }
3656
3657 #[tokio::test]
3658 async fn single_ln() {
3659 do_single_instant_function_call("ln", "ln").await;
3660 }
3661
3662 #[tokio::test]
3663 async fn single_log2() {
3664 do_single_instant_function_call("log2", "log2").await;
3665 }
3666
3667 #[tokio::test]
3668 async fn single_log10() {
3669 do_single_instant_function_call("log10", "log10").await;
3670 }
3671
3672 #[tokio::test]
3673 #[should_panic]
3674 async fn single_scalar() {
3675 do_single_instant_function_call("scalar", "").await;
3676 }
3677
3678 #[tokio::test]
3679 #[should_panic]
3680 async fn single_sgn() {
3681 do_single_instant_function_call("sgn", "").await;
3682 }
3683
3684 #[tokio::test]
3685 #[should_panic]
3686 async fn single_sort() {
3687 do_single_instant_function_call("sort", "").await;
3688 }
3689
3690 #[tokio::test]
3691 #[should_panic]
3692 async fn single_sort_desc() {
3693 do_single_instant_function_call("sort_desc", "").await;
3694 }
3695
3696 #[tokio::test]
3697 async fn single_sqrt() {
3698 do_single_instant_function_call("sqrt", "sqrt").await;
3699 }
3700
3701 #[tokio::test]
3702 #[should_panic]
3703 async fn single_timestamp() {
3704 do_single_instant_function_call("timestamp", "").await;
3705 }
3706
3707 #[tokio::test]
3708 async fn single_acos() {
3709 do_single_instant_function_call("acos", "acos").await;
3710 }
3711
3712 #[tokio::test]
3713 #[should_panic]
3714 async fn single_acosh() {
3715 do_single_instant_function_call("acosh", "").await;
3716 }
3717
3718 #[tokio::test]
3719 async fn single_asin() {
3720 do_single_instant_function_call("asin", "asin").await;
3721 }
3722
3723 #[tokio::test]
3724 #[should_panic]
3725 async fn single_asinh() {
3726 do_single_instant_function_call("asinh", "").await;
3727 }
3728
3729 #[tokio::test]
3730 async fn single_atan() {
3731 do_single_instant_function_call("atan", "atan").await;
3732 }
3733
3734 #[tokio::test]
3735 #[should_panic]
3736 async fn single_atanh() {
3737 do_single_instant_function_call("atanh", "").await;
3738 }
3739
3740 #[tokio::test]
3741 async fn single_cos() {
3742 do_single_instant_function_call("cos", "cos").await;
3743 }
3744
3745 #[tokio::test]
3746 #[should_panic]
3747 async fn single_cosh() {
3748 do_single_instant_function_call("cosh", "").await;
3749 }
3750
3751 #[tokio::test]
3752 async fn single_sin() {
3753 do_single_instant_function_call("sin", "sin").await;
3754 }
3755
3756 #[tokio::test]
3757 #[should_panic]
3758 async fn single_sinh() {
3759 do_single_instant_function_call("sinh", "").await;
3760 }
3761
3762 #[tokio::test]
3763 async fn single_tan() {
3764 do_single_instant_function_call("tan", "tan").await;
3765 }
3766
3767 #[tokio::test]
3768 #[should_panic]
3769 async fn single_tanh() {
3770 do_single_instant_function_call("tanh", "").await;
3771 }
3772
3773 #[tokio::test]
3774 #[should_panic]
3775 async fn single_deg() {
3776 do_single_instant_function_call("deg", "").await;
3777 }
3778
3779 #[tokio::test]
3780 #[should_panic]
3781 async fn single_rad() {
3782 do_single_instant_function_call("rad", "").await;
3783 }
3784
3785 // Plan `{fn_name} by (tag_1)(some_metric{tag_0!="bar"})` and check the
3786 // expected plan, then repeat with a `without (tag_1)` modifier, which
3787 // groups by the remaining tag.
3806 async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
3807 let prom_expr = parser::parse(&format!(
3808 "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
3809 ))
3810 .unwrap();
3811 let mut eval_stmt = EvalStmt {
3812 expr: prom_expr,
3813 start: UNIX_EPOCH,
3814 end: UNIX_EPOCH
3815 .checked_add(Duration::from_secs(100_000))
3816 .unwrap(),
3817 interval: Duration::from_secs(5),
3818 lookback_delta: Duration::from_secs(1),
3819 };
3820
3821 let table_provider = build_test_table_provider(
3823 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3824 2,
3825 2,
3826 )
3827 .await;
3828 let plan =
3829 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3830 .await
3831 .unwrap();
3832 let expected_no_without = String::from(
3833 "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3834 \n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3835 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3836 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3837 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3838 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3839 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3840 ).replace("TEMPLATE", plan_name);
3841 assert_eq!(
3842 plan.display_indent_schema().to_string(),
3843 expected_no_without
3844 );
3845
3846 if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
3848 *modifier = Some(LabelModifier::Exclude(Labels {
3849 labels: vec![String::from("tag_1")].into_iter().collect(),
3850 }));
3851 }
3852 let table_provider = build_test_table_provider(
3853 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3854 2,
3855 2,
3856 )
3857 .await;
3858 let plan =
3859 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3860 .await
3861 .unwrap();
3862 let expected_without = String::from(
3863 "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3864 \n Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
3865 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3866 \n PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3867 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3868 \n Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
3869 \n TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
3870 ).replace("TEMPLATE", plan_name);
3871 assert_eq!(plan.display_indent_schema().to_string(), expected_without);
3872 }
3873
3874 #[tokio::test]
3875 async fn aggregate_sum() {
3876 do_aggregate_expr_plan("sum", "sum").await;
3877 }
3878
3879 #[tokio::test]
3880 async fn aggregate_avg() {
3881 do_aggregate_expr_plan("avg", "avg").await;
3882 }
3883
3884 #[tokio::test]
3885 #[should_panic]
3886 async fn aggregate_count() {
3887 do_aggregate_expr_plan("count", "count").await;
3888 }
3889
3890 #[tokio::test]
3891 async fn aggregate_min() {
3892 do_aggregate_expr_plan("min", "min").await;
3893 }
3894
3895 #[tokio::test]
3896 async fn aggregate_max() {
3897 do_aggregate_expr_plan("max", "max").await;
3898 }
3899
3900 #[tokio::test]
3901 #[should_panic]
3902 async fn aggregate_group() {
3903 do_aggregate_expr_plan("grouping", "GROUPING").await;
3904 }
3905
3906 #[tokio::test]
3907 async fn aggregate_stddev() {
3908 do_aggregate_expr_plan("stddev", "stddev_pop").await;
3909 }
3910
3911 #[tokio::test]
3912 async fn aggregate_stdvar() {
3913 do_aggregate_expr_plan("stdvar", "var_pop").await;
3914 }
3915
3916 #[tokio::test]
3940 async fn binary_op_column_column() {
3941 let prom_expr =
3942 parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
3943 let eval_stmt = EvalStmt {
3944 expr: prom_expr,
3945 start: UNIX_EPOCH,
3946 end: UNIX_EPOCH
3947 .checked_add(Duration::from_secs(100_000))
3948 .unwrap(),
3949 interval: Duration::from_secs(5),
3950 lookback_delta: Duration::from_secs(1),
3951 };
3952
3953 let table_provider = build_test_table_provider(
3954 &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
3955 1,
3956 1,
3957 )
3958 .await;
3959 let plan =
3960 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
3961 .await
3962 .unwrap();
3963
3964 let expected = String::from(
3965 "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
3966 \n Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3967 \n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3968 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3969 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3970 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3971 \n Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3972 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3973 \n SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3974 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3975 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3976 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3977 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
3978 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
3979 );
3980
3981 assert_eq!(plan.display_indent_schema().to_string(), expected);
3982 }
3983
3984 async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
3985 let prom_expr = parser::parse(query).unwrap();
3986 let eval_stmt = EvalStmt {
3987 expr: prom_expr,
3988 start: UNIX_EPOCH,
3989 end: UNIX_EPOCH
3990 .checked_add(Duration::from_secs(100_000))
3991 .unwrap(),
3992 interval: Duration::from_secs(5),
3993 lookback_delta: Duration::from_secs(1),
3994 };
3995
3996 let table_provider = build_test_table_provider(
3997 &[
3998 (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
3999 (
4000 "greptime_private".to_string(),
4001 "some_alt_metric".to_string(),
4002 ),
4003 ],
4004 1,
4005 1,
4006 )
4007 .await;
4008 let plan =
4009 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
4010 .await
4011 .unwrap();
4012
4013 assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
4014 }
4015
4016 #[tokio::test]
4017 async fn binary_op_literal_column() {
4018 let query = r#"1 + some_metric{tag_0="bar"}"#;
4019 let expected = String::from(
4020 "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
4021 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4022 \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4023 \n Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4024 \n Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
4025 \n TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
4026 );
4027
4028 indie_query_plan_compare(query, expected).await;
4029 }
4030
4031 #[tokio::test]
4032 async fn binary_op_literal_literal() {
4033 let query = r#"1 + 1"#;
4034 let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
4035 TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;
4036 indie_query_plan_compare(query, expected).await;
4037 }
4038
    #[tokio::test]
    async fn simple_bool_grammar() {
        let query = "some_metric != bool 1.2345";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

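    // A bool-modified comparison yields a scalar 0/1 that can feed further arithmetic.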
    #[tokio::test]
    async fn bool_with_additional_arithmetic() {
        let query = "some_metric + (1 == bool 2)";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

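    // Unary minus is planned as a negated projection over the value column.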
    #[tokio::test]
    async fn simple_unary() {
        let query = "-some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

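    // Range functions such as increase() plan through PromRangeManipulate with the
    // selector's range (5m = 300000ms) as the eval range, and the scan filter widened
    // by that range on the lower bound.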
    #[tokio::test]
    async fn increase_aggr() {
        let query = "increase(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

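    // A comparison without the `bool` modifier filters rows instead of projecting a value.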
    #[tokio::test]
    async fn less_filter_on_value() {
        let query = "some_metric < 1.2345";
        let expected = String::from(
            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

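    // count_over_time() follows the same range-function shape as increase() above.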
    #[tokio::test]
    async fn count_over_time() {
        let query = "count_over_time(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

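    // A binary op between two selectors with ignoring(...) is planned as an inner join
    // of the two sub-plans on the time index plus the remaining (non-ignored) tags.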
    #[tokio::test]
    async fn test_hash_join() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_sum".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
        \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
        \n    SubqueryAlias: http_server_requests_seconds_sum\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_sum\
        \n    SubqueryAlias: http_server_requests_seconds_count\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_count";
        assert_eq!(plan.to_string(), expected);
    }

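    // Smoke test: a nested label_replace(histogram_quantile(..., sum by (...) (rate(...))))
    // expression should plan without error; only success is asserted, not the plan shape.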
    #[tokio::test]
    async fn test_nested_histogram_quantile() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
            )],
            &["pod", "le", "path", "code", "container"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

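    // `and`/`unless` set operations combined with division, comparison, and `or vector(0)`
    // should plan without error.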
    #[tokio::test]
    async fn test_parse_and_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
        ];

        for case in cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider_with_fields(
                &[
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_used_bytes".to_string(),
                    ),
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_capacity_bytes".to_string(),
                    ),
                ],
                &["namespace", "persistentvolumeclaim"],
            )
            .await;
            let _ =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
        }
    }

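    // `or vector(0)` nested inside an arithmetic expression should plan without error.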
    #[tokio::test]
    async fn test_nested_binary_op() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

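    // Several real-world `or` combinations across aggregations, deltas, and arithmetic
    // should all plan without error.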
    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
        or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
        sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

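    // __field__ matchers select (or exclude) value columns in the planned schema;
    // matching a nonexistent field is a planning error.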
    #[tokio::test]
    async fn value_matcher() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            (
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }

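    // __schema__ and __database__ matchers select which schema the metric table is
    // resolved from, and the qualified name shows up throughout the plan.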
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
            \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

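    // Only `=` is meaningful for __schema__/__database__; every other match op must fail
    // at planning time.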
    #[tokio::test]
    async fn only_equals_is_supported_for_special_matcher() {
        let queries = &[
            "some_alt_metric{__schema__!=\"greptime_private\"}",
            "some_alt_metric{__schema__=~\"lalala\"}",
            "some_alt_metric{__database__!=\"greptime_private\"}",
            "some_alt_metric{__database__=~\"lalala\"}",
        ];

        for query in queries {
            let prom_expr = parser::parse(query).unwrap();
            let eval_stmt = EvalStmt {
                expr: prom_expr,
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            };

            let table_provider = build_test_table_provider(
                &[
                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                    (
                        "greptime_private".to_string(),
                        "some_alt_metric".to_string(),
                    ),
                ],
                1,
                1,
            )
            .await;

            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "query: {:?}", query);
        }
    }

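    // A non-millisecond time index (nanosecond here) is cast to millisecond precision by
    // an extra projection right above the scan before the usual PromQL plan is built.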
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(
            catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "metrics".to_string(),
                    table_id: 1024,
                    table,
                })
                .is_ok()
        );

        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
    }

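    // Matching on a label the table does not have should still produce a plan rather
    // than error; only planning success is asserted here.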
    #[tokio::test]
    async fn test_nonexistent_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"some_metric{nonexistent="hi"}"#;
        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            3,
            3,
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

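    // label_join() is planned as a concat_ws projection over the source labels,
    // aliased to the destination label.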
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

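    // label_replace() is planned as an anchored regexp_replace over the source label,
    // aliased to the destination label.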
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

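    // A =~ matcher compiles to an anchored regex filter (the ~ operator) on the tag column.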
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
        \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
        \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

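    // topk() is planned with a row_number window partitioned per timestamp, then
    // filtered to the top k rows.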
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
        \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
        \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
        \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

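    // count_values() groups by the value column itself and re-exposes it under the
    // user-provided label ("series" here).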
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
        \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
        \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
        \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
        \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

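    // stmt_to_plan_with_alias renames the final value column with the provided alias
    // via an extra top-level projection.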
    #[tokio::test]
    async fn test_value_alias() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let alias = Some("my_series".to_string());
        let plan = PromPlanner::stmt_to_plan_with_alias(
            table_provider,
            &eval_stmt,
            alias,
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        let expected = r#"
Projection: count(prometheus_tsdb_head_series.greptime_value) AS my_series, prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [my_series:Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]
  Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]
    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
      Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
        Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: prometheus_tsdb_head_series.ip ~ Utf8("^(?:(10.0.160.237:8080|10.0.160.237:9090))$") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
        assert_eq!(format!("\n{}", plan.display_indent_schema()), expected);
    }

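    // quantile() aggregates the inner sum with the quantile UDAF, grouped per timestamp.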
5021 #[tokio::test]
5022 async fn test_quantile_expr() {
5023 let mut eval_stmt = EvalStmt {
5024 expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
5025 start: UNIX_EPOCH,
5026 end: UNIX_EPOCH
5027 .checked_add(Duration::from_secs(100_000))
5028 .unwrap(),
5029 interval: Duration::from_secs(5),
5030 lookback_delta: Duration::from_secs(1),
5031 };
5032 let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;
5033
5034 let prom_expr = parser::parse(case).unwrap();
5035 eval_stmt.expr = prom_expr;
5036 let table_provider = build_test_table_provider_with_fields(
5037 &[
5038 (
5039 DEFAULT_SCHEMA_NAME.to_string(),
5040 "prometheus_tsdb_head_series".to_string(),
5041 ),
5042 (
5043 DEFAULT_SCHEMA_NAME.to_string(),
5044 "http_server_requests_seconds_count".to_string(),
5045 ),
5046 ],
5047 &["ip"],
5048 )
5049 .await;
5050
5051 let plan =
5052 PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
5053 .await
5054 .unwrap();
5055 let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
5056 \n Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
5057 \n Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
5058 \n Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
5059 \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
5060 \n PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
5061 \n Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
5062 \n Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
5063 \n TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";
5064
5065 assert_eq!(plan.display_indent_schema().to_string(), expected);
5066 }
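
    // Note on the plan above: the inner `sum(...) by (ip)` becomes the
    // Aggregate grouped by (ip, greptime_timestamp), while the outer
    // `quantile(0.3, ...)` aggregates those per-ip sums grouped by the
    // timestamp alone, so each output row holds one quantile per instant.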

    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-1000, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100001000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
  SubqueryAlias:  [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
            TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }
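
    // Note on the plan above: the right-hand side of `or` references a
    // table that does not exist, so that branch is planned as an
    // EmptyMetric with an empty range ([0..-1]) and a NULL-filled `job`
    // label; UnionDistinctOn then merges both branches on the shared
    // `job` label and the timestamp column.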

    #[tokio::test]
    async fn test_histogram_quantile_missing_le_column() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;

        // The test table only provides the "pod" and "instance" labels; the
        // "le" label that histogram_quantile needs is deliberately absent.
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "non_existent_histogram_bucket".to_string(),
            )],
            &["pod", "instance"],
        )
        .await;

        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;

        // Planning must not fail: a missing `le` column should degrade to an
        // empty result rather than an error.
        assert!(
            result.is_ok(),
            "Expected successful plan creation with empty result, but got error: {:?}",
            result.err()
        );

        let plan = result.unwrap();
        match plan {
            // Success: the query planned to an empty result.
            LogicalPlan::EmptyRelation(_) => {}
            _ => panic!("Expected EmptyRelation, but got: {:?}", plan),
        }
    }
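
    // A minimal sketch (hypothetical helper, not part of the original
    // suite): the EmptyRelation check above, factored out for reuse.
    #[allow(dead_code)]
    fn assert_plans_to_empty_relation(plan: &LogicalPlan) {
        assert!(
            matches!(plan, LogicalPlan::EmptyRelation(_)),
            "Expected EmptyRelation, but got: {plan:?}"
        );
    }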
}