query/promql/
label_values.rs1use std::time::{SystemTime, UNIX_EPOCH};
16
17use common_time::timestamp::TimeUnit;
18use common_time::Timestamp;
19use datafusion_common::{Column, ScalarValue};
20use datafusion_expr::utils::conjunction;
21use datafusion_expr::{col, Expr, LogicalPlan, LogicalPlanBuilder};
22use snafu::{OptionExt, ResultExt};
23use table::TableRef;
24
25use crate::promql::error::{
26 DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, TimestampOutOfRangeSnafu,
27};
28
29fn build_time_filter(time_index_expr: Expr, start: Timestamp, end: Timestamp) -> Expr {
30 time_index_expr
31 .clone()
32 .gt_eq(Expr::Literal(timestamp_to_scalar_value(start)))
33 .and(time_index_expr.lt_eq(Expr::Literal(timestamp_to_scalar_value(end))))
34}
35
36fn timestamp_to_scalar_value(timestamp: Timestamp) -> ScalarValue {
37 let value = timestamp.value();
38 match timestamp.unit() {
39 TimeUnit::Second => ScalarValue::TimestampSecond(Some(value), None),
40 TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(value), None),
41 TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(value), None),
42 TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(value), None),
43 }
44}
45
46pub fn rewrite_label_values_query(
48 table: TableRef,
49 scan_plan: LogicalPlan,
50 mut conditions: Vec<Expr>,
51 label_name: String,
52 start: SystemTime,
53 end: SystemTime,
54) -> Result<LogicalPlan> {
55 let schema = table.schema();
56 let ts_column = schema
57 .timestamp_column()
58 .with_context(|| TimeIndexNotFoundSnafu {
59 table: table.table_info().full_table_name(),
60 })?;
61 let unit = ts_column
62 .data_type
63 .as_timestamp()
64 .map(|data_type| data_type.unit())
65 .with_context(|| TimeIndexNotFoundSnafu {
66 table: table.table_info().full_table_name(),
67 })?;
68
69 let start =
71 Timestamp::new_millisecond(start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
72 let start = start.convert_to(unit).context(TimestampOutOfRangeSnafu {
73 timestamp: start.value(),
74 unit,
75 })?;
76 let end =
77 Timestamp::new_millisecond(end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
78 let end = end.convert_to(unit).context(TimestampOutOfRangeSnafu {
79 timestamp: end.value(),
80 unit,
81 })?;
82 let time_index_expr = col(Column::from_name(ts_column.name.clone()));
83
84 conditions.push(build_time_filter(time_index_expr, start, end));
85 let filter = conjunction(conditions).unwrap();
87
88 let logical_plan = LogicalPlanBuilder::from(scan_plan)
90 .filter(filter)
91 .context(DataFusionPlanningSnafu)?
92 .project(vec![col(Column::from_name(label_name))])
93 .context(DataFusionPlanningSnafu)?
94 .distinct()
95 .context(DataFusionPlanningSnafu)?
96 .build()
97 .context(DataFusionPlanningSnafu)?;
98
99 Ok(logical_plan)
100}