use std::collections::{BTreeSet, HashSet, VecDeque};
use std::sync::Arc;
use std::time::UNIX_EPOCH;

use arrow::datatypes::IntervalDayTime;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::function::FunctionContext;
use common_query::prelude::greptime_value;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::expr_fn::first_value;
use datafusion::functions_aggregate::grouping::grouping_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
    BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
};
use datafusion::prelude as df_prelude;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{DFSchema, NullEquality};
use datafusion_expr::expr::WindowFunctionParams;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{
    ExprSchemable, Literal, Projection, SortExpr, TableScan, TableSource, col, lit,
};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use once_cell::sync::Lazy;
use promql::extension_plan::{
    Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond, RangeManipulate,
    ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, build_special_time_expr,
};
use promql::functions::{
    AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
    Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
    QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
    quantile_udaf,
};
use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
use promql_parser::parser::token::TokenType;
use promql_parser::parser::{
    AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, Expr as PromExpr,
    Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, NumberLiteral,
    Offset, ParenExpr, StringLiteral, SubqueryExpr, UnaryExpr, VectorMatchCardinality,
    VectorSelector, token,
};
use regex::{self, Regex};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
    METRIC_ENGINE_NAME, is_metric_engine_internal_column,
};
use table::table::adapter::DfTableProviderAdapter;

use crate::promql::error::{
    CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
    ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, InvalidDestinationLabelNameSnafu,
    InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
    MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
    Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
    UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
    UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
    ZeroRangeSelectorSnafu,
};
use crate::query_engine::QueryEngineState;

/// `time` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `scalar` function in PromQL.
const SCALAR_FUNCTION: &str = "scalar";
/// `absent` function in PromQL.
const SPECIAL_ABSENT_FUNCTION: &str = "absent";
/// `histogram_quantile` function in PromQL.
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector` function in PromQL.
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// `le` column for conventional histograms.
const LE_COLUMN_NAME: &str = "le";

/// Pattern a valid label name must match.
static LABEL_NAME_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());

/// Default time index column name used when the query has no backing table.
const DEFAULT_TIME_INDEX_COLUMN: &str = "time";

/// Default value column name used when the query has no backing table.
const DEFAULT_FIELD_COLUMN: &str = "value";

/// Special `__field__` matcher for selecting value columns.
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Special `__schema__` and `__database__` matchers for selecting the schema.
const SCHEMA_COLUMN_MATCHER: &str = "__schema__";
const DB_COLUMN_MATCHER: &str = "__database__";

/// Max number of scattered per-step time windows; denser queries fall back to
/// one continuous time-range filter (see `build_time_index_filter`).
const MAX_SCATTER_POINTS: i64 = 400;

/// One hour in milliseconds.
const INTERVAL_1H: i64 = 60 * 60 * 1000;

#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
    // query parameters
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    lookback_delta: Millisecond,

    // planner states
    table_name: Option<String>,
    time_index_column: Option<String>,
    field_columns: Vec<String>,
    tag_columns: Vec<String>,
    /// Whether the scan reads the metric engine's tsid column as the series key.
    use_tsid: bool,
    /// Matchers on the special `__field__` column.
    field_column_matcher: Option<Vec<Matcher>>,
    /// Ordinary tag matchers collected from the current selector.
    selector_matcher: Vec<Matcher>,
    schema_name: Option<String>,
    /// The range in milliseconds of the range selector. None if there is none.
    range: Option<Millisecond>,
}

impl PromPlannerContext {
    fn from_eval_stmt(stmt: &EvalStmt) -> Self {
        Self {
            start: stmt.start.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            end: stmt.end.duration_since(UNIX_EPOCH).unwrap().as_millis() as _,
            interval: stmt.interval.as_millis() as _,
            lookback_delta: stmt.lookback_delta.as_millis() as _,
            ..Default::default()
        }
    }

    /// Reset all planner states except the query parameters.
    fn reset(&mut self) {
        self.table_name = None;
        self.time_index_column = None;
        self.field_columns = vec![];
        self.tag_columns = vec![];
        self.use_tsid = false;
        self.field_column_matcher = None;
        self.selector_matcher.clear();
        self.schema_name = None;
        self.range = None;
    }

    /// Set the table name to an empty placeholder and clear the schema, for
    /// plans that don't read a real table (literals, `time()`, etc.).
    fn reset_table_name_and_schema(&mut self) {
        self.table_name = Some(String::new());
        self.schema_name = None;
        self.use_tsid = false;
    }

    /// Check whether `le` is among the tag columns.
    fn has_le_tag(&self) -> bool {
        self.tag_columns.iter().any(|c| c.eq(&LE_COLUMN_NAME))
    }
}

pub struct PromPlanner {
    table_provider: DfTableSourceProvider,
    ctx: PromPlannerContext,
}

impl PromPlanner {
    pub async fn stmt_to_plan_with_alias(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        alias: Option<String>,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let mut planner = Self {
            table_provider,
            ctx: PromPlannerContext::from_eval_stmt(stmt),
        };

        let plan = planner
            .prom_expr_to_plan(&stmt.expr, query_engine_state)
            .await?;

        // Rename the value column when an alias is requested.
        let plan = if let Some(alias_name) = alias {
            planner.apply_alias_projection(plan, alias_name)?
        } else {
            plan
        };

        // Drop the internal tsid column before returning the final plan.
        planner.strip_tsid_column(plan)
    }

    #[cfg(test)]
    async fn stmt_to_plan(
        table_provider: DfTableSourceProvider,
        stmt: &EvalStmt,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        Self::stmt_to_plan_with_alias(table_provider, stmt, None, query_engine_state).await
    }

    pub async fn prom_expr_to_plan(
        &mut self,
        prom_expr: &PromExpr,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        self.prom_expr_to_plan_inner(prom_expr, false, query_engine_state)
            .await
    }

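    /// Converts a PromQL expression to a logical plan.
    ///
    /// `timestamp_fn` is true when this expression is the direct argument of the
    /// PromQL `timestamp()` function; in that case the vector selector projects
    /// the time index as the value column (see `create_timestamp_func_plan`).
    /// E.g., `timestamp(some_metric)` roughly plans as a time-index projection
    /// on top of the selector plan.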
    #[async_recursion]
    async fn prom_expr_to_plan_inner(
        &mut self,
        prom_expr: &PromExpr,
        timestamp_fn: bool,
        query_engine_state: &QueryEngineState,
    ) -> Result<LogicalPlan> {
        let res = match prom_expr {
            PromExpr::Aggregate(expr) => {
                self.prom_aggr_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Unary(expr) => {
                self.prom_unary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Binary(expr) => {
                self.prom_binary_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Paren(ParenExpr { expr }) => {
                self.prom_expr_to_plan_inner(expr, timestamp_fn, query_engine_state)
                    .await?
            }
            PromExpr::Subquery(expr) => {
                self.prom_subquery_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::NumberLiteral(lit) => self.prom_number_lit_to_plan(lit)?,
            PromExpr::StringLiteral(lit) => self.prom_string_lit_to_plan(lit)?,
            PromExpr::VectorSelector(selector) => {
                self.prom_vector_selector_to_plan(selector, timestamp_fn)
                    .await?
            }
            PromExpr::MatrixSelector(selector) => {
                self.prom_matrix_selector_to_plan(selector).await?
            }
            PromExpr::Call(expr) => {
                self.prom_call_expr_to_plan(query_engine_state, expr)
                    .await?
            }
            PromExpr::Extension(expr) => {
                self.prom_ext_expr_to_plan(query_engine_state, expr).await?
            }
        };

        Ok(res)
    }

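    /// Plans a subquery such as `sum(rate(metric[5m]))[30m:1m]`. The subquery's
    /// step temporarily overrides the planner interval, and `start` is pushed
    /// back by `range - interval` so the inner plan yields enough points; both
    /// are restored before wrapping the result in `RangeManipulate`.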
    async fn prom_subquery_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        subquery_expr: &SubqueryExpr,
    ) -> Result<LogicalPlan> {
        let SubqueryExpr {
            expr, range, step, ..
        } = subquery_expr;

        let current_interval = self.ctx.interval;
        if let Some(step) = step {
            self.ctx.interval = step.as_millis() as _;
        }
        let current_start = self.ctx.start;
        self.ctx.start -= range.as_millis() as i64 - self.ctx.interval;
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.ctx.interval = current_interval;
        self.ctx.start = current_start;

        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            input,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_aggr_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        aggr_expr: &AggregateExpr,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            expr,
            modifier,
            param,
        } = aggr_expr;

        let mut input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        let input_has_tsid = input.schema().fields().iter().any(|field| {
            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
                && field.data_type() == &ArrowDataType::UInt64
        });

        // Tags required by the grouping modifier, excluding internal columns.
        let required_group_tags = match modifier {
            None => BTreeSet::new(),
            Some(LabelModifier::Include(labels)) => labels
                .labels
                .iter()
                .filter(|label| !is_metric_engine_internal_column(label.as_str()))
                .cloned()
                .collect(),
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_tags = self.collect_row_key_tag_columns_from_plan(&input)?;
                for label in &labels.labels {
                    let _ = all_tags.remove(label);
                }
                all_tags
            }
        };

        // Make sure every required grouping tag is actually present in the input
        // schema, re-projecting pruned columns if necessary.
        if !required_group_tags.is_empty()
            && required_group_tags
                .iter()
                .any(|tag| Self::find_case_sensitive_column(input.schema(), tag.as_str()).is_none())
        {
            input = self.ensure_tag_columns_available(input, &required_group_tags)?;
            self.refresh_tag_columns_from_schema(input.schema());
        }

        match (*op).id() {
            token::T_TOPK | token::T_BOTTOMK => {
                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
            }
            _ => {
                // When the input still carries tsid, compare against the table's
                // full row-key tag set rather than the (possibly pruned) context.
                let input_tag_columns = if input_has_tsid {
                    self.collect_row_key_tag_columns_from_plan(&input)?
                        .into_iter()
                        .collect::<Vec<_>>()
                } else {
                    self.ctx.tag_columns.clone()
                };
                let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
                let (mut aggr_exprs, prev_field_exprs) =
                    self.create_aggregate_exprs(*op, param, &input)?;

                // Keep tsid only when grouping preserves the whole series key.
                let keep_tsid = op.id() != token::T_COUNT_VALUES
                    && input_has_tsid
                    && input_tag_columns.iter().collect::<HashSet<_>>()
                        == self.ctx.tag_columns.iter().collect::<HashSet<_>>();

                if keep_tsid {
                    aggr_exprs.push(
                        first_value(
                            DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)),
                            vec![],
                        )
                        .alias(DATA_SCHEMA_TSID_COLUMN_NAME),
                    );
                }
                self.ctx.use_tsid = keep_tsid;

                let builder = LogicalPlanBuilder::from(input);
                let builder = if op.id() == token::T_COUNT_VALUES {
                    let label = Self::get_param_value_as_str(*op, param)?;
                    // `count_values` also groups by the value itself, which is
                    // then exposed under the user-provided `label`.
                    group_exprs.extend(prev_field_exprs.clone());
                    let project_fields = self
                        .create_field_column_exprs()?
                        .into_iter()
                        .chain(self.create_tag_column_exprs()?)
                        .chain(Some(self.create_time_index_column_expr()?))
                        .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));

                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                        .project(project_fields)
                        .context(DataFusionPlanningSnafu)?
                } else {
                    builder
                        .aggregate(group_exprs.clone(), aggr_exprs)
                        .context(DataFusionPlanningSnafu)?
                };

                let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));

                builder
                    .sort(sort_expr)
                    .context(DataFusionPlanningSnafu)?
                    .build()
                    .context(DataFusionPlanningSnafu)
            }
        }
    }

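    /// Plans `topk`/`bottomk` with window functions: one rank expression per
    /// value column partitioned by the grouping labels (conceptually
    /// `row_number() OVER (PARTITION BY tags, time ORDER BY value)`), then a
    /// filter keeping rows whose rank is within `k`, a re-sort, and a final
    /// projection.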
    async fn prom_topk_bottomk_to_plan(
        &mut self,
        aggr_expr: &AggregateExpr,
        input: LogicalPlan,
    ) -> Result<LogicalPlan> {
        let AggregateExpr {
            op,
            param,
            modifier,
            ..
        } = aggr_expr;

        let input_has_tsid = input.schema().fields().iter().any(|field| {
            field.name() == DATA_SCHEMA_TSID_COLUMN_NAME
                && field.data_type() == &ArrowDataType::UInt64
        });
        self.ctx.use_tsid = input_has_tsid;

        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;

        let val = Self::get_param_as_literal_expr(param, Some(*op), Some(ArrowDataType::Float64))?;

        // One ranking window expr per value column.
        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;

        let rank_columns: Vec<_> = window_exprs
            .iter()
            .map(|expr| expr.schema_name().to_string())
            .collect();

        // Keep a row if any rank column is within top k, i.e.
        // `rank_1 <= k OR rank_2 <= k OR ...`.
        let filter: DfExpr = rank_columns
            .iter()
            .fold(None, |expr, rank| {
                let predicate = DfExpr::BinaryExpr(BinaryExpr {
                    left: Box::new(col(rank)),
                    op: Operator::LtEq,
                    right: Box::new(val.clone()),
                });

                match expr {
                    None => Some(predicate),
                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
                        left: Box::new(expr),
                        op: Operator::Or,
                        right: Box::new(predicate),
                    })),
                }
            })
            .unwrap();

        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();

        let mut new_group_exprs = group_exprs.clone();
        // Order the result by group exprs first, then by rank columns.
        new_group_exprs.extend(rank_columns);

        let group_sort_expr = new_group_exprs
            .into_iter()
            .map(|expr| expr.sort(true, false));

        let project_fields = self
            .create_field_column_exprs()?
            .into_iter()
            .chain(self.create_tag_column_exprs()?)
            .chain(
                self.ctx
                    .use_tsid
                    .then_some(DfExpr::Column(Column::from_name(
                        DATA_SCHEMA_TSID_COLUMN_NAME,
                    )))
                    .into_iter(),
            )
            .chain(Some(self.create_time_index_column_expr()?));

        LogicalPlanBuilder::from(input)
            .window(window_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(filter)
            .context(DataFusionPlanningSnafu)?
            .sort(group_sort_expr)
            .context(DataFusionPlanningSnafu)?
            .project(project_fields)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    async fn prom_unary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        unary_expr: &UnaryExpr,
    ) -> Result<LogicalPlan> {
        let UnaryExpr { expr } = unary_expr;
        // Negate each value column of the input.
        let input = self.prom_expr_to_plan(expr, query_engine_state).await?;
        self.projection_for_each_field_column(input, |col| {
            Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
        })
    }

    async fn prom_binary_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        binary_expr: &PromBinaryExpr,
    ) -> Result<LogicalPlan> {
        let PromBinaryExpr {
            lhs,
            rhs,
            op,
            modifier,
        } = binary_expr;

        // With the `bool` modifier, comparison operators return 0/1 instead of
        // filtering rows.
        let should_return_bool = if let Some(m) = modifier {
            m.return_bool
        } else {
            false
        };
        let is_comparison_op = Self::is_token_a_comparison_op(*op);

        // Four cases, depending on which sides are literals.
        match (
            Self::try_build_literal_expr(lhs),
            Self::try_build_literal_expr(rhs),
        ) {
            // Both sides are literals.
            (Some(lhs), Some(rhs)) => {
                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
                self.ctx.reset_table_name_and_schema();
                let field_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                let mut field_expr = field_expr_builder(lhs, rhs)?;

                if is_comparison_op && should_return_bool {
                    field_expr = DfExpr::Cast(Cast {
                        expr: Box::new(field_expr),
                        data_type: ArrowDataType::Float64,
                    });
                }

                Ok(LogicalPlan::Extension(Extension {
                    node: Arc::new(
                        EmptyMetric::new(
                            self.ctx.start,
                            self.ctx.end,
                            self.ctx.interval,
                            SPECIAL_TIME_FUNCTION.to_string(),
                            DEFAULT_FIELD_COLUMN.to_string(),
                            Some(field_expr),
                        )
                        .context(DataFusionPlanningSnafu)?,
                    ),
                }))
            }
            // The lhs is a literal, the rhs is a vector.
            (Some(mut expr), None) => {
                let input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                // Use the special time expr when the literal is the `time()` function.
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(lhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(expr.clone(), DfExpr::Column(col.into()))?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // The lhs is a vector, the rhs is a literal.
            (None, Some(mut expr)) => {
                let input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                // Use the special time expr when the literal is the `time()` function.
                if let Some(time_expr) = self.try_build_special_time_expr_with_context(rhs) {
                    expr = time_expr
                }
                let bin_expr_builder = |col: &String| {
                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(col.into()), expr.clone())?;

                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(input, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(input, bin_expr_builder)
                }
            }
            // Both sides are vectors.
            (None, None) => {
                let left_input = self.prom_expr_to_plan(lhs, query_engine_state).await?;
                let left_field_columns = self.ctx.field_columns.clone();
                let left_time_index_column = self.ctx.time_index_column.clone();
                let mut left_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let left_context = self.ctx.clone();

                let right_input = self.prom_expr_to_plan(rhs, query_engine_state).await?;
                let right_field_columns = self.ctx.field_columns.clone();
                let right_time_index_column = self.ctx.time_index_column.clone();
                let mut right_table_ref = self
                    .table_ref()
                    .unwrap_or_else(|_| TableReference::bare(""));
                let right_context = self.ctx.clone();

                // Set operations (`and`/`or`/`unless`) are planned as set ops on
                // the non-field columns instead of a value-wise join.
                if Self::is_token_a_set_op(*op) {
                    return self.set_op_on_non_field_columns(
                        left_input,
                        right_input,
                        left_context,
                        right_context,
                        *op,
                        modifier,
                    );
                }

                // Rename identical table refs so the two join sides stay distinguishable.
                if left_table_ref == right_table_ref {
                    left_table_ref = TableReference::bare("lhs");
                    right_table_ref = TableReference::bare("rhs");
                    // `self.ctx` currently holds the right side's context; fall
                    // back to the left side's context if it has no tag columns.
                    if self.ctx.tag_columns.is_empty() {
                        self.ctx = left_context.clone();
                        self.ctx.table_name = Some("lhs".to_string());
                    } else {
                        self.ctx.table_name = Some("rhs".to_string());
                    }
                }
                let mut field_columns = left_field_columns.iter().zip(right_field_columns.iter());

                let join_plan = self.join_on_non_field_columns(
                    left_input,
                    right_input,
                    left_table_ref.clone(),
                    right_table_ref.clone(),
                    left_time_index_column,
                    right_time_index_column,
                    // Join on the time index only when either side has no tag columns.
                    left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
                    modifier,
                )?;
                let join_plan_schema = join_plan.schema().clone();

                let bin_expr_builder = |_: &String| {
                    let (left_col_name, right_col_name) = field_columns.next().unwrap();
                    let left_col = join_plan_schema
                        .qualified_field_with_name(Some(&left_table_ref), left_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();
                    let right_col = join_plan_schema
                        .qualified_field_with_name(Some(&right_table_ref), right_col_name)
                        .context(DataFusionPlanningSnafu)?
                        .into();

                    let binary_expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
                    let mut binary_expr =
                        binary_expr_builder(DfExpr::Column(left_col), DfExpr::Column(right_col))?;
                    if is_comparison_op && should_return_bool {
                        binary_expr = DfExpr::Cast(Cast {
                            expr: Box::new(binary_expr),
                            data_type: ArrowDataType::Float64,
                        });
                    }
                    Ok(binary_expr)
                };
                if is_comparison_op && !should_return_bool {
                    self.filter_on_field_column(join_plan, bin_expr_builder)
                } else {
                    self.projection_for_each_field_column(join_plan, bin_expr_builder)
                }
            }
        }
    }

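    /// Plans a bare number literal, e.g. `42`, as an `EmptyMetric` that emits
    /// the constant as the value column for every step in `[start, end]`.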
    fn prom_number_lit_to_plan(&mut self, number_literal: &NumberLiteral) -> Result<LogicalPlan> {
        let NumberLiteral { val } = number_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(*val);

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    fn prom_string_lit_to_plan(&mut self, string_literal: &StringLiteral) -> Result<LogicalPlan> {
        let StringLiteral { val } = string_literal;
        self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.reset_table_name_and_schema();
        let literal_expr = df_prelude::lit(val.clone());

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    self.ctx.start,
                    self.ctx.end,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(literal_expr),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    async fn prom_vector_selector_to_plan(
        &mut self,
        vector_selector: &VectorSelector,
        timestamp_fn: bool,
    ) -> Result<LogicalPlan> {
        let VectorSelector {
            name,
            offset,
            matchers,
            at: _,
        } = vector_selector;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        if let Some(empty_plan) = self.setup_context().await? {
            return Ok(empty_plan);
        }
        let normalize = self
            .selector_to_series_normalize_plan(offset, matchers, false)
            .await?;

        // For `timestamp()`, project the time index as the value column first.
        let normalize = if timestamp_fn {
            self.create_timestamp_func_plan(normalize)?
        } else {
            normalize
        };

        let manipulate = InstantManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.lookback_delta,
            self.ctx.interval,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.first().cloned(),
            normalize,
        );
        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

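    /// Builds the projection for the `timestamp()` function: the time index is
    /// re-projected as the value column via `build_special_time_expr`, next to
    /// the original time index and the tag columns.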
    fn create_timestamp_func_plan(&mut self, normalize: LogicalPlan) -> Result<LogicalPlan> {
        let time_expr = build_special_time_expr(self.ctx.time_index_column.as_ref().unwrap())
            .alias(DEFAULT_FIELD_COLUMN);
        self.ctx.field_columns = vec![time_expr.schema_name().to_string()];
        let mut project_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 2);
        project_exprs.push(self.create_time_index_column_expr()?);
        project_exprs.push(time_expr);
        project_exprs.extend(self.create_tag_column_exprs()?);

        LogicalPlanBuilder::from(normalize)
            .project(project_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)
    }

    async fn prom_matrix_selector_to_plan(
        &mut self,
        matrix_selector: &MatrixSelector,
    ) -> Result<LogicalPlan> {
        let MatrixSelector { vs, range } = matrix_selector;
        let VectorSelector {
            name,
            offset,
            matchers,
            ..
        } = vs;
        let matchers = self.preprocess_label_matchers(matchers, name)?;
        ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
        let range_ms = range.as_millis() as _;
        self.ctx.range = Some(range_ms);

        // When the table is missing, `setup_context` yields an empty-metric
        // plan that stands in for the scan.
        let normalize = match self.setup_context().await? {
            Some(empty_plan) => empty_plan,
            None => {
                self.selector_to_series_normalize_plan(offset, matchers, true)
                    .await?
            }
        };
        let manipulate = RangeManipulate::new(
            self.ctx.start,
            self.ctx.end,
            self.ctx.interval,
            range_ms,
            self.ctx
                .time_index_column
                .clone()
                .expect("time index should be set in `setup_context`"),
            self.ctx.field_columns.clone(),
            normalize,
        )
        .context(DataFusionPlanningSnafu)?;

        Ok(LogicalPlan::Extension(Extension {
            node: Arc::new(manipulate),
        }))
    }

    async fn prom_call_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        call_expr: &Call,
    ) -> Result<LogicalPlan> {
        let Call { func, args } = call_expr;
        // Some functions are mapped to special plans rather than ordinary projections.
        match func.name {
            SPECIAL_HISTOGRAM_QUANTILE => {
                return self.create_histogram_plan(args, query_engine_state).await;
            }
            SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
            SCALAR_FUNCTION => return self.create_scalar_plan(args, query_engine_state).await,
            SPECIAL_ABSENT_FUNCTION => {
                return self.create_absent_plan(args, query_engine_state).await;
            }
            _ => {}
        }

        // Transform the function arguments; a call without a vector input falls
        // back to an `EmptyMetric` over the query range.
        let args = self.create_function_args(&args.args)?;
        let input = if let Some(prom_expr) = &args.input {
            self.prom_expr_to_plan_inner(prom_expr, func.name == "timestamp", query_engine_state)
                .await?
        } else {
            self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
            self.ctx.reset_table_name_and_schema();
            self.ctx.tag_columns = vec![];
            self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
            LogicalPlan::Extension(Extension {
                node: Arc::new(
                    EmptyMetric::new(
                        self.ctx.start,
                        self.ctx.end,
                        self.ctx.interval,
                        SPECIAL_TIME_FUNCTION.to_string(),
                        DEFAULT_FIELD_COLUMN.to_string(),
                        None,
                    )
                    .context(DataFusionPlanningSnafu)?,
                ),
            })
        };
        let (mut func_exprs, new_tags) =
            self.create_function_expr(func, args.literals.clone(), query_engine_state)?;
        func_exprs.insert(0, self.create_time_index_column_expr()?);
        func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

        let builder = LogicalPlanBuilder::from(input)
            .project(func_exprs)
            .context(DataFusionPlanningSnafu)?
            .filter(self.create_empty_values_filter_expr()?)
            .context(DataFusionPlanningSnafu)?;

        let builder = match func.name {
            "sort" => builder
                .sort(self.create_field_columns_sort_exprs(true))
                .context(DataFusionPlanningSnafu)?,
            "sort_desc" => builder
                .sort(self.create_field_columns_sort_exprs(false))
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    true,
                )?)
                .context(DataFusionPlanningSnafu)?,
            "sort_by_label_desc" => builder
                .sort(Self::create_sort_exprs_by_tags(
                    func.name,
                    args.literals,
                    false,
                )?)
                .context(DataFusionPlanningSnafu)?,

            _ => builder,
        };

        // Update the context with any new tags produced by the function.
        for tag in new_tags {
            self.ctx.tag_columns.push(tag);
        }

        let plan = builder.build().context(DataFusionPlanningSnafu)?;
        common_telemetry::debug!("Created PromQL function plan: {plan:?} for {call_expr:?}");

        Ok(plan)
    }

    async fn prom_ext_expr_to_plan(
        &mut self,
        query_engine_state: &QueryEngineState,
        ext_expr: &promql_parser::parser::ast::Extension,
    ) -> Result<LogicalPlan> {
        // Extension expressions carry exactly one child to plan; the extension
        // name selects an `EXPLAIN`/`ANALYZE` wrapper around that child's plan.
        let expr = &ext_expr.expr;
        let children = expr.children();
        let plan = self
            .prom_expr_to_plan(&children[0], query_engine_state)
            .await?;
        match expr.name() {
            "ANALYZE" => LogicalPlanBuilder::from(plan)
                .explain(false, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "ANALYZE VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, true)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN" => LogicalPlanBuilder::from(plan)
                .explain(false, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            "EXPLAIN VERBOSE" => LogicalPlanBuilder::from(plan)
                .explain(true, false)
                .unwrap()
                .build()
                .context(DataFusionPlanningSnafu),
            _ => LogicalPlanBuilder::empty(true)
                .build()
                .context(DataFusionPlanningSnafu),
        }
    }

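    /// Extracts the metric name and splits special matchers out of the label
    /// matchers:
    /// - `__name__` (or the selector's name) determines the table;
    /// - `__field__` matchers select value columns;
    /// - `__schema__` / `__database__` matchers (equality only) pick the schema;
    /// - everything else is kept as an ordinary tag matcher.
    ///
    /// E.g., `metric{__schema__="public", job=~"api.*"}` resolves the table
    /// `public.metric` and keeps `job=~"api.*"` as a scan filter.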
    #[allow(clippy::mutable_key_type)]
    fn preprocess_label_matchers(
        &mut self,
        label_matchers: &Matchers,
        name: &Option<String>,
    ) -> Result<Matchers> {
        self.ctx.reset();

        let metric_name;
        if let Some(name) = name.clone() {
            metric_name = Some(name);
            ensure!(
                label_matchers.find_matchers(METRIC_NAME).is_empty(),
                MultipleMetricMatchersSnafu
            );
        } else {
            let mut matches = label_matchers.find_matchers(METRIC_NAME);
            ensure!(!matches.is_empty(), NoMetricMatcherSnafu);
            ensure!(matches.len() == 1, MultipleMetricMatchersSnafu);
            ensure!(
                matches[0].op == MatchOp::Equal,
                UnsupportedMatcherOpSnafu {
                    matcher_op: matches[0].op.to_string(),
                    matcher: METRIC_NAME
                }
            );
            metric_name = matches.pop().map(|m| m.value);
        }

        self.ctx.table_name = metric_name;

        let mut matchers = HashSet::new();
        for matcher in &label_matchers.matchers {
            // Dispatch special matchers; `__field__` ones are handled separately
            // from ordinary tag filters.
            if matcher.name == FIELD_COLUMN_MATCHER {
                self.ctx
                    .field_column_matcher
                    .get_or_insert_default()
                    .push(matcher.clone());
            } else if matcher.name == SCHEMA_COLUMN_MATCHER || matcher.name == DB_COLUMN_MATCHER {
                ensure!(
                    matcher.op == MatchOp::Equal,
                    UnsupportedMatcherOpSnafu {
                        matcher: matcher.name.clone(),
                        matcher_op: matcher.op.to_string(),
                    }
                );
                self.ctx.schema_name = Some(matcher.value.clone());
            } else if matcher.name != METRIC_NAME {
                self.ctx.selector_matcher.push(matcher.clone());
                let _ = matchers.insert(matcher.clone());
            }
        }

        Ok(Matchers::new(matchers.into_iter().collect()))
    }

    async fn selector_to_series_normalize_plan(
        &mut self,
        offset: &Option<Offset>,
        label_matchers: Matchers,
        is_range_selector: bool,
    ) -> Result<LogicalPlan> {
        let table_ref = self.table_ref()?;
        // Make the table scan plan.
        let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
        let table_schema = table_scan.schema();

        // Make filter exprs from the matchers and the time range.
        let offset_duration = match offset {
            Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
            Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
            None => 0,
        };
        let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
        if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
            scan_filters.push(time_index_filter);
        }
        table_scan = LogicalPlanBuilder::from(table_scan)
            .filter(conjunction(scan_filters).unwrap())
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // Convert the `__field__` matchers to a projection.
        if let Some(field_matchers) = &self.ctx.field_column_matcher {
            let col_set = self.ctx.field_columns.iter().collect::<HashSet<_>>();
            // Columns to include.
            let mut result_set = HashSet::new();
            // Columns to exclude.
            let mut reverse_set = HashSet::new();
            for matcher in field_matchers {
                match &matcher.op {
                    MatchOp::Equal => {
                        if col_set.contains(&matcher.value) {
                            let _ = result_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::NotEqual => {
                        if col_set.contains(&matcher.value) {
                            let _ = reverse_set.insert(matcher.value.clone());
                        } else {
                            return Err(ColumnNotFoundSnafu {
                                col: matcher.value.clone(),
                            }
                            .build());
                        }
                    }
                    MatchOp::Re(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = result_set.insert(col.clone());
                            }
                        }
                    }
                    MatchOp::NotRe(regex) => {
                        for col in &self.ctx.field_columns {
                            if regex.is_match(col) {
                                let _ = reverse_set.insert(col.clone());
                            }
                        }
                    }
                }
            }
            // An empty include set means all field columns are selected.
            if result_set.is_empty() {
                result_set = col_set.into_iter().cloned().collect();
            }
            for col in reverse_set {
                let _ = result_set.remove(&col);
            }

            // Mask the field columns in the context with the selected set.
            self.ctx.field_columns = self
                .ctx
                .field_columns
                .drain(..)
                .filter(|col| result_set.contains(col))
                .collect();

            let exprs = result_set
                .into_iter()
                .map(|col| DfExpr::Column(Column::new_unqualified(col)))
                .chain(self.create_tag_column_exprs()?)
                .chain(
                    self.ctx
                        .use_tsid
                        .then_some(DfExpr::Column(Column::new_unqualified(
                            DATA_SCHEMA_TSID_COLUMN_NAME,
                        )))
                        .into_iter(),
                )
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            // Apply the field-column projection on top of the scan.
            table_scan = LogicalPlanBuilder::from(table_scan)
                .project(exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        // When tsid is available, divide series by tsid instead of tag columns.
        let series_key_columns = if self.ctx.use_tsid {
            vec![DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]
        } else {
            self.ctx.tag_columns.clone()
        };

        let sort_exprs = if self.ctx.use_tsid {
            vec![
                DfExpr::Column(Column::from_name(DATA_SCHEMA_TSID_COLUMN_NAME)).sort(true, true),
                self.create_time_index_column_expr()?.sort(true, true),
            ]
        } else {
            self.create_tag_and_time_index_column_sort_exprs()?
        };

        let sort_plan = LogicalPlanBuilder::from(table_scan)
            .sort(sort_exprs)
            .context(DataFusionPlanningSnafu)?
            .build()
            .context(DataFusionPlanningSnafu)?;

        // Make the divide plan to split data by series.
        let time_index_column =
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_string(),
                })?;
        let divide_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(SeriesDivide::new(
                series_key_columns.clone(),
                time_index_column,
                sort_plan,
            )),
        });

        // Skip normalization when there is no offset and no range selector.
        if !is_range_selector && offset_duration == 0 {
            return Ok(divide_plan);
        }
        let series_normalize = SeriesNormalize::new(
            offset_duration,
            self.ctx
                .time_index_column
                .clone()
                .with_context(|| TimeIndexNotFoundSnafu {
                    table: table_ref.to_quoted_string(),
                })?,
            is_range_selector,
            series_key_columns,
            divide_plan,
        );
        let logical_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(series_normalize),
        });

        Ok(logical_plan)
    }

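    /// Converts an aggregate modifier to group-by exprs, always appending the
    /// time index. `by (a, b)` keeps exactly those labels; `without (a)` keeps
    /// every non-internal column except `a`, the time index, and the value
    /// columns; no modifier groups by the time index alone.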
    fn agg_modifier_to_col(
        &mut self,
        input_schema: &DFSchemaRef,
        modifier: &Option<LabelModifier>,
        update_ctx: bool,
    ) -> Result<Vec<DfExpr>> {
        match modifier {
            None => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                Ok(vec![self.create_time_index_column_expr()?])
            }
            Some(LabelModifier::Include(labels)) => {
                if update_ctx {
                    self.ctx.tag_columns.clear();
                }
                let mut exprs = Vec::with_capacity(labels.labels.len());
                for label in &labels.labels {
                    if is_metric_engine_internal_column(label) {
                        continue;
                    }
                    // Only use the label if it exists in the input schema.
                    if let Some(column_name) = Self::find_case_sensitive_column(input_schema, label)
                    {
                        exprs.push(DfExpr::Column(Column::from_name(column_name.clone())));

                        if update_ctx {
                            // Update the tag columns in the planner context.
                            self.ctx.tag_columns.push(column_name);
                        }
                    }
                }
                // Add the time index column into the group-by columns.
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
            Some(LabelModifier::Exclude(labels)) => {
                let mut all_fields = input_schema
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<BTreeSet<_>>();

                // Remove the metric engine's internal columns.
                all_fields.retain(|col| !is_metric_engine_internal_column(col.as_str()));

                // Remove the excluded labels.
                for label in &labels.labels {
                    let _ = all_fields.remove(label);
                }

                // Remove the time index and the value columns.
                if let Some(time_index) = &self.ctx.time_index_column {
                    let _ = all_fields.remove(time_index);
                }
                for value in &self.ctx.field_columns {
                    let _ = all_fields.remove(value);
                }

                if update_ctx {
                    // Change the tag columns in the planner context.
                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
                }

                // Collect the remaining fields as group-by columns.
                let mut exprs = all_fields
                    .into_iter()
                    .map(|c| DfExpr::Column(Column::from(c)))
                    .collect::<Vec<_>>();

                // Add the time index column into the group-by columns.
                exprs.push(self.create_time_index_column_expr()?);

                Ok(exprs)
            }
        }
    }

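    /// Converts label matchers into DataFusion filter exprs, combined later via
    /// `conjunction`. E.g., `{job="api", env=~"prod.*"}` yields `job = 'api'`
    /// and `env ~ 'prod.*'`. Matchers on columns missing from the schema compare
    /// against an aliased empty string, matching PromQL's empty-label semantics;
    /// trivial anchored regexes are simplified away.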
    pub fn matchers_to_expr(
        label_matchers: Matchers,
        table_schema: &DFSchemaRef,
    ) -> Result<Vec<DfExpr>> {
        let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
        for matcher in label_matchers.matchers {
            if matcher.name == SCHEMA_COLUMN_MATCHER
                || matcher.name == DB_COLUMN_MATCHER
                || matcher.name == FIELD_COLUMN_MATCHER
            {
                continue;
            }

            let column_name = Self::find_case_sensitive_column(table_schema, matcher.name.as_str());
            let col = if let Some(column_name) = column_name {
                DfExpr::Column(Column::from_name(column_name))
            } else {
                DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
                    .alias(matcher.name.clone())
            };
            let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)), None);
            let expr = match matcher.op {
                MatchOp::Equal => col.eq(lit),
                MatchOp::NotEqual => col.not_eq(lit),
                MatchOp::Re(re) => {
                    // The parser anchors regex matchers as `^(?:<pattern>)$`, so
                    // `.*` and `.+` can be simplified to plain comparisons.
                    if re.as_str() == "^(?:.*)$" {
                        continue;
                    }
                    if re.as_str() == "^(?:.+)$" {
                        col.not_eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
                MatchOp::NotRe(re) => {
                    if re.as_str() == "^(?:.*)$" {
                        DfExpr::Literal(ScalarValue::Boolean(Some(false)), None)
                    } else if re.as_str() == "^(?:.+)$" {
                        col.eq(DfExpr::Literal(
                            ScalarValue::Utf8(Some(String::new())),
                            None,
                        ))
                    } else {
                        DfExpr::BinaryExpr(BinaryExpr {
                            left: Box::new(col),
                            op: Operator::RegexNotMatch,
                            right: Box::new(DfExpr::Literal(
                                ScalarValue::Utf8(Some(re.as_str().to_string())),
                                None,
                            )),
                        })
                    }
                }
            };
            exprs.push(expr);
        }

        Ok(exprs)
    }

    fn find_case_sensitive_column(schema: &DFSchemaRef, column: &str) -> Option<String> {
        if is_metric_engine_internal_column(column) {
            return None;
        }
        schema
            .fields()
            .iter()
            .find(|field| field.name() == column)
            .map(|field| field.name().clone())
    }

    fn table_from_source(&self, source: &Arc<dyn TableSource>) -> Result<table::TableRef> {
        Ok(source
            .as_any()
            .downcast_ref::<DefaultTableSource>()
            .context(UnknownTableSnafu)?
            .table_provider
            .as_any()
            .downcast_ref::<DfTableProviderAdapter>()
            .context(UnknownTableSnafu)?
            .table())
    }

    fn table_ref(&self) -> Result<TableReference> {
        let table_name = self
            .ctx
            .table_name
            .clone()
            .context(TableNameNotFoundSnafu)?;

        // Add the schema name only when it's set in the query, e.g. via `__schema__`.
        let table_ref = if let Some(schema_name) = &self.ctx.schema_name {
            TableReference::partial(schema_name.as_str(), table_name.as_str())
        } else {
            TableReference::bare(table_name.as_str())
        };

        Ok(table_ref)
    }

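    /// Builds the time-range filter for the scan. Dense queries (more steps
    /// than `MAX_SCATTER_POINTS`, or a step of at most one hour) get a single
    /// window `[start - offset - lookback - range, end - offset + lookback]`;
    /// sparse queries get one such window per step, OR-ed together, so the
    /// large gaps between steps are not scanned at all.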
    fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
        let start = self.ctx.start;
        let end = self.ctx.end;
        if end < start {
            return InvalidTimeRangeSnafu { start, end }.fail();
        }
        let lookback_delta = self.ctx.lookback_delta;
        let range = self.ctx.range.unwrap_or_default();
        let interval = self.ctx.interval;
        let time_index_expr = self.create_time_index_column_expr()?;
        let num_points = (end - start) / interval;

        // Scan a single continuous time range when the query is dense enough.
        if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
            let single_time_range = time_index_expr
                .clone()
                .gt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(
                        Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range),
                        None,
                    ),
                    None,
                ))
                .and(time_index_expr.lt_eq(DfExpr::Literal(
                    ScalarValue::TimestampMillisecond(
                        Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
                        None,
                    ),
                    None,
                )));
            return Ok(Some(single_time_range));
        }

        // Otherwise, scan a separate window around each step.
        let mut filters = Vec::with_capacity(num_points as usize);
        for timestamp in (start..end).step_by(interval as usize) {
            filters.push(
                time_index_expr
                    .clone()
                    .gt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp - offset_duration - lookback_delta - range),
                            None,
                        ),
                        None,
                    ))
                    .and(time_index_expr.clone().lt_eq(DfExpr::Literal(
                        ScalarValue::TimestampMillisecond(
                            Some(timestamp - offset_duration + lookback_delta),
                            None,
                        ),
                        None,
                    ))),
            )
        }

        Ok(filters.into_iter().reduce(DfExpr::or))
    }

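    /// Creates the [LogicalPlan] for the table scan. For metric-engine logical
    /// tables this may reroute the scan to the backing physical table, adding a
    /// filter on `DATA_SCHEMA_TABLE_ID_COLUMN_NAME` and reading the tsid column,
    /// which skips one level of indirection; non-millisecond time indexes are
    /// cast to millisecond precision.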
    async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
        let provider = self
            .table_provider
            .resolve_table(table_ref.clone())
            .await
            .context(CatalogSnafu)?;

        let logical_table = self.table_from_source(&provider)?;

        // By default scan the resolved (logical) table itself.
        let mut maybe_phy_table_ref = table_ref.clone();
        let mut scan_provider = provider;
        let mut table_id_filter: Option<u32> = None;

        // For metric-engine logical tables, try to scan the physical table
        // directly and filter by this logical table's id.
        if logical_table.table_info().meta.engine == METRIC_ENGINE_NAME
            && let Some(physical_table_name) = logical_table
                .table_info()
                .meta
                .options
                .extra_options
                .get(LOGICAL_TABLE_METADATA_KEY)
        {
            let physical_table_ref = if let Some(schema_name) = &self.ctx.schema_name {
                TableReference::partial(schema_name.as_str(), physical_table_name.as_str())
            } else {
                TableReference::bare(physical_table_name.as_str())
            };

            let physical_provider = match self
                .table_provider
                .resolve_table(physical_table_ref.clone())
                .await
            {
                Ok(provider) => provider,
                Err(e) if e.status_code() == StatusCode::TableNotFound => {
                    // Fall back to scanning the logical table.
                    scan_provider.clone()
                }
                Err(e) => return Err(e).context(CatalogSnafu),
            };

            if !Arc::ptr_eq(&physical_provider, &scan_provider) {
                let physical_table = self.table_from_source(&physical_provider)?;

                // Only use the physical table when the internal id/tsid columns exist.
                let has_table_id = physical_table
                    .schema()
                    .column_schema_by_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
                    .is_some();
                let has_tsid = physical_table
                    .schema()
                    .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
                    .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));

                if has_table_id && has_tsid {
                    scan_provider = physical_provider;
                    maybe_phy_table_ref = physical_table_ref;
                    table_id_filter = Some(logical_table.table_info().ident.table_id);
                }
            }
        }

        let scan_table = self.table_from_source(&scan_provider)?;

        let use_tsid = table_id_filter.is_some()
            && scan_table
                .schema()
                .column_schema_by_name(DATA_SCHEMA_TSID_COLUMN_NAME)
                .is_some_and(|col| matches!(col.data_type, ConcreteDataType::UInt64(_)));
        self.ctx.use_tsid = use_tsid;

        let all_table_tags = self.ctx.tag_columns.clone();

        let scan_tag_columns = if use_tsid {
            let mut scan_tags = self.ctx.tag_columns.clone();
            for matcher in &self.ctx.selector_matcher {
                if is_metric_engine_internal_column(&matcher.name) {
                    continue;
                }
                if all_table_tags.iter().any(|tag| tag == &matcher.name) {
                    scan_tags.push(matcher.name.clone());
                }
            }
            scan_tags.sort_unstable();
            scan_tags.dedup();
            scan_tags
        } else {
            self.ctx.tag_columns.clone()
        };

        let is_time_index_ms = scan_table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: maybe_phy_table_ref.to_quoted_string(),
            })?
            .data_type
            == ConcreteDataType::timestamp_millisecond_datatype();

        // When scanning the physical table, project only the required columns.
        let scan_projection = if table_id_filter.is_some() {
            let mut required_columns = HashSet::new();
            required_columns.insert(DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string());
            required_columns.insert(self.ctx.time_index_column.clone().with_context(|| {
                TimeIndexNotFoundSnafu {
                    table: maybe_phy_table_ref.to_quoted_string(),
                }
            })?);
            for col in &scan_tag_columns {
                required_columns.insert(col.clone());
            }
            for col in &self.ctx.field_columns {
                required_columns.insert(col.clone());
            }
            if use_tsid {
                required_columns.insert(DATA_SCHEMA_TSID_COLUMN_NAME.to_string());
            }

            let arrow_schema = scan_table.schema().arrow_schema().clone();
            Some(
                arrow_schema
                    .fields()
                    .iter()
                    .enumerate()
                    .filter(|(_, field)| required_columns.contains(field.name().as_str()))
                    .map(|(idx, _)| idx)
                    .collect::<Vec<_>>(),
            )
        } else {
            None
        };

        let mut scan_plan =
            LogicalPlanBuilder::scan(maybe_phy_table_ref.clone(), scan_provider, scan_projection)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;

        if let Some(table_id) = table_id_filter {
            // Filter the physical table by this logical table's id and alias the
            // result back to the logical table's name.
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .filter(
                    DfExpr::Column(Column::from_name(DATA_SCHEMA_TABLE_ID_COLUMN_NAME))
                        .eq(lit(table_id)),
                )
                .context(DataFusionPlanningSnafu)?
                .alias(table_ref.clone())
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        if !is_time_index_ms {
            // Cast a non-millisecond time index to millisecond precision.
            let expr: Vec<_> = self
                .create_field_column_exprs()?
                .into_iter()
                .chain(
                    scan_tag_columns
                        .iter()
                        .map(|tag| DfExpr::Column(Column::from_name(tag))),
                )
                .chain(
                    self.ctx
                        .use_tsid
                        .then_some(DfExpr::Column(Column::new(
                            Some(table_ref.clone()),
                            DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
                        )))
                        .into_iter(),
                )
                .chain(Some(DfExpr::Alias(Alias {
                    expr: Box::new(DfExpr::Cast(Cast {
                        expr: Box::new(self.create_time_index_column_expr()?),
                        data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                    })),
                    relation: Some(table_ref.clone()),
                    name: self
                        .ctx
                        .time_index_column
                        .as_ref()
                        .with_context(|| TimeIndexNotFoundSnafu {
                            table: table_ref.to_quoted_string(),
                        })?
                        .clone(),
                    metadata: None,
                })))
                .collect::<Vec<_>>();
            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(expr)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        } else if table_id_filter.is_some() {
            // Keep a stable column order for the physical-table scan.
            let project_exprs = self
                .create_field_column_exprs()?
                .into_iter()
                .chain(
                    scan_tag_columns
                        .iter()
                        .map(|tag| DfExpr::Column(Column::from_name(tag))),
                )
                .chain(
                    self.ctx
                        .use_tsid
                        .then_some(DfExpr::Column(Column::from_name(
                            DATA_SCHEMA_TSID_COLUMN_NAME,
                        )))
                        .into_iter(),
                )
                .chain(Some(self.create_time_index_column_expr()?))
                .collect::<Vec<_>>();

            scan_plan = LogicalPlanBuilder::from(scan_plan)
                .project(project_exprs)
                .context(DataFusionPlanningSnafu)?
                .build()
                .context(DataFusionPlanningSnafu)?;
        }

        let result = LogicalPlanBuilder::from(scan_plan)
            .build()
            .context(DataFusionPlanningSnafu)?;
        Ok(result)
    }

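    /// Walks the plan tree and collects the row-key tag columns of every
    /// `TableScan`, skipping the metric engine's internal id/tsid columns.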
    fn collect_row_key_tag_columns_from_plan(
        &self,
        plan: &LogicalPlan,
    ) -> Result<BTreeSet<String>> {
        fn walk(
            planner: &PromPlanner,
            plan: &LogicalPlan,
            out: &mut BTreeSet<String>,
        ) -> Result<()> {
            if let LogicalPlan::TableScan(scan) = plan {
                let table = planner.table_from_source(&scan.source)?;
                for col in table.table_info().meta.row_key_column_names() {
                    if col != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
                        && col != DATA_SCHEMA_TSID_COLUMN_NAME
                        && !is_metric_engine_internal_column(col)
                    {
                        out.insert(col.clone());
                    }
                }
            }

            for input in plan.inputs() {
                walk(planner, input, out)?;
            }
            Ok(())
        }

        let mut out = BTreeSet::new();
        walk(self, plan, &mut out)?;
        Ok(out)
    }

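    /// Rewrites `TableScan` projections and `Projection` nodes so that every
    /// required tag column is carried up through the plan, e.g. when a
    /// `by (host)` clause references a tag that a tsid-optimized scan had pruned.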
    fn ensure_tag_columns_available(
        &self,
        plan: LogicalPlan,
        required_tags: &BTreeSet<String>,
    ) -> Result<LogicalPlan> {
        if required_tags.is_empty() {
            return Ok(plan);
        }

        struct Rewriter {
            required_tags: BTreeSet<String>,
        }

        impl TreeNodeRewriter for Rewriter {
            type Node = LogicalPlan;

            fn f_up(
                &mut self,
                node: Self::Node,
            ) -> datafusion_common::Result<Transformed<Self::Node>> {
                match node {
                    LogicalPlan::TableScan(scan) => {
                        let schema = scan.source.schema();
                        let mut projection = match scan.projection.clone() {
                            Some(p) => p,
                            None => {
                                // A scan without projection already reads all columns.
                                return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
                            }
                        };

                        let mut changed = false;
                        for tag in &self.required_tags {
                            if let Some((idx, _)) = schema
                                .fields()
                                .iter()
                                .enumerate()
                                .find(|(_, field)| field.name() == tag)
                                && !projection.contains(&idx)
                            {
                                projection.push(idx);
                                changed = true;
                            }
                        }

                        if !changed {
                            return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
                        }

                        projection.sort_unstable();
                        projection.dedup();

                        let new_scan = TableScan::try_new(
                            scan.table_name.clone(),
                            scan.source.clone(),
                            Some(projection),
                            scan.filters,
                            scan.fetch,
                        )?;
                        Ok(Transformed::yes(LogicalPlan::TableScan(new_scan)))
                    }
                    LogicalPlan::Projection(proj) => {
                        let input_schema = proj.input.schema();

                        let existing = proj
                            .schema
                            .fields()
                            .iter()
                            .map(|f| f.name().as_str())
                            .collect::<HashSet<_>>();

                        let mut expr = proj.expr.clone();
                        let mut has_changed = false;
                        for tag in &self.required_tags {
                            if existing.contains(tag.as_str()) {
                                continue;
                            }

                            if let Some(idx) = input_schema.index_of_column_by_name(None, tag) {
                                expr.push(DfExpr::Column(Column::from(
                                    input_schema.qualified_field(idx),
                                )));
                                has_changed = true;
                            }
                        }

                        if !has_changed {
                            return Ok(Transformed::no(LogicalPlan::Projection(proj)));
                        }

                        let new_proj = Projection::try_new(expr, proj.input)?;
                        Ok(Transformed::yes(LogicalPlan::Projection(new_proj)))
                    }
                    other => Ok(Transformed::no(other)),
                }
            }
        }

        let mut rewriter = Rewriter {
            required_tags: required_tags.clone(),
        };
        let rewritten = plan
            .rewrite(&mut rewriter)
            .context(DataFusionPlanningSnafu)?;
        Ok(rewritten.data)
    }

    fn refresh_tag_columns_from_schema(&mut self, schema: &DFSchemaRef) {
        let time_index = self.ctx.time_index_column.as_deref();
        let field_columns = self.ctx.field_columns.iter().collect::<HashSet<_>>();

        let mut tags = schema
            .fields()
            .iter()
            .map(|f| f.name())
            .filter(|name| Some(name.as_str()) != time_index)
            .filter(|name| !field_columns.contains(name))
            .filter(|name| !is_metric_engine_internal_column(name))
            .cloned()
            .collect::<Vec<_>>();
        tags.sort_unstable();
        tags.dedup();
        self.ctx.tag_columns = tags;
    }

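    /// Resolves the selector's table and fills the planner context: time index,
    /// field columns, and tag columns. Returns an `EmptyMetric` plan instead
    /// when the table does not exist, mirroring PromQL's empty-result semantics.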
    async fn setup_context(&mut self) -> Result<Option<LogicalPlan>> {
        let table_ref = self.table_ref()?;
        let source = match self.table_provider.resolve_table(table_ref.clone()).await {
            Err(e) if e.status_code() == StatusCode::TableNotFound => {
                let plan = self.setup_context_for_empty_metric()?;
                return Ok(Some(plan));
            }
            res => res.context(CatalogSnafu)?,
        };
        let table = self.table_from_source(&source)?;

        // Set the time index column name.
        let time_index = table
            .schema()
            .timestamp_column()
            .with_context(|| TimeIndexNotFoundSnafu {
                table: table_ref.to_quoted_string(),
            })?
            .name
            .clone();
        self.ctx.time_index_column = Some(time_index);

        // Set the value columns.
        let values = table
            .table_info()
            .meta
            .field_column_names()
            .cloned()
            .collect();
        self.ctx.field_columns = values;

        // Set the tag columns, skipping the metric engine's internal columns.
        let tags = table
            .table_info()
            .meta
            .row_key_column_names()
            .filter(|col| {
                col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME
            })
            .cloned()
            .collect();
        self.ctx.tag_columns = tags;

        self.ctx.use_tsid = false;

        Ok(None)
    }

    /// Set up the context for a metric that doesn't exist. The `[0, -1]` time
    /// range makes the `EmptyMetric` plan produce no rows, so the query still
    /// returns an (empty) result instead of an error.
    fn setup_context_for_empty_metric(&mut self) -> Result<LogicalPlan> {
        self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
        self.ctx.reset_table_name_and_schema();
        self.ctx.tag_columns = vec![];
        self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
        self.ctx.use_tsid = false;

        let plan = LogicalPlan::Extension(Extension {
            node: Arc::new(
                EmptyMetric::new(
                    0,
                    -1,
                    self.ctx.interval,
                    SPECIAL_TIME_FUNCTION.to_string(),
                    DEFAULT_FIELD_COLUMN.to_string(),
                    Some(lit(0.0f64)),
                )
                .context(DataFusionPlanningSnafu)?,
            ),
        });
        Ok(plan)
    }

    fn create_function_args(&self, args: &[Box<PromExpr>]) -> Result<FunctionArgs> {
        let mut result = FunctionArgs::default();

        for arg in args {
            // Literal arguments become DataFusion literal exprs.
            if let Some(expr) = Self::try_build_literal_expr(arg) {
                result.literals.push(expr);
            } else {
                // Non-literal arguments: at most one vector input is allowed.
                match arg.as_ref() {
                    PromExpr::Subquery(_)
                    | PromExpr::VectorSelector(_)
                    | PromExpr::MatrixSelector(_)
                    | PromExpr::Extension(_)
                    | PromExpr::Aggregate(_)
                    | PromExpr::Paren(_)
                    | PromExpr::Call(_)
                    | PromExpr::Binary(_)
                    | PromExpr::Unary(_) => {
                        if result.input.replace(*arg.clone()).is_some() {
                            MultipleVectorSnafu { expr: *arg.clone() }.fail()?;
                        }
                    }

                    _ => {
                        let expr = Self::get_param_as_literal_expr(&Some(arg.clone()), None, None)?;
                        result.literals.push(expr);
                    }
                }
            }
        }

        Ok(result)
    }

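    /// Creates DataFusion exprs for a PromQL function call, given the literal
    /// arguments in `other_input_exprs`. Returns the exprs together with any new
    /// tag columns the function introduces (e.g. a destination label written by
    /// `label_replace`).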
2083 fn create_function_expr(
2089 &mut self,
2090 func: &Function,
2091 other_input_exprs: Vec<DfExpr>,
2092 query_engine_state: &QueryEngineState,
2093 ) -> Result<(Vec<DfExpr>, Vec<String>)> {
2094 let mut other_input_exprs: VecDeque<DfExpr> = other_input_exprs.into();
2096
2097 let field_column_pos = 0;
2099 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2100 let mut new_tags = vec![];
2102 let scalar_func = match func.name {
2103 "increase" => ScalarFunc::ExtrapolateUdf(
2104 Arc::new(Increase::scalar_udf()),
2105 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2106 ),
2107 "rate" => ScalarFunc::ExtrapolateUdf(
2108 Arc::new(Rate::scalar_udf()),
2109 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2110 ),
2111 "delta" => ScalarFunc::ExtrapolateUdf(
2112 Arc::new(Delta::scalar_udf()),
2113 self.ctx.range.context(ExpectRangeSelectorSnafu)?,
2114 ),
2115 "idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
2116 "irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
2117 "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
2118 "changes" => ScalarFunc::Udf(Arc::new(Changes::scalar_udf())),
2119 "deriv" => ScalarFunc::Udf(Arc::new(Deriv::scalar_udf())),
2120 "avg_over_time" => ScalarFunc::Udf(Arc::new(AvgOverTime::scalar_udf())),
2121 "min_over_time" => ScalarFunc::Udf(Arc::new(MinOverTime::scalar_udf())),
2122 "max_over_time" => ScalarFunc::Udf(Arc::new(MaxOverTime::scalar_udf())),
2123 "sum_over_time" => ScalarFunc::Udf(Arc::new(SumOverTime::scalar_udf())),
2124 "count_over_time" => ScalarFunc::Udf(Arc::new(CountOverTime::scalar_udf())),
2125 "last_over_time" => ScalarFunc::Udf(Arc::new(LastOverTime::scalar_udf())),
2126 "absent_over_time" => ScalarFunc::Udf(Arc::new(AbsentOverTime::scalar_udf())),
2127 "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
2128 "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
2129 "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
2130 "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
2131 "predict_linear" => {
2132 other_input_exprs[0] = DfExpr::Cast(Cast {
2133 expr: Box::new(other_input_exprs[0].clone()),
2134 data_type: ArrowDataType::Int64,
2135 });
2136 ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf()))
2137 }
2138 "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
2139 "time" => {
2140 exprs.push(build_special_time_expr(
2141 self.ctx.time_index_column.as_ref().unwrap(),
2142 ));
2143 ScalarFunc::GeneratedExpr
2144 }
2145 "minute" => {
2146 let expr = self.date_part_on_time_index("minute")?;
2148 exprs.push(expr);
2149 ScalarFunc::GeneratedExpr
2150 }
2151 "hour" => {
2152 let expr = self.date_part_on_time_index("hour")?;
2154 exprs.push(expr);
2155 ScalarFunc::GeneratedExpr
2156 }
2157 "month" => {
2158 let expr = self.date_part_on_time_index("month")?;
2160 exprs.push(expr);
2161 ScalarFunc::GeneratedExpr
2162 }
2163 "year" => {
2164 let expr = self.date_part_on_time_index("year")?;
2166 exprs.push(expr);
2167 ScalarFunc::GeneratedExpr
2168 }
2169 "day_of_month" => {
2170 let expr = self.date_part_on_time_index("day")?;
2172 exprs.push(expr);
2173 ScalarFunc::GeneratedExpr
2174 }
2175 "day_of_week" => {
2176 let expr = self.date_part_on_time_index("dow")?;
2178 exprs.push(expr);
2179 ScalarFunc::GeneratedExpr
2180 }
2181 "day_of_year" => {
2182 let expr = self.date_part_on_time_index("doy")?;
2184 exprs.push(expr);
2185 ScalarFunc::GeneratedExpr
2186 }
2187 "days_in_month" => {
2188 let day_lit_expr = "day".lit();
2193 let month_lit_expr = "month".lit();
2194 let interval_1month_lit_expr =
2195 DfExpr::Literal(ScalarValue::IntervalYearMonth(Some(1)), None);
2196 let interval_1day_lit_expr = DfExpr::Literal(
2197 ScalarValue::IntervalDayTime(Some(IntervalDayTime::new(1, 0))),
2198 None,
2199 );
2200 let the_1month_minus_1day_expr = DfExpr::BinaryExpr(BinaryExpr {
2201 left: Box::new(interval_1month_lit_expr),
2202 op: Operator::Minus,
2203 right: Box::new(interval_1day_lit_expr),
2204 });
2205 let date_trunc_expr = DfExpr::ScalarFunction(ScalarFunction {
2206 func: datafusion_functions::datetime::date_trunc(),
2207 args: vec![month_lit_expr, self.create_time_index_column_expr()?],
2208 });
2209 let date_trunc_plus_interval_expr = DfExpr::BinaryExpr(BinaryExpr {
2210 left: Box::new(date_trunc_expr),
2211 op: Operator::Plus,
2212 right: Box::new(the_1month_minus_1day_expr),
2213 });
2214 let date_part_expr = DfExpr::ScalarFunction(ScalarFunction {
2215 func: datafusion_functions::datetime::date_part(),
2216 args: vec![day_lit_expr, date_trunc_plus_interval_expr],
2217 });
2218
2219 exprs.push(date_part_expr);
2220 ScalarFunc::GeneratedExpr
2221 }
2222
2223 "label_join" => {
2224 let (concat_expr, dst_label) = Self::build_concat_labels_expr(
2225 &mut other_input_exprs,
2226 &self.ctx,
2227 query_engine_state,
2228 )?;
2229
2230 for value in &self.ctx.field_columns {
2232 if *value != dst_label {
2233 let expr = DfExpr::Column(Column::from_name(value));
2234 exprs.push(expr);
2235 }
2236 }
2237
2238 self.ctx.tag_columns.retain(|tag| *tag != dst_label);
2240 new_tags.push(dst_label);
2241 exprs.push(concat_expr);
2243
2244 ScalarFunc::GeneratedExpr
2245 }
2246 "label_replace" => {
2247 if let Some((replace_expr, dst_label)) = self
2248 .build_regexp_replace_label_expr(&mut other_input_exprs, query_engine_state)?
2249 {
2250 for value in &self.ctx.field_columns {
2252 if *value != dst_label {
2253 let expr = DfExpr::Column(Column::from_name(value));
2254 exprs.push(expr);
2255 }
2256 }
2257
2258 ensure!(
2259 !self.ctx.tag_columns.contains(&dst_label),
2260 SameLabelSetSnafu
2261 );
2262 new_tags.push(dst_label);
2263 exprs.push(replace_expr);
2265 } else {
2266 for value in &self.ctx.field_columns {
2268 let expr = DfExpr::Column(Column::from_name(value));
2269 exprs.push(expr);
2270 }
2271 }
2272
2273 ScalarFunc::GeneratedExpr
2274 }
2275 "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" | "timestamp" => {
2276 for value in &self.ctx.field_columns {
2279 let expr = DfExpr::Column(Column::from_name(value));
2280 exprs.push(expr);
2281 }
2282
2283 ScalarFunc::GeneratedExpr
2284 }
2285 "round" => {
2286 if other_input_exprs.is_empty() {
2287 other_input_exprs.push_front(0.0f64.lit());
2288 }
2289 ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
2290 }
2291 "rad" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::radians()),
2292 "deg" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::degrees()),
2293 "sgn" => ScalarFunc::DataFusionBuiltin(datafusion::functions::math::signum()),
2294 "pi" => {
2295 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2297 func: datafusion::functions::math::pi(),
2298 args: vec![],
2299 });
2300 exprs.push(fn_expr);
2301
2302 ScalarFunc::GeneratedExpr
2303 }
2304 _ => {
2305 if let Some(f) = query_engine_state
2306 .session_state()
2307 .scalar_functions()
2308 .get(func.name)
2309 {
2310 ScalarFunc::DataFusionBuiltin(f.clone())
2311 } else if let Some(factory) = query_engine_state.scalar_function(func.name) {
2312 let func_state = query_engine_state.function_state();
2313 let query_ctx = self.table_provider.query_ctx();
2314
2315 ScalarFunc::DataFusionUdf(Arc::new(factory.provide(FunctionContext {
2316 state: func_state,
2317 query_ctx: query_ctx.clone(),
2318 })))
2319 } else if let Some(f) = datafusion_functions::math::functions()
2320 .iter()
2321 .find(|f| f.name() == func.name)
2322 {
2323 ScalarFunc::DataFusionUdf(f.clone())
2324 } else {
2325 return UnsupportedExprSnafu {
2326 name: func.name.to_string(),
2327 }
2328 .fail();
2329 }
2330 }
2331 };
2332
2333 for value in &self.ctx.field_columns {
2334 let col_expr = DfExpr::Column(Column::from_name(value));
2335
2336 match scalar_func.clone() {
2337 ScalarFunc::DataFusionBuiltin(func) => {
2338 other_input_exprs.insert(field_column_pos, col_expr);
2339 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2340 func,
2341 args: other_input_exprs.clone().into(),
2342 });
2343 exprs.push(fn_expr);
2344 let _ = other_input_exprs.remove(field_column_pos);
2345 }
2346 ScalarFunc::DataFusionUdf(func) => {
2347 let args = itertools::chain!(
2348 other_input_exprs.iter().take(field_column_pos).cloned(),
2349 std::iter::once(col_expr),
2350 other_input_exprs.iter().skip(field_column_pos).cloned()
2351 )
2352 .collect_vec();
2353 exprs.push(DfExpr::ScalarFunction(ScalarFunction { func, args }))
2354 }
2355 ScalarFunc::Udf(func) => {
2356 let ts_range_expr = DfExpr::Column(Column::from_name(
2357 RangeManipulate::build_timestamp_range_name(
2358 self.ctx.time_index_column.as_ref().unwrap(),
2359 ),
2360 ));
2361 other_input_exprs.insert(field_column_pos, ts_range_expr);
2362 other_input_exprs.insert(field_column_pos + 1, col_expr);
2363 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2364 func,
2365 args: other_input_exprs.clone().into(),
2366 });
2367 exprs.push(fn_expr);
2368 let _ = other_input_exprs.remove(field_column_pos + 1);
2369 let _ = other_input_exprs.remove(field_column_pos);
2370 }
2371 ScalarFunc::ExtrapolateUdf(func, range_length) => {
2372 let ts_range_expr = DfExpr::Column(Column::from_name(
2373 RangeManipulate::build_timestamp_range_name(
2374 self.ctx.time_index_column.as_ref().unwrap(),
2375 ),
2376 ));
2377 other_input_exprs.insert(field_column_pos, ts_range_expr);
2378 other_input_exprs.insert(field_column_pos + 1, col_expr);
2379 other_input_exprs
2380 .insert(field_column_pos + 2, self.create_time_index_column_expr()?);
2381 other_input_exprs.push_back(lit(range_length));
2382 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
2383 func,
2384 args: other_input_exprs.clone().into(),
2385 });
2386 exprs.push(fn_expr);
2387 let _ = other_input_exprs.pop_back();
2388 let _ = other_input_exprs.remove(field_column_pos + 2);
2389 let _ = other_input_exprs.remove(field_column_pos + 1);
2390 let _ = other_input_exprs.remove(field_column_pos);
2391 }
2392 ScalarFunc::GeneratedExpr => {}
2393 }
2394 }
2395
2396 if !matches!(func.name, "label_join" | "label_replace") {
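2397 // `label_join`/`label_replace` already alias their outputs above; for
2398 // every other function, alias each expression with its display name and
2399 // record those names as the new field columns.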
2400 let mut new_field_columns = Vec::with_capacity(exprs.len());
2401
2402 exprs = exprs
2403 .into_iter()
2404 .map(|expr| {
2405 let display_name = expr.schema_name().to_string();
2406 new_field_columns.push(display_name.clone());
2407 Ok(expr.alias(display_name))
2408 })
2409 .collect::<std::result::Result<Vec<_>, _>>()
2410 .context(DataFusionPlanningSnafu)?;
2411
2412 self.ctx.field_columns = new_field_columns;
2413 }
2414
2415 Ok((exprs, new_tags))
2416 }
2417
2418 fn validate_label_name(label_name: &str) -> Result<()> {
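2419 // A destination label must not use the reserved `__` prefix and must
2420 // match the Prometheus label-name pattern (`LABEL_NAME_REGEX`).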
2422 if label_name.starts_with("__") {
2424 return InvalidDestinationLabelNameSnafu { label_name }.fail();
2425 }
2426 if !LABEL_NAME_REGEX.is_match(label_name) {
2428 return InvalidDestinationLabelNameSnafu { label_name }.fail();
2429 }
2430
2431 Ok(())
2432 }
2433
2434 fn build_regexp_replace_label_expr(
2436 &self,
2437 other_input_exprs: &mut VecDeque<DfExpr>,
2438 query_engine_state: &QueryEngineState,
2439 ) -> Result<Option<(DfExpr, String)>> {
2440 let dst_label = match other_input_exprs.pop_front() {
2442 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2443 other => UnexpectedPlanExprSnafu {
2444 desc: format!("expected dst_label string literal, but found {:?}", other),
2445 }
2446 .fail()?,
2447 };
2448
2449 Self::validate_label_name(&dst_label)?;
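2450 // remaining arguments, in order: replacement, src_label, regex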
2451 let replacement = match other_input_exprs.pop_front() {
2452 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), _)) => r,
2453 other => UnexpectedPlanExprSnafu {
2454 desc: format!("expected replacement string literal, but found {:?}", other),
2455 }
2456 .fail()?,
2457 };
2458 let src_label = match other_input_exprs.pop_front() {
2459 Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)), None)) => s,
2460 other => UnexpectedPlanExprSnafu {
2461 desc: format!("expected src_label string literal, but found {:?}", other),
2462 }
2463 .fail()?,
2464 };
2465
2466 let regex = match other_input_exprs.pop_front() {
2467 Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)), None)) => r,
2468 other => UnexpectedPlanExprSnafu {
2469 desc: format!("expected regex string literal, but found {:?}", other),
2470 }
2471 .fail()?,
2472 };
2473
2474 regex::Regex::new(&regex).map_err(|_| {
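2475 // compile once purely to validate the pattern; the actual rewriting is
2476 // delegated to DataFusion's `regexp_replace` below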
2477 InvalidRegularExpressionSnafu {
2478 regex: regex.clone(),
2479 }
2480 .build()
2481 })?;
2482
2483 if self.ctx.tag_columns.contains(&src_label) && regex.is_empty() {
2485 return Ok(None);
2486 }
2487
2488 if !self.ctx.tag_columns.contains(&src_label) {
2490 if replacement.is_empty() {
2491 return Ok(None);
2493 } else {
2494 return Ok(Some((
2496 lit(replacement).alias(&dst_label),
2498 dst_label,
2499 )));
2500 }
2501 }
2502
2503 let regex = format!("^(?s:{regex})$");
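2504 // Prometheus fully anchors the `label_replace` regex, so wrap it in
2505 // `^(?s:...)$` before handing it to `regexp_replace`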
2506
2507 let session_state = query_engine_state.session_state();
2508 let func = session_state
2509 .scalar_functions()
2510 .get("regexp_replace")
2511 .context(UnsupportedExprSnafu {
2512 name: "regexp_replace",
2513 })?;
2514
2515 let args = vec![
2517 if src_label.is_empty() {
2518 DfExpr::Literal(ScalarValue::Utf8(Some(String::new())), None)
2519 } else {
2520 DfExpr::Column(Column::from_name(src_label))
2521 },
2522 DfExpr::Literal(ScalarValue::Utf8(Some(regex)), None),
2523 DfExpr::Literal(ScalarValue::Utf8(Some(replacement)), None),
2524 ];
2525
2526 Ok(Some((
2527 DfExpr::ScalarFunction(ScalarFunction {
2528 func: func.clone(),
2529 args,
2530 })
2531 .alias(&dst_label),
2532 dst_label,
2533 )))
2534 }
2535
2536 fn build_concat_labels_expr(
2538 other_input_exprs: &mut VecDeque<DfExpr>,
2539 ctx: &PromPlannerContext,
2540 query_engine_state: &QueryEngineState,
2541 ) -> Result<(DfExpr, String)> {
2542 let dst_label = match other_input_exprs.pop_front() {
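2543 // arguments after the input vector, in order: dst_label, separator,
2544 // then one or more source labels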
2545 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2546 other => UnexpectedPlanExprSnafu {
2547 desc: format!("expected dst_label string literal, but found {:?}", other),
2548 }
2549 .fail()?,
2550 };
2551 let separator = match other_input_exprs.pop_front() {
2552 Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)), _)) => d,
2553 other => UnexpectedPlanExprSnafu {
2554 desc: format!("expected separator string literal, but found {:?}", other),
2555 }
2556 .fail()?,
2557 };
2558
2559 let available_columns: HashSet<&str> = ctx
2561 .tag_columns
2562 .iter()
2563 .chain(ctx.field_columns.iter())
2564 .chain(ctx.time_index_column.as_ref())
2565 .map(|s| s.as_str())
2566 .collect();
2567
2568 let src_labels = other_input_exprs
2569 .iter()
2570 .map(|expr| {
2571 match expr {
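2572 // empty or unknown source labels become NULL, which `concat_ws` skips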
2573 DfExpr::Literal(ScalarValue::Utf8(Some(label)), None) => {
2574 if label.is_empty() {
2575 Ok(DfExpr::Literal(ScalarValue::Null, None))
2576 } else if available_columns.contains(label.as_str()) {
2577 Ok(DfExpr::Column(Column::from_name(label)))
2579 } else {
2580 Ok(DfExpr::Literal(ScalarValue::Null, None))
2582 }
2583 }
2584 other => UnexpectedPlanExprSnafu {
2585 desc: format!(
2586 "expected source label string literal, but found {:?}",
2587 other
2588 ),
2589 }
2590 .fail(),
2591 }
2592 })
2593 .collect::<Result<Vec<_>>>()?;
2594 ensure!(
2595 !src_labels.is_empty(),
2596 FunctionInvalidArgumentSnafu {
2597 fn_name: "label_join"
2598 }
2599 );
2600
2601 let session_state = query_engine_state.session_state();
2602 let func = session_state
2603 .scalar_functions()
2604 .get("concat_ws")
2605 .context(UnsupportedExprSnafu { name: "concat_ws" })?;
2606
2607 let mut args = Vec::with_capacity(1 + src_labels.len());
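2608 // concat_ws(separator, src_label_1, src_label_2, ...)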
2609 args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)), None));
2610 args.extend(src_labels);
2611
2612 Ok((
2613 DfExpr::ScalarFunction(ScalarFunction {
2614 func: func.clone(),
2615 args,
2616 })
2617 .alias(&dst_label),
2618 dst_label,
2619 ))
2620 }
2621
2622 fn create_time_index_column_expr(&self) -> Result<DfExpr> {
2623 Ok(DfExpr::Column(Column::from_name(
2624 self.ctx
2625 .time_index_column
2626 .clone()
2627 .with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
2628 )))
2629 }
2630
2631 fn create_tag_column_exprs(&self) -> Result<Vec<DfExpr>> {
2632 let mut result = Vec::with_capacity(self.ctx.tag_columns.len());
2633 for tag in &self.ctx.tag_columns {
2634 let expr = DfExpr::Column(Column::from_name(tag));
2635 result.push(expr);
2636 }
2637 Ok(result)
2638 }
2639
2640 fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
2641 let mut result = Vec::with_capacity(self.ctx.field_columns.len());
2642 for field in &self.ctx.field_columns {
2643 let expr = DfExpr::Column(Column::from_name(field));
2644 result.push(expr);
2645 }
2646 Ok(result)
2647 }
2648
2649 fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
2650 let mut result = self
2651 .ctx
2652 .tag_columns
2653 .iter()
2654 .map(|col| DfExpr::Column(Column::from_name(col)).sort(true, true))
2655 .collect::<Vec<_>>();
2656 result.push(self.create_time_index_column_expr()?.sort(true, true));
2657 Ok(result)
2658 }
2659
2660 fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
2661 self.ctx
2662 .field_columns
2663 .iter()
2664 .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, true))
2665 .collect::<Vec<_>>()
2666 }
2667
2668 fn create_sort_exprs_by_tags(
2669 func: &str,
2670 tags: Vec<DfExpr>,
2671 asc: bool,
2672 ) -> Result<Vec<SortExpr>> {
2673 ensure!(
2674 !tags.is_empty(),
2675 FunctionInvalidArgumentSnafu { fn_name: func }
2676 );
2677
2678 tags.iter()
2679 .map(|col| match col {
2680 DfExpr::Literal(ScalarValue::Utf8(Some(label)), _) => {
2681 Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
2682 }
2683 other => UnexpectedPlanExprSnafu {
2684 desc: format!("expected label string literal, but found {:?}", other),
2685 }
2686 .fail(),
2687 })
2688 .collect::<Result<Vec<_>>>()
2689 }
2690
2691 fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
2692 let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
2693 for value in &self.ctx.field_columns {
2694 let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
2695 exprs.push(expr);
2696 }
2697
2698 conjunction(exprs).context(ValueNotFoundSnafu {
2699 table: self.table_ref()?.to_quoted_string(),
2700 })
2701 }
2702
2703 fn create_aggregate_exprs(
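2704 // Build one aggregate expression (sum/avg/min/max/quantile/...) per
2705 // field column for the given aggregate operator.
2706 //
2707 // For `count_values`, the original field expressions are also returned
2708 // so the caller can group by the values themselves.
2709 //
2710 // Side effect: `ctx.field_columns` is replaced with the aggregate
2711 // output names.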
2719 &mut self,
2720 op: TokenType,
2721 param: &Option<Box<PromExpr>>,
2722 input_plan: &LogicalPlan,
2723 ) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
2724 let mut non_col_args = Vec::new();
2725 let aggr = match op.id() {
2726 token::T_SUM => sum_udaf(),
2727 token::T_QUANTILE => {
2728 let q =
2729 Self::get_param_as_literal_expr(param, Some(op), Some(ArrowDataType::Float64))?;
2730 non_col_args.push(q);
2731 quantile_udaf()
2732 }
2733 token::T_AVG => avg_udaf(),
2734 token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
2735 token::T_MIN => min_udaf(),
2736 token::T_MAX => max_udaf(),
2737 token::T_GROUP => grouping_udaf(),
2738 token::T_STDDEV => stddev_pop_udaf(),
2739 token::T_STDVAR => var_pop_udaf(),
2740 token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
2741 name: format!("{op:?}"),
2742 }
2743 .fail()?,
2744 _ => UnexpectedTokenSnafu { token: op }.fail()?,
2745 };
2746
2747 let exprs: Vec<DfExpr> = self
2749 .ctx
2750 .field_columns
2751 .iter()
2752 .map(|col| {
2753 non_col_args.push(DfExpr::Column(Column::from_name(col)));
2754 let expr = aggr.call(non_col_args.clone());
2755 non_col_args.pop();
2756 expr
2757 })
2758 .collect::<Vec<_>>();
2759
2760 let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
2762 let prev_field_exprs: Vec<_> = self
2763 .ctx
2764 .field_columns
2765 .iter()
2766 .map(|col| DfExpr::Column(Column::from_name(col)))
2767 .collect();
2768
2769 ensure!(
2770 self.ctx.field_columns.len() == 1,
2771 UnsupportedExprSnafu {
2772 name: "count_values on multi-value input"
2773 }
2774 );
2775
2776 prev_field_exprs
2777 } else {
2778 vec![]
2779 };
2780
2781 let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
2783
2784 let normalized_exprs =
2785 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2786 for expr in normalized_exprs {
2787 new_field_columns.push(expr.schema_name().to_string());
2788 }
2789 self.ctx.field_columns = new_field_columns;
2790
2791 Ok((exprs, prev_field_exprs))
2792 }
2793
2794 fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
2795 let param = param
2796 .as_deref()
2797 .with_context(|| FunctionInvalidArgumentSnafu {
2798 fn_name: op.to_string(),
2799 })?;
2800 let PromExpr::StringLiteral(StringLiteral { val }) = param else {
2801 return FunctionInvalidArgumentSnafu {
2802 fn_name: op.to_string(),
2803 }
2804 .fail();
2805 };
2806
2807 Ok(val)
2808 }
2809
2810 fn get_param_as_literal_expr(
2811 param: &Option<Box<PromExpr>>,
2812 op: Option<TokenType>,
2813 expected_type: Option<ArrowDataType>,
2814 ) -> Result<DfExpr> {
2815 let prom_param = param.as_deref().with_context(|| {
2816 if let Some(op) = op {
2817 FunctionInvalidArgumentSnafu {
2818 fn_name: op.to_string(),
2819 }
2820 } else {
2821 FunctionInvalidArgumentSnafu {
2822 fn_name: "unknown".to_string(),
2823 }
2824 }
2825 })?;
2826
2827 let expr = Self::try_build_literal_expr(prom_param).with_context(|| {
2828 if let Some(op) = op {
2829 FunctionInvalidArgumentSnafu {
2830 fn_name: op.to_string(),
2831 }
2832 } else {
2833 FunctionInvalidArgumentSnafu {
2834 fn_name: "unknown".to_string(),
2835 }
2836 }
2837 })?;
2838
2839 if let Some(expected_type) = expected_type {
2841 let expr_type = expr
2843 .get_type(&DFSchema::empty())
2844 .context(DataFusionPlanningSnafu)?;
2845 if expected_type != expr_type {
2846 return FunctionInvalidArgumentSnafu {
2847 fn_name: format!("expected {expected_type:?}, but found {expr_type:?}"),
2848 }
2849 .fail();
2850 }
2851 }
2852
2853 Ok(expr)
2854 }
2855
2856 fn create_window_exprs(
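2857 // Build a `row_number()` window expression over each field column,
2858 // partitioned by the group keys, for planning `topk`/`bottomk`.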
2859 &mut self,
2860 op: TokenType,
2861 group_exprs: Vec<DfExpr>,
2862 input_plan: &LogicalPlan,
2863 ) -> Result<Vec<DfExpr>> {
2864 ensure!(
2865 self.ctx.field_columns.len() == 1,
2866 UnsupportedExprSnafu {
2867 name: "topk or bottomk on multi-value input"
2868 }
2869 );
2870
2871 assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
2872
2873 let asc = matches!(op.id(), token::T_BOTTOMK);
2874
2875 let tag_sort_exprs = self
2876 .create_tag_column_exprs()?
2877 .into_iter()
2878 .map(|expr| expr.sort(asc, true));
2879
2880 let exprs: Vec<DfExpr> = self
2882 .ctx
2883 .field_columns
2884 .iter()
2885 .map(|col| {
2886 let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
2887 sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, true));
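2888 // order by the field value first; tag columns break ties deterministically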
2889 sort_exprs.extend(tag_sort_exprs.clone());
2892
2893 DfExpr::WindowFunction(Box::new(WindowFunction {
2894 fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
2895 params: WindowFunctionParams {
2896 args: vec![],
2897 partition_by: group_exprs.clone(),
2898 order_by: sort_exprs,
2899 window_frame: WindowFrame::new(Some(true)),
2900 null_treatment: None,
2901 distinct: false,
2902 filter: None,
2903 },
2904 }))
2905 })
2906 .collect();
2907
2908 let normalized_exprs =
2909 normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
2910 Ok(normalized_exprs)
2911 }
2912
2913 #[deprecated(
2915 note = "use `Self::get_param_as_literal_expr` instead. This is only for `create_histogram_plan`"
2916 )]
2917 fn try_build_float_literal(expr: &PromExpr) -> Option<f64> {
2918 match expr {
2919 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(*val),
2920 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_float_literal(expr),
2921 PromExpr::Unary(UnaryExpr { expr, .. }) => {
2922 Self::try_build_float_literal(expr).map(|f| -f)
2923 }
2924 PromExpr::StringLiteral(_)
2925 | PromExpr::Binary(_)
2926 | PromExpr::VectorSelector(_)
2927 | PromExpr::MatrixSelector(_)
2928 | PromExpr::Call(_)
2929 | PromExpr::Extension(_)
2930 | PromExpr::Aggregate(_)
2931 | PromExpr::Subquery(_) => None,
2932 }
2933 }
2934
2935 async fn create_histogram_plan(
2937 &mut self,
2938 args: &PromFunctionArgs,
2939 query_engine_state: &QueryEngineState,
2940 ) -> Result<LogicalPlan> {
2941 if args.args.len() != 2 {
2942 return FunctionInvalidArgumentSnafu {
2943 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2944 }
2945 .fail();
2946 }
2947 #[allow(deprecated)]
2948 let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
2949 FunctionInvalidArgumentSnafu {
2950 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2951 }
2952 })?;
2953
2954 let input = args.args[1].as_ref().clone();
2955 let input_plan = self.prom_expr_to_plan(&input, query_engine_state).await?;
2956 let input_plan = self.strip_tsid_column(input_plan)?;
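2957 // folding merges rows across the `le` label, so the physical per-series
2958 // `__tsid` column no longer identifies a series: drop it and stop
2959 // relying on it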
2960 self.ctx.use_tsid = false;
2961
2962 if !self.ctx.has_le_tag() {
2963 return Ok(LogicalPlan::EmptyRelation(
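2964 // without an `le` label there are no buckets to fold; return an
2965 // empty result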
2966 datafusion::logical_expr::EmptyRelation {
2967 produce_one_row: false,
2968 schema: Arc::new(DFSchema::empty()),
2969 },
2970 ));
2971 }
2972 let time_index_column =
2973 self.ctx
2974 .time_index_column
2975 .clone()
2976 .with_context(|| TimeIndexNotFoundSnafu {
2977 table: self.ctx.table_name.clone().unwrap_or_default(),
2978 })?;
2979 let field_column = self
2981 .ctx
2982 .field_columns
2983 .first()
2984 .with_context(|| FunctionInvalidArgumentSnafu {
2985 fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
2986 })?
2987 .clone();
2988 self.ctx.tag_columns.retain(|col| col != LE_COLUMN_NAME);
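2989 // `le` is consumed by HistogramFold and is no longer a tag column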
2990
2991 Ok(LogicalPlan::Extension(Extension {
2992 node: Arc::new(
2993 HistogramFold::new(
2994 LE_COLUMN_NAME.to_string(),
2995 field_column,
2996 time_index_column,
2997 phi,
2998 input_plan,
2999 )
3000 .context(DataFusionPlanningSnafu)?,
3001 ),
3002 }))
3003 }
3004
3005 async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
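3006 // `vector(literal)` evaluates the literal at every step of the query range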
3007 if args.args.len() != 1 {
3008 return FunctionInvalidArgumentSnafu {
3009 fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
3010 }
3011 .fail();
3012 }
3013 let lit = Self::get_param_as_literal_expr(&Some(args.args[0].clone()), None, None)?;
3014
3015 self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
3017 self.ctx.reset_table_name_and_schema();
3018 self.ctx.tag_columns = vec![];
3019 self.ctx.field_columns = vec![greptime_value().to_string()];
3020 Ok(LogicalPlan::Extension(Extension {
3021 node: Arc::new(
3022 EmptyMetric::new(
3023 self.ctx.start,
3024 self.ctx.end,
3025 self.ctx.interval,
3026 SPECIAL_TIME_FUNCTION.to_string(),
3027 greptime_value().to_string(),
3028 Some(lit),
3029 )
3030 .context(DataFusionPlanningSnafu)?,
3031 ),
3032 }))
3033 }
3034
3035 async fn create_scalar_plan(
3037 &mut self,
3038 args: &PromFunctionArgs,
3039 query_engine_state: &QueryEngineState,
3040 ) -> Result<LogicalPlan> {
3041 ensure!(
3042 args.len() == 1,
3043 FunctionInvalidArgumentSnafu {
3044 fn_name: SCALAR_FUNCTION
3045 }
3046 );
3047 let input = self
3048 .prom_expr_to_plan(&args.args[0], query_engine_state)
3049 .await?;
3050 ensure!(
3051 self.ctx.field_columns.len() == 1,
3052 MultiFieldsNotSupportedSnafu {
3053 operator: SCALAR_FUNCTION
3054 },
3055 );
3056 let scalar_plan = LogicalPlan::Extension(Extension {
3057 node: Arc::new(
3058 ScalarCalculate::new(
3059 self.ctx.start,
3060 self.ctx.end,
3061 self.ctx.interval,
3062 input,
3063 self.ctx.time_index_column.as_ref().unwrap(),
3064 &self.ctx.tag_columns,
3065 &self.ctx.field_columns[0],
3066 self.ctx.table_name.as_deref(),
3067 )
3068 .context(PromqlPlanNodeSnafu)?,
3069 ),
3070 });
3071 self.ctx.tag_columns.clear();
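3072 // `scalar()` discards all labels; only the computed value column remains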
3073 self.ctx.field_columns.clear();
3074 self.ctx
3075 .field_columns
3076 .push(scalar_plan.schema().field(1).name().clone());
3077 Ok(scalar_plan)
3078 }
3079
3080 async fn create_absent_plan(
3082 &mut self,
3083 args: &PromFunctionArgs,
3084 query_engine_state: &QueryEngineState,
3085 ) -> Result<LogicalPlan> {
3086 if args.args.len() != 1 {
3087 return FunctionInvalidArgumentSnafu {
3088 fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
3089 }
3090 .fail();
3091 }
3092 let input = self
3093 .prom_expr_to_plan(&args.args[0], query_engine_state)
3094 .await?;
3095
3096 let time_index_expr = self.create_time_index_column_expr()?;
3097 let first_field_expr =
3098 self.create_field_column_exprs()?
3099 .pop()
3100 .with_context(|| ValueNotFoundSnafu {
3101 table: self.ctx.table_name.clone().unwrap_or_default(),
3102 })?;
3103 let first_value_expr = first_value(first_field_expr, vec![]);
3104
3105 let ordered_aggregated_input = LogicalPlanBuilder::from(input)
3106 .aggregate(
3107 vec![time_index_expr.clone()],
3108 vec![first_value_expr.clone()],
3109 )
3110 .context(DataFusionPlanningSnafu)?
3111 .sort(vec![time_index_expr.sort(true, false)])
3112 .context(DataFusionPlanningSnafu)?
3113 .build()
3114 .context(DataFusionPlanningSnafu)?;
3115
3116 let fake_labels = self
3117 .ctx
3118 .selector_matcher
3119 .iter()
3120 .filter_map(|matcher| match matcher.op {
3121 MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
3122 _ => None,
3123 })
3124 .collect::<Vec<_>>();
3125
3126 let absent_plan = LogicalPlan::Extension(Extension {
3128 node: Arc::new(
3129 Absent::try_new(
3130 self.ctx.start,
3131 self.ctx.end,
3132 self.ctx.interval,
3133 self.ctx.time_index_column.as_ref().unwrap().clone(),
3134 self.ctx.field_columns[0].clone(),
3135 fake_labels,
3136 ordered_aggregated_input,
3137 )
3138 .context(DataFusionPlanningSnafu)?,
3139 ),
3140 });
3141
3142 Ok(absent_plan)
3143 }
3144
3145 fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
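3146 // Try to fold a PromQL expression into a DataFusion literal; returns
3147 // `None` for anything that needs columns from the input.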
3148 match expr {
3149 PromExpr::NumberLiteral(NumberLiteral { val }) => Some(val.lit()),
3150 PromExpr::StringLiteral(StringLiteral { val }) => Some(val.lit()),
3151 PromExpr::VectorSelector(_)
3152 | PromExpr::MatrixSelector(_)
3153 | PromExpr::Extension(_)
3154 | PromExpr::Aggregate(_)
3155 | PromExpr::Subquery(_) => None,
3156 PromExpr::Call(Call { func, .. }) => {
3157 if func.name == SPECIAL_TIME_FUNCTION {
3158 None
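3159 // `time()` still isn't a literal; it is planned against the time
3160 // index column via `try_build_special_time_expr_with_context`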
3161 } else {
3162 None
3163 }
3164 }
3165 PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
3166 PromExpr::Unary(UnaryExpr { expr, .. }) => Self::try_build_literal_expr(expr),
3168 PromExpr::Binary(PromBinaryExpr {
3169 lhs,
3170 rhs,
3171 op,
3172 modifier,
3173 }) => {
3174 let lhs = Self::try_build_literal_expr(lhs)?;
3175 let rhs = Self::try_build_literal_expr(rhs)?;
3176 let is_comparison_op = Self::is_token_a_comparison_op(*op);
3177 let expr_builder = Self::prom_token_to_binary_expr_builder(*op).ok()?;
3178 let expr = expr_builder(lhs, rhs).ok()?;
3179
3180 let should_return_bool = if let Some(m) = modifier {
3181 m.return_bool
3182 } else {
3183 false
3184 };
3185 if is_comparison_op && should_return_bool {
3186 Some(DfExpr::Cast(Cast {
3187 expr: Box::new(expr),
3188 data_type: ArrowDataType::Float64,
3189 }))
3190 } else {
3191 Some(expr)
3192 }
3193 }
3194 }
3195 }
3196
3197 fn try_build_special_time_expr_with_context(&self, expr: &PromExpr) -> Option<DfExpr> {
3198 match expr {
3199 PromExpr::Call(Call { func, .. }) => {
3200 if func.name == SPECIAL_TIME_FUNCTION
3201 && let Some(time_index_col) = self.ctx.time_index_column.as_ref()
3202 {
3203 Some(build_special_time_expr(time_index_col))
3204 } else {
3205 None
3206 }
3207 }
3208 _ => None,
3209 }
3210 }
3211
3212 #[allow(clippy::type_complexity)]
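3213 // Map a PromQL binary-operator token to a closure that builds the
3214 // corresponding DataFusion expression.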
3215 fn prom_token_to_binary_expr_builder(
3216 token: TokenType,
3217 ) -> Result<Box<dyn Fn(DfExpr, DfExpr) -> Result<DfExpr>>> {
3218 match token.id() {
3219 token::T_ADD => Ok(Box::new(|lhs, rhs| Ok(lhs + rhs))),
3220 token::T_SUB => Ok(Box::new(|lhs, rhs| Ok(lhs - rhs))),
3221 token::T_MUL => Ok(Box::new(|lhs, rhs| Ok(lhs * rhs))),
3222 token::T_DIV => Ok(Box::new(|lhs, rhs| Ok(lhs / rhs))),
3223 token::T_MOD => Ok(Box::new(|lhs: DfExpr, rhs| Ok(lhs % rhs))),
3224 token::T_EQLC => Ok(Box::new(|lhs, rhs| Ok(lhs.eq(rhs)))),
3225 token::T_NEQ => Ok(Box::new(|lhs, rhs| Ok(lhs.not_eq(rhs)))),
3226 token::T_GTR => Ok(Box::new(|lhs, rhs| Ok(lhs.gt(rhs)))),
3227 token::T_LSS => Ok(Box::new(|lhs, rhs| Ok(lhs.lt(rhs)))),
3228 token::T_GTE => Ok(Box::new(|lhs, rhs| Ok(lhs.gt_eq(rhs)))),
3229 token::T_LTE => Ok(Box::new(|lhs, rhs| Ok(lhs.lt_eq(rhs)))),
3230 token::T_POW => Ok(Box::new(|lhs, rhs| {
3231 Ok(DfExpr::ScalarFunction(ScalarFunction {
3232 func: datafusion_functions::math::power(),
3233 args: vec![lhs, rhs],
3234 }))
3235 })),
3236 token::T_ATAN2 => Ok(Box::new(|lhs, rhs| {
3237 Ok(DfExpr::ScalarFunction(ScalarFunction {
3238 func: datafusion_functions::math::atan2(),
3239 args: vec![lhs, rhs],
3240 }))
3241 })),
3242 _ => UnexpectedTokenSnafu { token }.fail(),
3243 }
3244 }
3245
3246 fn is_token_a_comparison_op(token: TokenType) -> bool {
3248 matches!(
3249 token.id(),
3250 token::T_EQLC
3251 | token::T_NEQ
3252 | token::T_GTR
3253 | token::T_LSS
3254 | token::T_GTE
3255 | token::T_LTE
3256 )
3257 }
3258
3259 fn is_token_a_set_op(token: TokenType) -> bool {
3261 matches!(
3262 token.id(),
3263 token::T_LAND | token::T_LOR | token::T_LUNLESS)
3267 }
3268
3269 #[allow(clippy::too_many_arguments)]
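3270 // Inner-join two plans on their shared tag columns (and the time
3271 // index), honouring the `on`/`ignoring` vector-matching modifier.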
3272 fn join_on_non_field_columns(
3273 &self,
3274 left: LogicalPlan,
3275 right: LogicalPlan,
3276 left_table_ref: TableReference,
3277 right_table_ref: TableReference,
3278 left_time_index_column: Option<String>,
3279 right_time_index_column: Option<String>,
3280 only_join_time_index: bool,
3281 modifier: &Option<BinModifier>,
3282 ) -> Result<LogicalPlan> {
3283 let mut left_tag_columns = if only_join_time_index {
3284 BTreeSet::new()
3285 } else {
3286 self.ctx
3287 .tag_columns
3288 .iter()
3289 .cloned()
3290 .collect::<BTreeSet<_>>()
3291 };
3292 let mut right_tag_columns = left_tag_columns.clone();
3293
3294 if let Some(modifier) = modifier {
3296 if let Some(matching) = &modifier.matching {
3298 match matching {
3299 LabelModifier::Include(on) => {
3301 let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
3302 left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
3303 right_tag_columns =
3304 right_tag_columns.intersection(&mask).cloned().collect();
3305 }
3306 LabelModifier::Exclude(ignoring) => {
3308 for label in &ignoring.labels {
3310 let _ = left_tag_columns.remove(label);
3311 let _ = right_tag_columns.remove(label);
3312 }
3313 }
3314 }
3315 }
3316 }
3317
3318 if let (Some(left_time_index_column), Some(right_time_index_column)) =
3320 (left_time_index_column, right_time_index_column)
3321 {
3322 left_tag_columns.insert(left_time_index_column);
3323 right_tag_columns.insert(right_time_index_column);
3324 }
3325
3326 let right = LogicalPlanBuilder::from(right)
3327 .alias(right_table_ref)
3328 .context(DataFusionPlanningSnafu)?
3329 .build()
3330 .context(DataFusionPlanningSnafu)?;
3331
3332 LogicalPlanBuilder::from(left)
3334 .alias(left_table_ref)
3335 .context(DataFusionPlanningSnafu)?
3336 .join_detailed(
3337 right,
3338 JoinType::Inner,
3339 (
3340 left_tag_columns
3341 .into_iter()
3342 .map(Column::from_name)
3343 .collect::<Vec<_>>(),
3344 right_tag_columns
3345 .into_iter()
3346 .map(Column::from_name)
3347 .collect::<Vec<_>>(),
3348 ),
3349 None,
3350 NullEquality::NullEqualsNull,
3351 )
3352 .context(DataFusionPlanningSnafu)?
3353 .build()
3354 .context(DataFusionPlanningSnafu)
3355 }
3356
3357 fn set_op_on_non_field_columns(
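3358 // Plan `and`/`unless` as semi/anti joins; `or` is delegated to `or_operator`.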
3359 &mut self,
3360 left: LogicalPlan,
3361 mut right: LogicalPlan,
3362 left_context: PromPlannerContext,
3363 right_context: PromPlannerContext,
3364 op: TokenType,
3365 modifier: &Option<BinModifier>,
3366 ) -> Result<LogicalPlan> {
3367 let mut left_tag_col_set = left_context
3368 .tag_columns
3369 .iter()
3370 .cloned()
3371 .collect::<HashSet<_>>();
3372 let mut right_tag_col_set = right_context
3373 .tag_columns
3374 .iter()
3375 .cloned()
3376 .collect::<HashSet<_>>();
3377
3378 if matches!(op.id(), token::T_LOR) {
3379 return self.or_operator(
3380 left,
3381 right,
3382 left_tag_col_set,
3383 right_tag_col_set,
3384 left_context,
3385 right_context,
3386 modifier,
3387 );
3388 }
3389
3390 if let Some(modifier) = modifier {
3392 ensure!(
3394 matches!(
3395 modifier.card,
3396 VectorMatchCardinality::OneToOne | VectorMatchCardinality::ManyToMany
3397 ),
3398 UnsupportedVectorMatchSnafu {
3399 name: modifier.card.clone(),
3400 },
3401 );
3402 if let Some(matching) = &modifier.matching {
3404 match matching {
3405 LabelModifier::Include(on) => {
3407 let mask = on.labels.iter().cloned().collect::<HashSet<_>>();
3408 left_tag_col_set = left_tag_col_set.intersection(&mask).cloned().collect();
3409 right_tag_col_set =
3410 right_tag_col_set.intersection(&mask).cloned().collect();
3411 }
3412 LabelModifier::Exclude(ignoring) => {
3414 for label in &ignoring.labels {
3416 let _ = left_tag_col_set.remove(label);
3417 let _ = right_tag_col_set.remove(label);
3418 }
3419 }
3420 }
3421 }
3422 }
3423 if !matches!(op.id(), token::T_LOR) {
3425 ensure!(
3426 left_tag_col_set == right_tag_col_set,
3427 CombineTableColumnMismatchSnafu {
3428 left: left_tag_col_set.into_iter().collect::<Vec<_>>(),
3429 right: right_tag_col_set.into_iter().collect::<Vec<_>>(),
3430 }
3431 )
3432 };
3433 let left_time_index = left_context.time_index_column.clone().unwrap();
3434 let right_time_index = right_context.time_index_column.clone().unwrap();
3435 let join_keys = left_tag_col_set
3436 .iter()
3437 .cloned()
3438 .chain([left_time_index.clone()])
3439 .collect::<Vec<_>>();
3440 self.ctx.time_index_column = Some(left_time_index.clone());
3441 self.ctx.use_tsid = left_context.use_tsid;
3442
3443 if left_context.time_index_column != right_context.time_index_column {
3445 let right_project_exprs = right
3446 .schema()
3447 .fields()
3448 .iter()
3449 .map(|field| {
3450 if field.name() == &right_time_index {
3451 DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
3452 } else {
3453 DfExpr::Column(Column::from_name(field.name()))
3454 }
3455 })
3456 .collect::<Vec<_>>();
3457
3458 right = LogicalPlanBuilder::from(right)
3459 .project(right_project_exprs)
3460 .context(DataFusionPlanningSnafu)?
3461 .build()
3462 .context(DataFusionPlanningSnafu)?;
3463 }
3464
3465 ensure!(
3466 left_context.field_columns.len() == 1,
3467 MultiFieldsNotSupportedSnafu {
3468 operator: "AND operator"
3469 }
3470 );
3471 let left_field_col = left_context.field_columns.first().unwrap();
3474 self.ctx.field_columns = vec![left_field_col.clone()];
3475
3476 match op.id() {
3479 token::T_LAND => LogicalPlanBuilder::from(left)
3480 .distinct()
3481 .context(DataFusionPlanningSnafu)?
3482 .join_detailed(
3483 right,
3484 JoinType::LeftSemi,
3485 (join_keys.clone(), join_keys),
3486 None,
3487 NullEquality::NullEqualsNull,
3488 )
3489 .context(DataFusionPlanningSnafu)?
3490 .build()
3491 .context(DataFusionPlanningSnafu),
3492 token::T_LUNLESS => LogicalPlanBuilder::from(left)
3493 .distinct()
3494 .context(DataFusionPlanningSnafu)?
3495 .join_detailed(
3496 right,
3497 JoinType::LeftAnti,
3498 (join_keys.clone(), join_keys),
3499 None,
3500 NullEquality::NullEqualsNull,
3501 )
3502 .context(DataFusionPlanningSnafu)?
3503 .build()
3504 .context(DataFusionPlanningSnafu),
3505 token::T_LOR => {
3506 unreachable!()
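3507 // `or` is dispatched to `or_operator` at the top of this function,
3508 // so it can never reach this match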
3509 }
3510 _ => UnexpectedTokenSnafu { token: op }.fail(),
3511 }
3512 }
3513
3514 #[allow(clippy::too_many_arguments)]
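3515 // Plan the `or` set operator with the `UnionDistinctOn` extension node.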
3516 fn or_operator(
3517 &mut self,
3518 left: LogicalPlan,
3519 right: LogicalPlan,
3520 left_tag_cols_set: HashSet<String>,
3521 right_tag_cols_set: HashSet<String>,
3522 left_context: PromPlannerContext,
3523 right_context: PromPlannerContext,
3524 modifier: &Option<BinModifier>,
3525 ) -> Result<LogicalPlan> {
3526 ensure!(
3528 left_context.field_columns.len() == right_context.field_columns.len(),
3529 CombineTableColumnMismatchSnafu {
3530 left: left_context.field_columns.clone(),
3531 right: right_context.field_columns.clone()
3532 }
3533 );
3534 ensure!(
3535 left_context.field_columns.len() == 1,
3536 MultiFieldsNotSupportedSnafu {
3537 operator: "OR operator"
3538 }
3539 );
3540
3541 let all_tags = left_tag_cols_set
3543 .union(&right_tag_cols_set)
3544 .cloned()
3545 .collect::<HashSet<_>>();
3546 let tags_not_in_left = all_tags
3547 .difference(&left_tag_cols_set)
3548 .cloned()
3549 .collect::<Vec<_>>();
3550 let tags_not_in_right = all_tags
3551 .difference(&right_tag_cols_set)
3552 .cloned()
3553 .collect::<Vec<_>>();
3554 let left_qualifier = left.schema().qualified_field(0).0.cloned();
3555 let right_qualifier = right.schema().qualified_field(0).0.cloned();
3556 let left_qualifier_string = left_qualifier
3557 .as_ref()
3558 .map(|l| l.to_string())
3559 .unwrap_or_default();
3560 let right_qualifier_string = right_qualifier
3561 .as_ref()
3562 .map(|r| r.to_string())
3563 .unwrap_or_default();
3564 let left_time_index_column =
3565 left_context
3566 .time_index_column
3567 .clone()
3568 .with_context(|| TimeIndexNotFoundSnafu {
3569 table: left_qualifier_string.clone(),
3570 })?;
3571 let right_time_index_column =
3572 right_context
3573 .time_index_column
3574 .clone()
3575 .with_context(|| TimeIndexNotFoundSnafu {
3576 table: right_qualifier_string.clone(),
3577 })?;
3578 let left_field_col = left_context.field_columns.first().unwrap();
3580 let right_field_col = right_context.field_columns.first().unwrap();
3581 let left_has_tsid = left
3582 .schema()
3583 .fields()
3584 .iter()
3585 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
3586 let right_has_tsid = right
3587 .schema()
3588 .fields()
3589 .iter()
3590 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME);
3591
3592 let mut all_columns_set = left
3594 .schema()
3595 .fields()
3596 .iter()
3597 .chain(right.schema().fields().iter())
3598 .map(|field| field.name().clone())
3599 .collect::<HashSet<_>>();
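3600 // `__tsid` only survives the union when both inputs carry it; if one
3601 // side lacks it, the column can't identify series in the result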
3600 if !(left_has_tsid && right_has_tsid) {
3603 all_columns_set.remove(DATA_SCHEMA_TSID_COLUMN_NAME);
3604 }
3605 all_columns_set.remove(&left_time_index_column);
3607 all_columns_set.remove(&right_time_index_column);
3608 if left_field_col != right_field_col {
3610 all_columns_set.remove(right_field_col);
3611 }
3612 let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
3613 all_columns.sort_unstable();
3615 all_columns.insert(0, left_time_index_column.clone());
3617
3618 let left_proj_exprs = all_columns.iter().map(|col| {
3620 if tags_not_in_left.contains(col) {
3621 DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3622 } else {
3623 DfExpr::Column(Column::new(None::<String>, col))
3624 }
3625 });
3626 let right_time_index_expr = DfExpr::Column(Column::new(
3627 right_qualifier.clone(),
3628 right_time_index_column,
3629 ))
3630 .alias(left_time_index_column.clone());
3631 let right_qualifier_for_field = right
3634 .schema()
3635 .iter()
3636 .find(|(_, f)| f.name() == right_field_col)
3637 .map(|(q, _)| q)
3638 .with_context(|| ColumnNotFoundSnafu {
3639 col: right_field_col.clone(),
3640 })?
3641 .cloned();
3642
3643 let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
3645 if col == left_field_col && left_field_col != right_field_col {
3647 DfExpr::Column(Column::new(
3649 right_qualifier_for_field.clone(),
3650 right_field_col,
3651 ))
3652 } else if tags_not_in_right.contains(col) {
3653 DfExpr::Literal(ScalarValue::Utf8(None), None).alias(col.clone())
3654 } else {
3655 DfExpr::Column(Column::new(None::<String>, col))
3656 }
3657 });
3658 let right_proj_exprs = [right_time_index_expr]
3659 .into_iter()
3660 .chain(right_proj_exprs_without_time_index);
3661
3662 let left_projected = LogicalPlanBuilder::from(left)
3663 .project(left_proj_exprs)
3664 .context(DataFusionPlanningSnafu)?
3665 .alias(left_qualifier_string.clone())
3666 .context(DataFusionPlanningSnafu)?
3667 .build()
3668 .context(DataFusionPlanningSnafu)?;
3669 let right_projected = LogicalPlanBuilder::from(right)
3670 .project(right_proj_exprs)
3671 .context(DataFusionPlanningSnafu)?
3672 .alias(right_qualifier_string.clone())
3673 .context(DataFusionPlanningSnafu)?
3674 .build()
3675 .context(DataFusionPlanningSnafu)?;
3676
3677 let mut match_columns = if let Some(modifier) = modifier
3679 && let Some(matching) = &modifier.matching
3680 {
3681 match matching {
3682 LabelModifier::Include(on) => on.labels.clone(),
3684 LabelModifier::Exclude(ignoring) => {
3686 let ignoring = ignoring.labels.iter().cloned().collect::<HashSet<_>>();
3687 all_tags.difference(&ignoring).cloned().collect()
3688 }
3689 }
3690 } else {
3691 all_tags.iter().cloned().collect()
3692 };
3693 match_columns.sort_unstable();
3695 let schema = left_projected.schema().clone();
3697 let union_distinct_on = UnionDistinctOn::new(
3698 left_projected,
3699 right_projected,
3700 match_columns,
3701 left_time_index_column.clone(),
3702 schema,
3703 );
3704 let result = LogicalPlan::Extension(Extension {
3705 node: Arc::new(union_distinct_on),
3706 });
3707
3708 self.ctx.time_index_column = Some(left_time_index_column);
3710 self.ctx.tag_columns = all_tags.into_iter().collect();
3711 self.ctx.field_columns = vec![left_field_col.clone()];
3712 self.ctx.use_tsid = left_has_tsid && right_has_tsid;
3713
3714 Ok(result)
3715 }
3716
3717 fn projection_for_each_field_column<F>(
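3718 // Apply `name_to_expr` to every field column and project the results,
3719 // passing the tag and time index columns through unchanged.
3720 // Side effect: `ctx.field_columns` is updated to the new output names.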
3725 &mut self,
3726 input: LogicalPlan,
3727 name_to_expr: F,
3728 ) -> Result<LogicalPlan>
3729 where
3730 F: FnMut(&String) -> Result<DfExpr>,
3731 {
3732 let non_field_columns_iter = self
3733 .ctx
3734 .tag_columns
3735 .iter()
3736 .chain(self.ctx.time_index_column.iter())
3737 .map(|col| {
3738 Ok(DfExpr::Column(Column::new(
3739 self.ctx.table_name.clone().map(TableReference::bare),
3740 col,
3741 )))
3742 });
3743
3744 let result_field_columns = self
3746 .ctx
3747 .field_columns
3748 .iter()
3749 .map(name_to_expr)
3750 .collect::<Result<Vec<_>>>()?;
3751
3752 self.ctx.field_columns = result_field_columns
3754 .iter()
3755 .map(|expr| expr.schema_name().to_string())
3756 .collect();
3757 let field_columns_iter = result_field_columns
3758 .into_iter()
3759 .zip(self.ctx.field_columns.iter())
3760 .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
3761
3762 let project_fields = non_field_columns_iter
3764 .chain(field_columns_iter)
3765 .collect::<Result<Vec<_>>>()?;
3766
3767 LogicalPlanBuilder::from(input)
3768 .project(project_fields)
3769 .context(DataFusionPlanningSnafu)?
3770 .build()
3771 .context(DataFusionPlanningSnafu)
3772 }
3773
3774 fn filter_on_field_column<F>(
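3775 // Filter rows with `name_to_expr` applied to the single field column;
3776 // multi-value inputs are rejected.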
3777 &self,
3778 input: LogicalPlan,
3779 mut name_to_expr: F,
3780 ) -> Result<LogicalPlan>
3781 where
3782 F: FnMut(&String) -> Result<DfExpr>,
3783 {
3784 ensure!(
3785 self.ctx.field_columns.len() == 1,
3786 UnsupportedExprSnafu {
3787 name: "filter on multi-value input"
3788 }
3789 );
3790
3791 let field_column_filter = name_to_expr(&self.ctx.field_columns[0])?;
3792
3793 LogicalPlanBuilder::from(input)
3794 .filter(field_column_filter)
3795 .context(DataFusionPlanningSnafu)?
3796 .build()
3797 .context(DataFusionPlanningSnafu)
3798 }
3799
3800 fn date_part_on_time_index(&self, date_part: &str) -> Result<DfExpr> {
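3801 // Build `date_part(<part>, <time index column>)` for calendar functions
3802 // such as `hour()` or `day_of_week()`.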
3803 let input_expr = datafusion::logical_expr::col(
3804 self.ctx
3805 .time_index_column
3806 .as_ref()
3807 .with_context(|| TimeIndexNotFoundSnafu {
3809 table: "<doesn't matter>",
3810 })?,
3811 );
3812 let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
3813 func: datafusion_functions::datetime::date_part(),
3814 args: vec![date_part.lit(), input_expr],
3815 });
3816 Ok(fn_expr)
3817 }
3818
3819 fn strip_tsid_column(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
3820 let schema = plan.schema();
3821 if !schema
3822 .fields()
3823 .iter()
3824 .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
3825 {
3826 return Ok(plan);
3827 }
3828
3829 let project_exprs = schema
3830 .fields()
3831 .iter()
3832 .filter(|field| field.name() != DATA_SCHEMA_TSID_COLUMN_NAME)
3833 .map(|field| Ok(DfExpr::Column(Column::from_name(field.name().clone()))))
3834 .collect::<Result<Vec<_>>>()?;
3835
3836 LogicalPlanBuilder::from(plan)
3837 .project(project_exprs)
3838 .context(DataFusionPlanningSnafu)?
3839 .build()
3840 .context(DataFusionPlanningSnafu)
3841 }
3842
3843 fn apply_alias_projection(
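3844 // Rename the single field column to `alias_name`, keeping tags and the time index.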
3845 &mut self,
3846 plan: LogicalPlan,
3847 alias_name: String,
3848 ) -> Result<LogicalPlan> {
3849 let fields_expr = self.create_field_column_exprs()?;
3850
3851 ensure!(
3853 fields_expr.len() == 1,
3854 UnsupportedExprSnafu {
3855 name: "alias on multi-value result"
3856 }
3857 );
3858
3859 let project_fields = fields_expr
3860 .into_iter()
3861 .map(|expr| expr.alias(&alias_name))
3862 .chain(self.create_tag_column_exprs()?)
3863 .chain(Some(self.create_time_index_column_expr()?));
3864
3865 LogicalPlanBuilder::from(plan)
3866 .project(project_fields)
3867 .context(DataFusionPlanningSnafu)?
3868 .build()
3869 .context(DataFusionPlanningSnafu)
3870 }
3871}
3872
3873#[derive(Default, Debug)]
3874struct FunctionArgs {
3875 input: Option<PromExpr>,
3876 literals: Vec<DfExpr>,
3877}
3878
3879#[derive(Debug, Clone)]
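3880 /// How a matched PromQL function call is lowered into per-field-column
3881 /// DataFusion expressions in `create_function_expr`.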
3882enum ScalarFunc {
3883 DataFusionBuiltin(Arc<ScalarUdfDef>),
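3884 /// A scalar UDF (engine-registered function, `round`, or a DataFusion
3885 /// math function); the field column is spliced into the caller-supplied
3886 /// arguments at the field position.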
3887 DataFusionUdf(Arc<ScalarUdfDef>),
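3888 /// A PromQL range function: called with the timestamp-range column
3889 /// followed by the field column (the `*_over_time` family, `idelta`,
3890 /// `irate`, ...).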
3891 Udf(Arc<ScalarUdfDef>),
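3892 /// Like `Udf`, but also receives the time index column and the range
3893 /// length so the function can extrapolate at window boundaries
3894 /// (`increase`, `rate`, `delta`).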
3896 ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
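3897 /// The expressions were already built while matching the function name
3898 /// (e.g. `time()`, `pi()`, calendar and label functions); nothing more
3899 /// to add per field column.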
3903 GeneratedExpr,
3907}
3908
3909#[cfg(test)]
3910mod test {
3911 use std::time::{Duration, UNIX_EPOCH};
3912
3913 use catalog::RegisterTableRequest;
3914 use catalog::memory::{MemoryCatalogManager, new_memory_catalog_manager};
3915 use common_base::Plugins;
3916 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
3917 use common_query::prelude::greptime_timestamp;
3918 use common_query::test_util::DummyDecoder;
3919 use datatypes::prelude::ConcreteDataType;
3920 use datatypes::schema::{ColumnSchema, Schema};
3921 use promql_parser::label::Labels;
3922 use promql_parser::parser;
3923 use session::context::QueryContext;
3924 use table::metadata::{TableInfoBuilder, TableMetaBuilder};
3925 use table::test_util::EmptyTable;
3926
3927 use super::*;
3928 use crate::options::QueryOptions;
3929
3930 fn build_query_engine_state() -> QueryEngineState {
3931 QueryEngineState::new(
3932 new_memory_catalog_manager().unwrap(),
3933 None,
3934 None,
3935 None,
3936 None,
3937 None,
3938 false,
3939 Plugins::default(),
3940 QueryOptions::default(),
3941 )
3942 }
3943
3944 async fn build_test_table_provider(
3945 table_name_tuples: &[(String, String)],
3946 num_tag: usize,
3947 num_field: usize,
3948 ) -> DfTableSourceProvider {
3949 let catalog_list = MemoryCatalogManager::with_default_setup();
3950 for (schema_name, table_name) in table_name_tuples {
3951 let mut columns = vec![];
3952 for i in 0..num_tag {
3953 columns.push(ColumnSchema::new(
3954 format!("tag_{i}"),
3955 ConcreteDataType::string_datatype(),
3956 false,
3957 ));
3958 }
3959 columns.push(
3960 ColumnSchema::new(
3961 "timestamp".to_string(),
3962 ConcreteDataType::timestamp_millisecond_datatype(),
3963 false,
3964 )
3965 .with_time_index(true),
3966 );
3967 for i in 0..num_field {
3968 columns.push(ColumnSchema::new(
3969 format!("field_{i}"),
3970 ConcreteDataType::float64_datatype(),
3971 true,
3972 ));
3973 }
3974 let schema = Arc::new(Schema::new(columns));
3975 let table_meta = TableMetaBuilder::empty()
3976 .schema(schema)
3977 .primary_key_indices((0..num_tag).collect())
3978 .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
3979 .next_column_id(1024)
3980 .build()
3981 .unwrap();
3982 let table_info = TableInfoBuilder::default()
3983 .name(table_name.clone())
3984 .meta(table_meta)
3985 .build()
3986 .unwrap();
3987 let table = EmptyTable::from_table_info(&table_info);
3988
3989 assert!(
3990 catalog_list
3991 .register_table_sync(RegisterTableRequest {
3992 catalog: DEFAULT_CATALOG_NAME.to_string(),
3993 schema: schema_name.clone(),
3994 table_name: table_name.clone(),
3995 table_id: 1024,
3996 table,
3997 })
3998 .is_ok()
3999 );
4000 }
4001
4002 DfTableSourceProvider::new(
4003 catalog_list,
4004 false,
4005 QueryContext::arc(),
4006 DummyDecoder::arc(),
4007 false,
4008 )
4009 }
4010
4011 async fn build_test_table_provider_with_tsid(
4012 table_name_tuples: &[(String, String)],
4013 num_tag: usize,
4014 num_field: usize,
4015 ) -> DfTableSourceProvider {
4016 let catalog_list = MemoryCatalogManager::with_default_setup();
4017
4018 let physical_table_name = "phy";
4019 let physical_table_id = 999u32;
4020
4021 {
4023 let mut columns = vec![
4024 ColumnSchema::new(
4025 DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
4026 ConcreteDataType::uint32_datatype(),
4027 false,
4028 ),
4029 ColumnSchema::new(
4030 DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
4031 ConcreteDataType::uint64_datatype(),
4032 false,
4033 ),
4034 ];
4035 for i in 0..num_tag {
4036 columns.push(ColumnSchema::new(
4037 format!("tag_{i}"),
4038 ConcreteDataType::string_datatype(),
4039 false,
4040 ));
4041 }
4042 columns.push(
4043 ColumnSchema::new(
4044 "timestamp".to_string(),
4045 ConcreteDataType::timestamp_millisecond_datatype(),
4046 false,
4047 )
4048 .with_time_index(true),
4049 );
4050 for i in 0..num_field {
4051 columns.push(ColumnSchema::new(
4052 format!("field_{i}"),
4053 ConcreteDataType::float64_datatype(),
4054 true,
4055 ));
4056 }
4057
4058 let schema = Arc::new(Schema::new(columns));
4059 let primary_key_indices = (0..(2 + num_tag)).collect::<Vec<_>>();
4060 let table_meta = TableMetaBuilder::empty()
4061 .schema(schema)
4062 .primary_key_indices(primary_key_indices)
4063 .value_indices((2 + num_tag..2 + num_tag + 1 + num_field).collect())
4064 .engine(METRIC_ENGINE_NAME.to_string())
4065 .next_column_id(1024)
4066 .build()
4067 .unwrap();
4068 let table_info = TableInfoBuilder::default()
4069 .table_id(physical_table_id)
4070 .name(physical_table_name)
4071 .meta(table_meta)
4072 .build()
4073 .unwrap();
4074 let table = EmptyTable::from_table_info(&table_info);
4075
4076 assert!(
4077 catalog_list
4078 .register_table_sync(RegisterTableRequest {
4079 catalog: DEFAULT_CATALOG_NAME.to_string(),
4080 schema: DEFAULT_SCHEMA_NAME.to_string(),
4081 table_name: physical_table_name.to_string(),
4082 table_id: physical_table_id,
4083 table,
4084 })
4085 .is_ok()
4086 );
4087 }
4088
4089 for (idx, (schema_name, table_name)) in table_name_tuples.iter().enumerate() {
4091 let mut columns = vec![];
4092 for i in 0..num_tag {
4093 columns.push(ColumnSchema::new(
4094 format!("tag_{i}"),
4095 ConcreteDataType::string_datatype(),
4096 false,
4097 ));
4098 }
4099 columns.push(
4100 ColumnSchema::new(
4101 "timestamp".to_string(),
4102 ConcreteDataType::timestamp_millisecond_datatype(),
4103 false,
4104 )
4105 .with_time_index(true),
4106 );
4107 for i in 0..num_field {
4108 columns.push(ColumnSchema::new(
4109 format!("field_{i}"),
4110 ConcreteDataType::float64_datatype(),
4111 true,
4112 ));
4113 }
4114
4115 let schema = Arc::new(Schema::new(columns));
4116 let mut options = table::requests::TableOptions::default();
4117 options.extra_options.insert(
4118 LOGICAL_TABLE_METADATA_KEY.to_string(),
4119 physical_table_name.to_string(),
4120 );
4121 let table_id = 1024u32 + idx as u32;
4122 let table_meta = TableMetaBuilder::empty()
4123 .schema(schema)
4124 .primary_key_indices((0..num_tag).collect())
4125 .value_indices((num_tag + 1..num_tag + 1 + num_field).collect())
4126 .engine(METRIC_ENGINE_NAME.to_string())
4127 .options(options)
4128 .next_column_id(1024)
4129 .build()
4130 .unwrap();
4131 let table_info = TableInfoBuilder::default()
4132 .table_id(table_id)
4133 .name(table_name.clone())
4134 .meta(table_meta)
4135 .build()
4136 .unwrap();
4137 let table = EmptyTable::from_table_info(&table_info);
4138
4139 assert!(
4140 catalog_list
4141 .register_table_sync(RegisterTableRequest {
4142 catalog: DEFAULT_CATALOG_NAME.to_string(),
4143 schema: schema_name.clone(),
4144 table_name: table_name.clone(),
4145 table_id,
4146 table,
4147 })
4148 .is_ok()
4149 );
4150 }
4151
4152 DfTableSourceProvider::new(
4153 catalog_list,
4154 false,
4155 QueryContext::arc(),
4156 DummyDecoder::arc(),
4157 false,
4158 )
4159 }
4160
    async fn build_test_table_provider_with_fields(
        table_name_tuples: &[(String, String)],
        tags: &[&str],
    ) -> DfTableSourceProvider {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        for (schema_name, table_name) in table_name_tuples {
            let mut columns = vec![];
            let num_tag = tags.len();
            for tag in tags {
                columns.push(ColumnSchema::new(
                    tag.to_string(),
                    ConcreteDataType::string_datatype(),
                    false,
                ));
            }
            columns.push(
                ColumnSchema::new(
                    greptime_timestamp().to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
            );
            columns.push(ColumnSchema::new(
                greptime_value().to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ));
            let schema = Arc::new(Schema::new(columns));
            let table_meta = TableMetaBuilder::empty()
                .schema(schema)
                .primary_key_indices((0..num_tag).collect())
                .next_column_id(1024)
                .build()
                .unwrap();
            let table_info = TableInfoBuilder::default()
                .name(table_name.clone())
                .meta(table_meta)
                .build()
                .unwrap();
            let table = EmptyTable::from_table_info(&table_info);

            assert!(
                catalog_list
                    .register_table_sync(RegisterTableRequest {
                        catalog: DEFAULT_CATALOG_NAME.to_string(),
                        schema: schema_name.clone(),
                        table_name: table_name.clone(),
                        table_id: 1024,
                        table,
                    })
                    .is_ok()
            );
        }

        DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        )
    }

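    /// Plans `fn_name(some_metric{tag_0!="bar"})` against a one-tag, one-field
    /// table and checks the plan against a shared template, substituting
    /// `plan_name` for every `TEMPLATE` placeholder.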
    async fn do_single_instant_function_call(fn_name: &'static str, plan_name: &str) {
        let prom_expr =
            parser::parse(&format!("{fn_name}(some_metric{{tag_0!=\"bar\"}})")).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = String::from(
            "Filter: TEMPLATE(field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) AS TEMPLATE(field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), TEMPLATE(field_0):Float64;N, tag_0:Utf8]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
        ).replace("TEMPLATE", plan_name);

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

    #[tokio::test]
    async fn single_abs() {
        do_single_instant_function_call("abs", "abs").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_absent() {
        do_single_instant_function_call("absent", "").await;
    }

    #[tokio::test]
    async fn single_ceil() {
        do_single_instant_function_call("ceil", "ceil").await;
    }

    #[tokio::test]
    async fn single_exp() {
        do_single_instant_function_call("exp", "exp").await;
    }

    #[tokio::test]
    async fn single_ln() {
        do_single_instant_function_call("ln", "ln").await;
    }

    #[tokio::test]
    async fn single_log2() {
        do_single_instant_function_call("log2", "log2").await;
    }

    #[tokio::test]
    async fn single_log10() {
        do_single_instant_function_call("log10", "log10").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_scalar() {
        do_single_instant_function_call("scalar", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sgn() {
        do_single_instant_function_call("sgn", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort() {
        do_single_instant_function_call("sort", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sort_desc() {
        do_single_instant_function_call("sort_desc", "").await;
    }

    #[tokio::test]
    async fn single_sqrt() {
        do_single_instant_function_call("sqrt", "sqrt").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_timestamp() {
        do_single_instant_function_call("timestamp", "").await;
    }

    #[tokio::test]
    async fn single_acos() {
        do_single_instant_function_call("acos", "acos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_acosh() {
        do_single_instant_function_call("acosh", "").await;
    }

    #[tokio::test]
    async fn single_asin() {
        do_single_instant_function_call("asin", "asin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_asinh() {
        do_single_instant_function_call("asinh", "").await;
    }

    #[tokio::test]
    async fn single_atan() {
        do_single_instant_function_call("atan", "atan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_atanh() {
        do_single_instant_function_call("atanh", "").await;
    }

    #[tokio::test]
    async fn single_cos() {
        do_single_instant_function_call("cos", "cos").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_cosh() {
        do_single_instant_function_call("cosh", "").await;
    }

    #[tokio::test]
    async fn single_sin() {
        do_single_instant_function_call("sin", "sin").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_sinh() {
        do_single_instant_function_call("sinh", "").await;
    }

    #[tokio::test]
    async fn single_tan() {
        do_single_instant_function_call("tan", "tan").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_tanh() {
        do_single_instant_function_call("tanh", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_deg() {
        do_single_instant_function_call("deg", "").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn single_rad() {
        do_single_instant_function_call("rad", "").await;
    }

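    /// Plans `fn_name by (tag_1)(some_metric{tag_0!="bar"})` against a two-tag,
    /// two-field table, then switches the modifier to `without (tag_1)` and
    /// verifies both plan shapes against the same template.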
    async fn do_aggregate_expr_plan(fn_name: &str, plan_name: &str) {
        let prom_expr = parser::parse(&format!(
            "{fn_name} by (tag_1)(some_metric{{tag_0!=\"bar\"}})",
        ))
        .unwrap();
        let mut eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            2,
            2,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected_no_without = String::from(
            "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n  Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
        ).replace("TEMPLATE", plan_name);
        assert_eq!(
            plan.display_indent_schema().to_string(),
            expected_no_without
        );

        if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
            *modifier = Some(LabelModifier::Exclude(Labels {
                labels: vec![String::from("tag_1")].into_iter().collect(),
            }));
        }
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            2,
            2,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected_without = String::from(
            "Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n  Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\"] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n        Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.tag_1 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n          Filter: some_metric.tag_0 != Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
            \n            TableScan: some_metric [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]"
        ).replace("TEMPLATE", plan_name);
        assert_eq!(plan.display_indent_schema().to_string(), expected_without);
    }

    #[tokio::test]
    async fn aggregate_sum() {
        do_aggregate_expr_plan("sum", "sum").await;
    }

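    // With a metric-engine physical table available, series should be divided
    // by the internal `__tsid` column, which must not leak into the plan's
    // output schema.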
    #[tokio::test]
    async fn tsid_is_used_for_series_divide_when_available() {
        let prom_expr = parser::parse("some_metric").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
        assert!(plan_str.contains("__tsid ASC NULLS FIRST"));
        assert!(
            !plan
                .schema()
                .fields()
                .iter()
                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
        );
    }

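    // The plan should scan the physical table (filtered by `__table_id`) while
    // exposing only the logical table name through a `SubqueryAlias`.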
    #[tokio::test]
    async fn physical_table_name_is_not_leaked_in_plan() {
        let prom_expr = parser::parse("some_metric").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("TableScan: phy"), "{plan}");
        assert!(plan_str.contains("SubqueryAlias: some_metric"));
        assert!(plan_str.contains("Filter: phy.__table_id = UInt32(1024)"));
        assert!(!plan_str.contains("TableScan: some_metric"));
    }

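    // `sum without (tag_0)` must expand against the visible tag set only; the
    // internal `__tsid` column must not appear among the group-by keys.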
    #[tokio::test]
    async fn sum_without_does_not_group_by_tsid() {
        let prom_expr = parser::parse("sum without (tag_0) (some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));

        let aggr_line = plan_str
            .lines()
            .find(|line| line.contains("Aggregate: groupBy="))
            .unwrap();
        assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
    }

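    // Same invariant for windowed aggregation: `topk without (...)` must not
    // partition its row_number window by the internal `__tsid` column.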
    #[tokio::test]
    async fn topk_without_does_not_partition_by_tsid() {
        let prom_expr = parser::parse("topk without (tag_0) (1, some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));

        let window_line = plan_str
            .lines()
            .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
            .unwrap();
        let partition_by = window_line
            .split("PARTITION BY [")
            .nth(1)
            .and_then(|s| s.split("] ORDER BY").next())
            .unwrap();
        assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
    }

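    // A user-written `by (__tsid)` must not be resolved to the internal
    // column: the aggregation's group-by keys must stay free of `__tsid`.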
    #[tokio::test]
    async fn sum_by_does_not_group_by_tsid() {
        let prom_expr = parser::parse("sum by (__tsid) (some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));

        let aggr_line = plan_str
            .lines()
            .find(|line| line.contains("Aggregate: groupBy="))
            .unwrap();
        assert!(!aggr_line.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
    }

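    // Likewise, `topk by (__tsid)` must not partition its row_number window by
    // the internal `__tsid` column.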
    #[tokio::test]
    async fn topk_by_does_not_partition_by_tsid() {
        let prom_expr = parser::parse("topk by (__tsid) (1, some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));

        let window_line = plan_str
            .lines()
            .find(|line| line.contains("WindowAggr: windowExpr=[[row_number()"))
            .unwrap();
        let partition_by = window_line
            .split("PARTITION BY [")
            .nth(1)
            .and_then(|s| s.split("] ORDER BY").next())
            .unwrap();
        assert!(!partition_by.contains(DATA_SCHEMA_TSID_COLUMN_NAME));
    }

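    // A label matcher on `__tsid` must not be planned as a filter over the
    // internal column; the test walks all Filter nodes to verify that.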
    #[tokio::test]
    async fn selector_matcher_on_tsid_does_not_use_internal_column() {
        let prom_expr = parser::parse(r#"some_metric{__tsid="123"}"#).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        fn collect_filter_cols(plan: &LogicalPlan, out: &mut HashSet<Column>) {
            if let LogicalPlan::Filter(filter) = plan {
                datafusion_expr::utils::expr_to_columns(&filter.predicate, out).unwrap();
            }
            for input in plan.inputs() {
                collect_filter_cols(input, out);
            }
        }

        let mut filter_cols = HashSet::new();
        collect_filter_cols(&plan, &mut filter_cols);
        assert!(
            !filter_cols
                .iter()
                .any(|c| c.name == DATA_SCHEMA_TSID_COLUMN_NAME)
        );
    }

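    // The logical table's metadata points at a physical table ("phy") that is
    // not registered, so the planner should fall back to dividing series by
    // the visible tags instead of `__tsid`.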
    #[tokio::test]
    async fn tsid_is_not_used_when_physical_table_is_missing() {
        let prom_expr = parser::parse("some_metric").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let catalog_list = MemoryCatalogManager::with_default_setup();

        let mut columns = vec![ColumnSchema::new(
            "tag_0".to_string(),
            ConcreteDataType::string_datatype(),
            false,
        )];
        columns.push(
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        );
        columns.push(ColumnSchema::new(
            "field_0".to_string(),
            ConcreteDataType::float64_datatype(),
            true,
        ));
        let schema = Arc::new(Schema::new(columns));
        let mut options = table::requests::TableOptions::default();
        options
            .extra_options
            .insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "phy".to_string());
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .engine(METRIC_ENGINE_NAME.to_string())
            .options(options)
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .table_id(1024)
            .name("some_metric")
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        catalog_list
            .register_table_sync(RegisterTableRequest {
                catalog: DEFAULT_CATALOG_NAME.to_string(),
                schema: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: "some_metric".to_string(),
                table_id: 1024,
                table,
            })
            .unwrap();

        let table_provider = DfTableSourceProvider::new(
            catalog_list,
            false,
            QueryContext::arc(),
            DummyDecoder::arc(),
            false,
        );

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("PromSeriesDivide: tags=[\"tag_0\"]"));
        assert!(!plan_str.contains("PromSeriesDivide: tags=[\"__tsid\"]"));
    }

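    // `sum by (tag_0)` keeps the full label set of the one-tag table, so the
    // plan may carry `__tsid` through with `first_value`; a plain `sum(...)`
    // collapses the labels and must not.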
    #[tokio::test]
    async fn tsid_is_carried_only_when_aggregate_preserves_label_set() {
        let prom_expr = parser::parse("sum by (tag_0) (some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let plan_str = plan.display_indent_schema().to_string();
        assert!(plan_str.contains("first_value") && plan_str.contains("__tsid"));
        assert!(
            !plan
                .schema()
                .fields()
                .iter()
                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
        );

        let prom_expr = parser::parse("sum(some_metric)").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let plan_str = plan.display_indent_schema().to_string();
        assert!(!plan_str.contains("first_value"));
    }

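    // An `or` between an unknown and a known metric must still plan, and the
    // internal `__tsid` column must not leak into the result schema.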
    #[tokio::test]
    async fn or_operator_with_unknown_metric_does_not_require_tsid() {
        let prom_expr = parser::parse("unknown_metric or some_metric").unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider_with_tsid(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        assert!(
            !plan
                .schema()
                .fields()
                .iter()
                .any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
        );
    }

    #[tokio::test]
    async fn aggregate_avg() {
        do_aggregate_expr_plan("avg", "avg").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn aggregate_count() {
        do_aggregate_expr_plan("count", "count").await;
    }

    #[tokio::test]
    async fn aggregate_min() {
        do_aggregate_expr_plan("min", "min").await;
    }

    #[tokio::test]
    async fn aggregate_max() {
        do_aggregate_expr_plan("max", "max").await;
    }

    #[tokio::test]
    #[should_panic]
    async fn aggregate_group() {
        do_aggregate_expr_plan("grouping", "GROUPING").await;
    }

    #[tokio::test]
    async fn aggregate_stddev() {
        do_aggregate_expr_plan("stddev", "stddev_pop").await;
    }

    #[tokio::test]
    async fn aggregate_stdvar() {
        do_aggregate_expr_plan("stdvar", "var_pop").await;
    }

    #[tokio::test]
    async fn binary_op_column_column() {
        let prom_expr =
            parser::parse(r#"some_metric{tag_0="foo"} + some_metric{tag_0="bar"}"#).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = String::from(
            "Projection: rhs.tag_0, rhs.timestamp, lhs.field_0 + rhs.field_0 AS lhs.field_0 + rhs.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), lhs.field_0 + rhs.field_0:Float64;N]\
            \n  Inner Join: lhs.tag_0 = rhs.tag_0, lhs.timestamp = rhs.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"foo\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: rhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

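    /// Parses `query`, plans it against one-tag, one-field tables registered in
    /// the default schema and in `greptime_private`, and compares the rendered
    /// plan with `expected`.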
    async fn indie_query_plan_compare<T: AsRef<str>>(query: &str, expected: T) {
        let prom_expr = parser::parse(query).unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider = build_test_table_provider(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                (
                    "greptime_private".to_string(),
                    "some_alt_metric".to_string(),
                ),
            ],
            1,
            1,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        assert_eq!(plan.display_indent_schema().to_string(), expected.as_ref());
    }

    #[tokio::test]
    async fn binary_op_literal_column() {
        let query = r#"1 + some_metric{tag_0="bar"}"#;
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, Float64(1) + some_metric.field_0 AS Float64(1) + field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), Float64(1) + field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.tag_0 = Utf8(\"bar\") AND some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn binary_op_literal_literal() {
        let query = r#"1 + 1"#;
        let expected = r#"EmptyMetric: range=[0..100000000], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
  TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;
        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn simple_bool_grammar() {
        let query = "some_metric != bool 1.2345";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, CAST(some_metric.field_0 != Float64(1.2345) AS Float64) AS field_0 != Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 != Float64(1.2345):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn bool_with_additional_arithmetic() {
        let query = "some_metric + (1 == bool 2)";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + CAST(Float64(1) = Float64(2) AS Float64) AS field_0 + Float64(1) = Float64(2) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0 + Float64(1) = Float64(2):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn simple_unary() {
        let query = "-some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, (- some_metric.field_0) AS (- field_0) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), (- field_0):Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn increase_aggr() {
        let query = "increase(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn less_filter_on_value() {
        let query = "some_metric < 1.2345";
        let expected = String::from(
            "Filter: some_metric.field_0 < Float64(1.2345) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

    #[tokio::test]
    async fn count_over_time() {
        let query = "count_over_time(some_metric[5m])";
        let expected = String::from(
            "Filter: prom_count_over_time(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n  Projection: some_metric.timestamp, prom_count_over_time(timestamp_range, field_0) AS prom_count_over_time(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_count_over_time(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-301000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

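    // `ignoring(kubernetes_pod_name, kubernetes_namespace)` drops those labels
    // from the join keys: the expected inner join matches only on the time
    // index and the remaining `uri` tag.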
    #[tokio::test]
    async fn test_hash_join() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"http_server_requests_seconds_sum{uri="/accounts/login"} / ignoring(kubernetes_pod_name,kubernetes_namespace) http_server_requests_seconds_count{uri="/accounts/login"}"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_sum".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["uri", "kubernetes_namespace", "kubernetes_pod_name"],
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: http_server_requests_seconds_count.uri, http_server_requests_seconds_count.kubernetes_namespace, http_server_requests_seconds_count.kubernetes_pod_name, http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value AS http_server_requests_seconds_sum.greptime_value / http_server_requests_seconds_count.greptime_value\
        \n  Inner Join: http_server_requests_seconds_sum.greptime_timestamp = http_server_requests_seconds_count.greptime_timestamp, http_server_requests_seconds_sum.uri = http_server_requests_seconds_count.uri\
        \n    SubqueryAlias: http_server_requests_seconds_sum\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_sum.uri ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_sum.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_sum.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_sum.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_sum.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_sum.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_sum\
        \n    SubqueryAlias: http_server_requests_seconds_count\
        \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp]\
        \n        PromSeriesDivide: tags=[\"uri\", \"kubernetes_namespace\", \"kubernetes_pod_name\"]\
        \n          Sort: http_server_requests_seconds_count.uri ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_namespace ASC NULLS FIRST, http_server_requests_seconds_count.kubernetes_pod_name ASC NULLS FIRST, http_server_requests_seconds_count.greptime_timestamp ASC NULLS FIRST\
        \n            Filter: http_server_requests_seconds_count.uri = Utf8(\"/accounts/login\") AND http_server_requests_seconds_count.greptime_timestamp >= TimestampMillisecond(-1000, None) AND http_server_requests_seconds_count.greptime_timestamp <= TimestampMillisecond(100001000, None)\
        \n              TableScan: http_server_requests_seconds_count";
        assert_eq!(plan.to_string(), expected);
    }

    #[tokio::test]
    async fn test_nested_histogram_quantile() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"label_replace(histogram_quantile(0.99, sum by(pod, le, path, code) (rate(greptime_servers_grpc_requests_elapsed_bucket{container="frontend"}[1m0s]))), "pod_new", "$1", "pod", "greptimedb-frontend-[0-9a-z]*-(.*)")"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "greptime_servers_grpc_requests_elapsed_bucket".to_string(),
            )],
            &["pod", "le", "path", "code", "container"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_parse_and_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) and (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
            r#"count (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} ) unless (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_used_bytes{namespace=~".+"} )) / (max by (persistentvolumeclaim,namespace) (kubelet_volume_stats_capacity_bytes{namespace=~".+"} )) >= (80 / 100)) or vector (0)"#,
        ];

        for case in cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider_with_fields(
                &[
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_used_bytes".to_string(),
                    ),
                    (
                        DEFAULT_SCHEMA_NAME.to_string(),
                        "kubelet_volume_stats_capacity_bytes".to_string(),
                    ),
                ],
                &["namespace", "persistentvolumeclaim"],
            )
            .await;
            let _ =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
        }
    }

    #[tokio::test]
    async fn test_nested_binary_op() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"sum(rate(nginx_ingress_controller_requests{job=~".*"}[2m])) -
        (
            sum(rate(nginx_ingress_controller_requests{namespace=~".*"}[2m]))
            or
            vector(0)
        )"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "nginx_ingress_controller_requests".to_string(),
            )],
            &["namespace", "job"],
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_parse_or_operator() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"
        sum(rate(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}[120s])) by (cluster_name,tenant_name) /
        (sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) * 100)
        or
        200 * sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) /
        sum(sysstat{tenant_name=~"tenant1",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)"#;

        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) +
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0
        or
        sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) /
        (sum(delta(sysstat{tenant_name=~"sys",cluster_name=~"cluster1"}[2m])/120) by (cluster_name,tenant_name) *1000) >= 0"#;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "sysstat".to_string())],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"(sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name) +
        sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(background_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name)) or
        (sum(foreground_waitevent_cnt{tenant_name=~"sys",cluster_name=~"cluster1"}) by (cluster_name,tenant_name))"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "background_waitevent_cnt".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "foreground_waitevent_cnt".to_string(),
                ),
            ],
            &["tenant_name", "cluster_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();

        let case = r#"avg(node_load1{cluster_name=~"cluster1"}) by (cluster_name,host_name) or max(container_cpu_load_average_10s{cluster_name=~"cluster1"}) by (cluster_name,host_name) * 100 / max(container_spec_cpu_quota{cluster_name=~"cluster1"}) by (cluster_name,host_name)"#;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (DEFAULT_SCHEMA_NAME.to_string(), "node_load1".to_string()),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_cpu_load_average_10s".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "container_spec_cpu_quota".to_string(),
                ),
            ],
            &["cluster_name", "host_name"],
        )
        .await;
        eval_stmt.expr = parser::parse(case).unwrap();
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

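    // `__field__` is a GreptimeDB-specific matcher that selects value columns
    // by name; each case lists the columns expected to remain in the output
    // schema, and matching no field at all is an error.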
    #[tokio::test]
    async fn value_matcher() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let cases = [
            (
                r#"some_metric{__field__="field_1"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_1", __field__="field_0"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!="field_1"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!="field_1", __field__!="field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_1", __field__!="field_0"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__="field_2", __field__!="field_2"}"#,
                vec![
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__=~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_1",
                    "some_metric.field_2",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
            (
                r#"some_metric{__field__!~"field_1|field_2"}"#,
                vec![
                    "some_metric.field_0",
                    "some_metric.tag_0",
                    "some_metric.tag_1",
                    "some_metric.tag_2",
                    "some_metric.timestamp",
                ],
            ),
        ];

        for case in cases {
            let prom_expr = parser::parse(case.0).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await
                    .unwrap();
            let mut fields = plan.schema().field_names();
            let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
            fields.sort();
            expected.sort();
            assert_eq!(fields, expected, "case: {:?}", case.0);
        }

        let bad_cases = [
            r#"some_metric{__field__="nonexistent"}"#,
            r#"some_metric{__field__!="nonexistent"}"#,
        ];

        for case in bad_cases {
            let prom_expr = parser::parse(case).unwrap();
            eval_stmt.expr = prom_expr;
            let table_provider = build_test_table_provider(
                &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
                3,
                3,
            )
            .await;
            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "case: {:?}", case);
        }
    }

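    // `__schema__` (with `__database__` as an alias) is a GreptimeDB-specific
    // matcher that routes a selector to another schema; the follow-up test
    // checks that only `=` matching is accepted for it.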
    #[tokio::test]
    async fn custom_schema() {
        let query = "some_alt_metric{__schema__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__database__=\"greptime_private\"}";
        let expected = String::from(
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n  PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;

        let query = "some_alt_metric{__schema__=\"greptime_private\"} / some_metric";
        let expected = String::from(
            "Projection: some_metric.tag_0, some_metric.timestamp, greptime_private.some_alt_metric.field_0 / some_metric.field_0 AS greptime_private.some_alt_metric.field_0 / some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), greptime_private.some_alt_metric.field_0 / some_metric.field_0:Float64;N]\
            \n  Inner Join: greptime_private.some_alt_metric.tag_0 = some_metric.tag_0, greptime_private.some_alt_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: greptime_private.some_alt_metric.tag_0 ASC NULLS FIRST, greptime_private.some_alt_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: greptime_private.some_alt_metric.timestamp >= TimestampMillisecond(-1000, None) AND greptime_private.some_alt_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: greptime_private.some_alt_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n    SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n      PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n        PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n          Sort: some_metric.tag_0 ASC NULLS FIRST, some_metric.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n            Filter: some_metric.timestamp >= TimestampMillisecond(-1000, None) AND some_metric.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
            \n              TableScan: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
        );

        indie_query_plan_compare(query, expected).await;
    }

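    // The special matchers only support `=`; `!=` and `=~` must fail planning.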
    #[tokio::test]
    async fn only_equals_is_supported_for_special_matcher() {
        let queries = &[
            "some_alt_metric{__schema__!=\"greptime_private\"}",
            "some_alt_metric{__schema__=~\"lalala\"}",
            "some_alt_metric{__database__!=\"greptime_private\"}",
            "some_alt_metric{__database__=~\"lalala\"}",
        ];

        for query in queries {
            let prom_expr = parser::parse(query).unwrap();
            let eval_stmt = EvalStmt {
                expr: prom_expr,
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            };

            let table_provider = build_test_table_provider(
                &[
                    (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
                    (
                        "greptime_private".to_string(),
                        "some_alt_metric".to_string(),
                    ),
                ],
                1,
                1,
            )
            .await;

            let plan =
                PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                    .await;
            assert!(plan.is_err(), "query: {:?}", query);
        }
    }

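    // A nanosecond time index is cast to millisecond precision (via a projection)
    // before the Prom plan nodes are stacked on top of the scan.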
    #[tokio::test]
    async fn test_non_ms_precision() {
        let catalog_list = MemoryCatalogManager::with_default_setup();
        let columns = vec![
            ColumnSchema::new(
                "tag".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            ColumnSchema::new(
                "timestamp".to_string(),
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "field".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ];
        let schema = Arc::new(Schema::new(columns));
        let table_meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            .value_indices(vec![2])
            .next_column_id(1024)
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .name("metrics".to_string())
            .meta(table_meta)
            .build()
            .unwrap();
        let table = EmptyTable::from_table_info(&table_info);
        assert!(
            catalog_list
                .register_table_sync(RegisterTableRequest {
                    catalog: DEFAULT_CATALOG_NAME.to_string(),
                    schema: DEFAULT_SCHEMA_NAME.to_string(),
                    table_name: "metrics".to_string(),
                    table_id: 1024,
                    table,
                })
                .is_ok()
        );

        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n  PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n    Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n      Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n        Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n          TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
        let plan = PromPlanner::stmt_to_plan(
            DfTableSourceProvider::new(
                catalog_list.clone(),
                false,
                QueryContext::arc(),
                DummyDecoder::arc(),
                true,
            ),
            &EvalStmt {
                expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
                start: UNIX_EPOCH,
                end: UNIX_EPOCH
                    .checked_add(Duration::from_secs(100_000))
                    .unwrap(),
                interval: Duration::from_secs(5),
                lookback_delta: Duration::from_secs(1),
            },
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        assert_eq!(
            plan.display_indent_schema().to_string(),
            "Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n  Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
            \n    PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
            \n      PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n        PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n          Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n            Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n              Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
            \n                TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
        );
    }

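    // Matching on a label that doesn't exist in the table should still plan successfully.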
    #[tokio::test]
    async fn test_nonexistent_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"some_metric{nonexistent="hi"}"#;
        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
            3,
            3,
        )
        .await;
        let _ = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
            .await
            .unwrap();
    }

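    // label_join() lowers to a concat_ws() projection that produces the joined label.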
    #[tokio::test]
    async fn test_label_join() {
        let prom_expr = parser::parse(
            "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
  Projection: up.timestamp, up.field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.tag_1 ASC NULLS FIRST, up.tag_2 ASC NULLS FIRST, up.tag_3 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

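    // label_replace() lowers to a regexp_replace() projection with an anchored pattern.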
    #[tokio::test]
    async fn test_label_replace() {
        let prom_expr = parser::parse(
            "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
        )
        .unwrap();
        let eval_stmt = EvalStmt {
            expr: prom_expr,
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let table_provider =
            build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
                .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();

        let expected = r#"
Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
  Projection: up.timestamp, up.field_0, regexp_replace(up.tag_0, Utf8("^(?s:(.*):.*)$"), Utf8("$1")) AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
      PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
        Sort: up.tag_0 ASC NULLS FIRST, up.timestamp ASC NULLS FIRST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
          Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
            TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

        let ret = plan.display_indent_schema().to_string();
        assert_eq!(format!("\n{ret}"), expected, "\n{}", ret);
    }

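    // Regex matchers compile to an anchored `~` filter expression on the tag column.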
    #[tokio::test]
    async fn test_matchers_to_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case =
            r#"sum(prometheus_tsdb_head_series{tag_1=~"(10.0.160.237:8080|10.0.160.237:9090)"})"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "prometheus_tsdb_head_series".to_string(),
            )],
            3,
            3,
        )
        .await;
        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.timestamp ASC NULLS LAST [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
            \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.timestamp]], aggr=[[sum(prometheus_tsdb_head_series.field_0), sum(prometheus_tsdb_head_series.field_1), sum(prometheus_tsdb_head_series.field_2)]] [timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.field_0):Float64;N, sum(prometheus_tsdb_head_series.field_1):Float64;N, sum(prometheus_tsdb_head_series.field_2):Float64;N]\
            \n    PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
            \n      PromSeriesDivide: tags=[\"tag_0\", \"tag_1\", \"tag_2\"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
            \n        Sort: prometheus_tsdb_head_series.tag_0 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_1 ASC NULLS FIRST, prometheus_tsdb_head_series.tag_2 ASC NULLS FIRST, prometheus_tsdb_head_series.timestamp ASC NULLS FIRST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
            \n          Filter: prometheus_tsdb_head_series.tag_1 ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]\
            \n            TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]";
        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

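    // topk() is planned as a row_number() window partitioned by timestamp, with a
    // filter keeping the first k rows of each partition.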
    #[tokio::test]
    async fn test_topk_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]\
            \n  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
            \n    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
            \n      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS FIRST, prometheus_tsdb_head_series.ip DESC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]\
            \n        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
            \n          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
            \n            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
            \n              PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
            \n                Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
            \n                  Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
            \n                    TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

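    // count_values() groups by the value column and re-exposes it under the given label.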
    #[tokio::test]
    async fn test_count_values_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]\
            \n  Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
            \n    Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]\
            \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]\
            \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
            \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
            \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
            \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
            \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

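    // stmt_to_plan_with_alias adds a top projection renaming the value column.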
    #[tokio::test]
    async fn test_value_alias() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let alias = Some("my_series".to_string());
        let plan = PromPlanner::stmt_to_plan_with_alias(
            table_provider,
            &eval_stmt,
            alias,
            &build_query_engine_state(),
        )
        .await
        .unwrap();
        let expected = r#"
Projection: count(prometheus_tsdb_head_series.greptime_value) AS my_series, prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [my_series:Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]
  Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]
    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
      Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
        Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: prometheus_tsdb_head_series.ip ~ Utf8("^(?:(10.0.160.237:8080|10.0.160.237:9090))$") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
        assert_eq!(format!("\n{}", plan.display_indent_schema()), expected);
    }

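    // quantile() aggregation maps to the quantile UDAF grouped by timestamp.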
    #[tokio::test]
    async fn test_quantile_expr() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10.0.160.237:8080|10.0.160.237:9090)"}) by (ip))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "prometheus_tsdb_head_series".to_string(),
                ),
                (
                    DEFAULT_SCHEMA_NAME.to_string(),
                    "http_server_requests_seconds_count".to_string(),
                ),
            ],
            &["ip"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
            \n  Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
            \n    Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
            \n      Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
            \n        PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
            \n          PromSeriesDivide: tags=[\"ip\"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
            \n            Sort: prometheus_tsdb_head_series.ip ASC NULLS FIRST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS FIRST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
            \n              Filter: prometheus_tsdb_head_series.ip ~ Utf8(\"^(?:(10.0.160.237:8080|10.0.160.237:9090))$\") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\
            \n                TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]";

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

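    // `or` against a metric that doesn't exist plans the missing side as an EmptyMetric
    // and combines both sides with UnionDistinctOn.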
    #[tokio::test]
    async fn test_or_not_exists_table_label() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };
        let case = r#"sum by (job, tag0, tag2) (metric_exists) or sum by (job, tag0, tag2) (metric_not_exists)"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;
        let table_provider = build_test_table_provider_with_fields(
            &[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
            &["job"],
        )
        .await;

        let plan =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await
                .unwrap();
        let expected = r#"UnionDistinctOn: on col=[["job"]], ts_col=[greptime_timestamp] [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
  SubqueryAlias: metric_exists [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
    Projection: metric_exists.greptime_timestamp, metric_exists.job, sum(metric_exists.greptime_value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8, sum(metric_exists.greptime_value):Float64;N]
      Sort: metric_exists.job ASC NULLS LAST, metric_exists.greptime_timestamp ASC NULLS LAST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
        Aggregate: groupBy=[[metric_exists.job, metric_exists.greptime_timestamp]], aggr=[[sum(metric_exists.greptime_value)]] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(metric_exists.greptime_value):Float64;N]
          PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
            PromSeriesDivide: tags=["job"] [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
              Sort: metric_exists.job ASC NULLS FIRST, metric_exists.greptime_timestamp ASC NULLS FIRST [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                Filter: metric_exists.greptime_timestamp >= TimestampMillisecond(-1000, None) AND metric_exists.greptime_timestamp <= TimestampMillisecond(100001000, None) [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
                  TableScan: metric_exists [job:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
  SubqueryAlias: [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
    Projection: .time AS greptime_timestamp, Utf8(NULL) AS job, sum(.value) [greptime_timestamp:Timestamp(Millisecond, None), job:Utf8;N, sum(.value):Float64;N]
      Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
        Aggregate: groupBy=[[.time]], aggr=[[sum(.value)]] [time:Timestamp(Millisecond, None), sum(.value):Float64;N]
          EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
            TableScan: dummy [time:Timestamp(Millisecond, None), value:Float64;N]"#;

        assert_eq!(plan.display_indent_schema().to_string(), expected);
    }

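    // histogram_quantile() over a table lacking the "le" column should plan to an
    // EmptyRelation instead of erroring out.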
    #[tokio::test]
    async fn test_histogram_quantile_missing_le_column() {
        let mut eval_stmt = EvalStmt {
            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
            start: UNIX_EPOCH,
            end: UNIX_EPOCH
                .checked_add(Duration::from_secs(100_000))
                .unwrap(),
            interval: Duration::from_secs(5),
            lookback_delta: Duration::from_secs(1),
        };

        let case = r#"histogram_quantile(0.99, sum by(pod,instance,le) (rate(non_existent_histogram_bucket{instance=~"xxx"}[1m])))"#;

        let prom_expr = parser::parse(case).unwrap();
        eval_stmt.expr = prom_expr;

        // The test table only provides "pod" and "instance" tag columns; the "le"
        // column required by histogram_quantile is absent.
        let table_provider = build_test_table_provider_with_fields(
            &[(
                DEFAULT_SCHEMA_NAME.to_string(),
                "non_existent_histogram_bucket".to_string(),
            )],
            &["pod", "instance"],
        )
        .await;

        let result =
            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
                .await;

        assert!(
            result.is_ok(),
            "Expected successful plan creation with empty result, but got error: {:?}",
            result.err()
        );

        let plan = result.unwrap();
        match plan {
            // Planning succeeds but degrades to an empty result set.
            LogicalPlan::EmptyRelation(_) => {}
            _ => panic!("Expected EmptyRelation, but got: {:?}", plan),
        }
    }
}