frontend/instance/
promql.rs1use std::time::SystemTime;
16
17use catalog::information_schema::TABLES;
18use client::OutputData;
19use common_catalog::consts::INFORMATION_SCHEMA_NAME;
20use common_catalog::format_full_table_name;
21use common_recordbatch::util;
22use common_telemetry::tracing;
23use promql_parser::label::{MatchOp, Matcher, Matchers};
24use query::promql;
25use query::promql::planner::PromPlanner;
26use servers::prom_store::{DATABASE_LABEL, SCHEMA_LABEL};
27use servers::prometheus;
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30
31use crate::error::{
32 CatalogSnafu, CollectRecordbatchSnafu, ExecLogicalPlanSnafu,
33 PrometheusLabelValuesQueryPlanSnafu, PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu,
34 Result, TableNotFoundSnafu,
35};
36use crate::instance::Instance;
37
38impl Instance {
39 #[tracing::instrument(skip_all)]
41 pub(crate) async fn handle_query_metric_names(
42 &self,
43 matchers: Vec<Matcher>,
44 ctx: &QueryContextRef,
45 ) -> Result<Vec<String>> {
46 let _timer = crate::metrics::PROMQL_QUERY_METRICS_ELAPSED
47 .with_label_values(&[ctx.get_db_string().as_str()])
48 .start_timer();
49
50 let table = self
51 .catalog_manager
52 .table(
53 ctx.current_catalog(),
54 INFORMATION_SCHEMA_NAME,
55 TABLES,
56 Some(ctx),
57 )
58 .await
59 .context(CatalogSnafu)?
60 .with_context(|| TableNotFoundSnafu {
61 table_name: "greptime.information_schema.tables",
62 })?;
63
64 let dataframe = self
65 .query_engine
66 .read_table(table)
67 .with_context(|_| ReadTableSnafu {
68 table_name: "greptime.information_schema.tables",
69 })?;
70
71 let logical_plan = prometheus::metric_name_matchers_to_plan(dataframe, matchers, ctx)
72 .context(PrometheusMetricNamesQueryPlanSnafu)?;
73
74 let results = self
75 .query_engine
76 .execute(logical_plan, ctx.clone())
77 .await
78 .context(ExecLogicalPlanSnafu)?;
79
80 let batches = match results.data {
81 OutputData::Stream(stream) => util::collect(stream)
82 .await
83 .context(CollectRecordbatchSnafu)?,
84 OutputData::RecordBatches(rbs) => rbs.take(),
85 _ => unreachable!("should not happen"),
86 };
87
88 let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
89
90 for batch in batches {
91 batch
93 .iter_column_as_string(0)
94 .flatten()
95 .for_each(|x| results.push(x))
96 }
97
98 Ok(results)
99 }
100
101 #[tracing::instrument(skip_all)]
103 pub(crate) async fn handle_query_label_values(
104 &self,
105 metric: String,
106 label_name: String,
107 matchers: Vec<Matcher>,
108 start: SystemTime,
109 end: SystemTime,
110 ctx: &QueryContextRef,
111 ) -> Result<Vec<String>> {
112 let table_schema = matchers
113 .iter()
114 .find_map(|m| {
115 if (m.name == SCHEMA_LABEL || m.name == DATABASE_LABEL) && m.op == MatchOp::Equal {
116 Some(m.value.clone())
117 } else {
118 None
119 }
120 })
121 .unwrap_or_else(|| ctx.current_schema());
122
123 let table = self
124 .catalog_manager
125 .table(ctx.current_catalog(), &table_schema, &metric, Some(ctx))
126 .await
127 .context(CatalogSnafu)?
128 .with_context(|| TableNotFoundSnafu {
129 table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
130 })?;
131
132 let dataframe = self
133 .query_engine
134 .read_table(table.clone())
135 .with_context(|_| ReadTableSnafu {
136 table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
137 })?;
138
139 let scan_plan = dataframe.into_unoptimized_plan();
140 let filter_conditions =
141 PromPlanner::matchers_to_expr(Matchers::new(matchers), scan_plan.schema())
142 .context(PrometheusLabelValuesQueryPlanSnafu)?;
143 let logical_plan = promql::label_values::rewrite_label_values_query(
144 table,
145 scan_plan,
146 filter_conditions,
147 label_name,
148 start,
149 end,
150 )
151 .context(PrometheusLabelValuesQueryPlanSnafu)?;
152
153 let results = self
154 .query_engine
155 .execute(logical_plan, ctx.clone())
156 .await
157 .context(ExecLogicalPlanSnafu)?;
158
159 let batches = match results.data {
160 OutputData::Stream(stream) => util::collect(stream)
161 .await
162 .context(CollectRecordbatchSnafu)?,
163 OutputData::RecordBatches(rbs) => rbs.take(),
164 _ => unreachable!("should not happen"),
165 };
166
167 let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
168 for batch in batches {
169 batch
171 .iter_column_as_string(0)
172 .flatten()
173 .for_each(|x| results.push(x))
174 }
175
176 Ok(results)
177 }
178}