servers/
prometheus.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use catalog::system_schema::information_schema::tables::{
16    ENGINE as TABLE_ENGINE, TABLE_CATALOG, TABLE_NAME, TABLE_SCHEMA,
17};
18use common_telemetry::tracing;
19use datafusion::prelude::{col, lit, regexp_match, Expr};
20use datafusion_expr::LogicalPlan;
21use promql_parser::label::{MatchOp, Matcher};
22use query::dataframe::DataFrame;
23use session::context::QueryContextRef;
24use snafu::ResultExt;
25
26use crate::error::{self, Result};
27
/// The maximum number of metric names returned by a single lookup.
///
/// Applied as the row limit of the plan built by
/// `metric_name_matchers_to_plan`, so at most this many table names are
/// produced regardless of how many tables satisfy the matchers.
const MAX_METRICS_NUM: usize = 1024;
30
31/// Create a DataFrame from promql `__name__` matchers.
32/// # Panics
33///  Panic when the machers contains `MatchOp::Equal`.
34#[tracing::instrument(skip_all)]
35pub fn metric_name_matchers_to_plan(
36    dataframe: DataFrame,
37    matchers: Vec<Matcher>,
38    ctx: &QueryContextRef,
39) -> Result<LogicalPlan> {
40    assert!(!matchers.is_empty());
41
42    let mut conditions = Vec::with_capacity(matchers.len() + 3);
43
44    conditions.push(col(TABLE_CATALOG).eq(lit(ctx.current_catalog())));
45    conditions.push(col(TABLE_SCHEMA).eq(lit(ctx.current_schema())));
46    // Must be metric engine
47    conditions.push(col(TABLE_ENGINE).eq(lit("metric")));
48
49    for m in matchers {
50        let value = &m.value;
51
52        match &m.op {
53            MatchOp::NotEqual => {
54                conditions.push(col(TABLE_NAME).not_eq(lit(value)));
55            }
56            // Case sensitive regexp match
57            MatchOp::Re(regex) => {
58                conditions.push(
59                    regexp_match(col(TABLE_NAME), lit(regex.to_string()), None).is_not_null(),
60                );
61            }
62            // Case sensitive regexp not match
63            MatchOp::NotRe(regex) => {
64                conditions
65                    .push(regexp_match(col(TABLE_NAME), lit(regex.to_string()), None).is_null());
66            }
67            _ => unreachable!("checked outside"),
68        }
69    }
70
71    // Safety: conditions MUST not be empty, reduce always return Some(expr).
72    let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
73
74    let DataFrame::DataFusion(dataframe) = dataframe;
75    let dataframe = dataframe
76        .filter(conditions)
77        .context(error::DataFrameSnafu)?
78        .select(vec![col(TABLE_NAME)])
79        .context(error::DataFrameSnafu)?
80        .limit(0, Some(MAX_METRICS_NUM))
81        .context(error::DataFrameSnafu)?;
82
83    Ok(dataframe.into_parts().1)
84}