1use catalog::system_schema::information_schema::tables::{
16 ENGINE as TABLE_ENGINE, TABLE_CATALOG, TABLE_NAME, TABLE_SCHEMA,
17};
18use common_telemetry::tracing;
19use datafusion::prelude::{col, lit, regexp_match, Expr};
20use datafusion_expr::LogicalPlan;
21use promql_parser::label::{MatchOp, Matcher};
22use query::dataframe::DataFrame;
23use session::context::QueryContextRef;
24use snafu::ResultExt;
25
26use crate::error::{self, Result};
27
/// Upper bound on the number of table-name rows produced by the plan built in
/// `metric_name_matchers_to_plan`; it is passed as the fetch count of the
/// final `limit` step.
28const MAX_METRICS_NUM: usize = 1024;
30
31#[tracing::instrument(skip_all)]
35pub fn metric_name_matchers_to_plan(
36 dataframe: DataFrame,
37 matchers: Vec<Matcher>,
38 ctx: &QueryContextRef,
39) -> Result<LogicalPlan> {
40 assert!(!matchers.is_empty());
41
42 let mut conditions = Vec::with_capacity(matchers.len() + 3);
43
44 conditions.push(col(TABLE_CATALOG).eq(lit(ctx.current_catalog())));
45 conditions.push(col(TABLE_SCHEMA).eq(lit(ctx.current_schema())));
46 conditions.push(col(TABLE_ENGINE).eq(lit("metric")));
48
49 for m in matchers {
50 let value = &m.value;
51
52 match &m.op {
53 MatchOp::NotEqual => {
54 conditions.push(col(TABLE_NAME).not_eq(lit(value)));
55 }
56 MatchOp::Re(regex) => {
58 conditions.push(
59 regexp_match(col(TABLE_NAME), lit(regex.to_string()), None).is_not_null(),
60 );
61 }
62 MatchOp::NotRe(regex) => {
64 conditions
65 .push(regexp_match(col(TABLE_NAME), lit(regex.to_string()), None).is_null());
66 }
67 _ => unreachable!("checked outside"),
68 }
69 }
70
71 let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
73
74 let DataFrame::DataFusion(dataframe) = dataframe;
75 let dataframe = dataframe
76 .filter(conditions)
77 .context(error::DataFrameSnafu)?
78 .select(vec![col(TABLE_NAME)])
79 .context(error::DataFrameSnafu)?
80 .limit(0, Some(MAX_METRICS_NUM))
81 .context(error::DataFrameSnafu)?;
82
83 Ok(dataframe.into_parts().1)
84}