query/promql/
label_values.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::{SystemTime, UNIX_EPOCH};
16
17use common_time::timestamp::TimeUnit;
18use common_time::Timestamp;
19use datafusion_common::{Column, ScalarValue};
20use datafusion_expr::utils::conjunction;
21use datafusion_expr::{col, Expr, LogicalPlan, LogicalPlanBuilder};
22use snafu::{OptionExt, ResultExt};
23use table::TableRef;
24
25use crate::promql::error::{
26    DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, TimestampOutOfRangeSnafu,
27};
28
29fn build_time_filter(time_index_expr: Expr, start: Timestamp, end: Timestamp) -> Expr {
30    time_index_expr
31        .clone()
32        .gt_eq(Expr::Literal(timestamp_to_scalar_value(start)))
33        .and(time_index_expr.lt_eq(Expr::Literal(timestamp_to_scalar_value(end))))
34}
35
36fn timestamp_to_scalar_value(timestamp: Timestamp) -> ScalarValue {
37    let value = timestamp.value();
38    match timestamp.unit() {
39        TimeUnit::Second => ScalarValue::TimestampSecond(Some(value), None),
40        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(value), None),
41        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(value), None),
42        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(value), None),
43    }
44}
45
46/// Rewrite label values query to DataFusion logical plan.
47pub fn rewrite_label_values_query(
48    table: TableRef,
49    scan_plan: LogicalPlan,
50    mut conditions: Vec<Expr>,
51    label_name: String,
52    start: SystemTime,
53    end: SystemTime,
54) -> Result<LogicalPlan> {
55    let schema = table.schema();
56    let ts_column = schema
57        .timestamp_column()
58        .with_context(|| TimeIndexNotFoundSnafu {
59            table: table.table_info().full_table_name(),
60        })?;
61    let unit = ts_column
62        .data_type
63        .as_timestamp()
64        .map(|data_type| data_type.unit())
65        .with_context(|| TimeIndexNotFoundSnafu {
66            table: table.table_info().full_table_name(),
67        })?;
68
69    // We only support millisecond precision at most.
70    let start =
71        Timestamp::new_millisecond(start.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
72    let start = start.convert_to(unit).context(TimestampOutOfRangeSnafu {
73        timestamp: start.value(),
74        unit,
75    })?;
76    let end =
77        Timestamp::new_millisecond(end.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64);
78    let end = end.convert_to(unit).context(TimestampOutOfRangeSnafu {
79        timestamp: end.value(),
80        unit,
81    })?;
82    let time_index_expr = col(Column::from_name(ts_column.name.clone()));
83
84    conditions.push(build_time_filter(time_index_expr, start, end));
85    // Safety: `conditions` is not empty.
86    let filter = conjunction(conditions).unwrap();
87
88    // Builds time filter
89    let logical_plan = LogicalPlanBuilder::from(scan_plan)
90        .filter(filter)
91        .context(DataFusionPlanningSnafu)?
92        .project(vec![col(Column::from_name(label_name))])
93        .context(DataFusionPlanningSnafu)?
94        .distinct()
95        .context(DataFusionPlanningSnafu)?
96        .build()
97        .context(DataFusionPlanningSnafu)?;
98
99    Ok(logical_plan)
100}