frontend/instance/
promql.rs1use std::time::SystemTime;
16
17use catalog::information_schema::TABLES;
18use client::OutputData;
19use common_catalog::consts::INFORMATION_SCHEMA_NAME;
20use common_catalog::format_full_table_name;
21use common_recordbatch::util;
22use common_telemetry::tracing;
23use promql_parser::label::{MatchOp, Matcher, Matchers};
24use query::promql;
25use query::promql::planner::PromPlanner;
26use servers::prom_store::is_database_selection_label;
27use servers::prometheus;
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30
31use crate::error::{
32 CatalogSnafu, CollectRecordbatchSnafu, ExecLogicalPlanSnafu,
33 PrometheusLabelValuesQueryPlanSnafu, PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu,
34 Result, TableNotFoundSnafu, TableSnafu,
35};
36use crate::instance::Instance;
37
38impl Instance {
39 #[tracing::instrument(skip_all)]
41 pub(crate) async fn handle_query_metric_names(
42 &self,
43 matchers: Vec<Matcher>,
44 ctx: &QueryContextRef,
45 ) -> Result<Vec<String>> {
46 let _timer = crate::metrics::PROMQL_QUERY_METRICS_ELAPSED
47 .with_label_values(&[ctx.get_db_string().as_str()])
48 .start_timer();
49
50 let table = self
51 .catalog_manager
52 .table(
53 ctx.current_catalog(),
54 INFORMATION_SCHEMA_NAME,
55 TABLES,
56 Some(ctx),
57 )
58 .await
59 .context(CatalogSnafu)?
60 .with_context(|| TableNotFoundSnafu {
61 table_name: "greptime.information_schema.tables",
62 })?;
63
64 let dataframe = self
65 .query_engine
66 .read_table(table)
67 .with_context(|_| ReadTableSnafu {
68 table_name: "greptime.information_schema.tables",
69 })?;
70
71 let logical_plan = prometheus::metric_name_matchers_to_plan(dataframe, matchers, ctx)
72 .context(PrometheusMetricNamesQueryPlanSnafu)?;
73
74 let results = self
75 .query_engine
76 .execute(logical_plan, ctx.clone())
77 .await
78 .context(ExecLogicalPlanSnafu)?;
79
80 let batches = match results.data {
81 OutputData::Stream(stream) => util::collect(stream)
82 .await
83 .context(CollectRecordbatchSnafu)?,
84 OutputData::RecordBatches(rbs) => rbs.take(),
85 _ => unreachable!("should not happen"),
86 };
87
88 let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
89
90 for batch in batches {
91 batch
93 .iter_column_as_string(0)
94 .flatten()
95 .for_each(|x| results.push(x))
96 }
97
98 Ok(results)
99 }
100
101 #[tracing::instrument(skip_all)]
103 pub(crate) async fn handle_query_label_values(
104 &self,
105 metric: String,
106 label_name: String,
107 matchers: Vec<Matcher>,
108 start: SystemTime,
109 end: SystemTime,
110 ctx: &QueryContextRef,
111 ) -> Result<Vec<String>> {
112 let table_schema = matchers
113 .iter()
114 .find_map(|m| {
115 if is_database_selection_label(&m.name) && m.op == MatchOp::Equal {
116 Some(m.value.clone())
117 } else {
118 None
119 }
120 })
121 .unwrap_or_else(|| ctx.current_schema());
122
123 let full_table_name = format_full_table_name(ctx.current_catalog(), &table_schema, &metric);
124 let table = self
125 .catalog_manager
126 .table(ctx.current_catalog(), &table_schema, &metric, Some(ctx))
127 .await
128 .context(CatalogSnafu)?
129 .with_context(|| TableNotFoundSnafu {
130 table_name: full_table_name.clone(),
131 })?;
132
133 if table.schema().column_schema_by_name(&label_name).is_none() {
136 return table::error::ColumnNotExistsSnafu {
137 column_name: label_name,
138 table_name: full_table_name,
139 }
140 .fail()
141 .context(TableSnafu);
142 }
143
144 let dataframe = self
145 .query_engine
146 .read_table(table.clone())
147 .with_context(|_| ReadTableSnafu {
148 table_name: full_table_name,
149 })?;
150
151 let scan_plan = dataframe.into_unoptimized_plan();
152 let filter_conditions =
153 PromPlanner::matchers_to_expr(Matchers::new(matchers), scan_plan.schema())
154 .context(PrometheusLabelValuesQueryPlanSnafu)?;
155 let logical_plan = promql::label_values::rewrite_label_values_query(
156 table,
157 scan_plan,
158 filter_conditions,
159 label_name,
160 start,
161 end,
162 )
163 .context(PrometheusLabelValuesQueryPlanSnafu)?;
164
165 let results = self
166 .query_engine
167 .execute(logical_plan, ctx.clone())
168 .await
169 .context(ExecLogicalPlanSnafu)?;
170
171 let batches = match results.data {
172 OutputData::Stream(stream) => util::collect(stream)
173 .await
174 .context(CollectRecordbatchSnafu)?,
175 OutputData::RecordBatches(rbs) => rbs.take(),
176 _ => unreachable!("should not happen"),
177 };
178
179 let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
180 for batch in batches {
181 batch
183 .iter_column_as_string(0)
184 .flatten()
185 .for_each(|x| results.push(x))
186 }
187
188 Ok(results)
189 }
190}