use catalog::system_schema::information_schema::tables::{
ENGINE as TABLE_ENGINE, TABLE_CATALOG, TABLE_NAME, TABLE_SCHEMA,
};
use common_telemetry::tracing;
use datafusion::prelude::{col, lit, regexp_match, Expr};
use datafusion_expr::LogicalPlan;
use promql_parser::label::{MatchOp, Matcher};
use query::dataframe::DataFrame;
use session::context::QueryContextRef;
use snafu::ResultExt;
use crate::error::{self, Result};
const MAX_METRICS_NUM: usize = 1024;
#[tracing::instrument(skip_all)]
pub fn metric_name_matchers_to_plan(
dataframe: DataFrame,
matchers: Vec<Matcher>,
ctx: &QueryContextRef,
) -> Result<LogicalPlan> {
assert!(!matchers.is_empty());
let mut conditions = Vec::with_capacity(matchers.len() + 3);
conditions.push(col(TABLE_CATALOG).eq(lit(ctx.current_catalog())));
conditions.push(col(TABLE_SCHEMA).eq(lit(ctx.current_schema())));
conditions.push(col(TABLE_ENGINE).eq(lit("metric")));
for m in matchers {
let value = &m.value;
match &m.op {
MatchOp::NotEqual => {
conditions.push(col(TABLE_NAME).not_eq(lit(value)));
}
MatchOp::Re(regex) => {
conditions.push(
regexp_match(col(TABLE_NAME), lit(regex.to_string()), None).is_not_null(),
);
}
MatchOp::NotRe(regex) => {
conditions
.push(regexp_match(col(TABLE_NAME), lit(regex.to_string()), None).is_null());
}
_ => unreachable!("checked outside"),
}
}
let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
let DataFrame::DataFusion(dataframe) = dataframe;
let dataframe = dataframe
.filter(conditions)
.context(error::DataFrameSnafu)?
.select(vec![col(TABLE_NAME)])
.context(error::DataFrameSnafu)?
.limit(0, Some(MAX_METRICS_NUM))
.context(error::DataFrameSnafu)?;
Ok(dataframe.into_parts().1)
}