frontend/instance/
promql.rs1use std::time::SystemTime;
16
17use catalog::information_schema::TABLES;
18use client::OutputData;
19use common_catalog::consts::INFORMATION_SCHEMA_NAME;
20use common_catalog::format_full_table_name;
21use common_recordbatch::util;
22use common_telemetry::tracing;
23use datatypes::prelude::Value;
24use promql_parser::label::{MatchOp, Matcher, Matchers};
25use query::promql;
26use query::promql::planner::PromPlanner;
27use servers::prom_store::{DATABASE_LABEL, SCHEMA_LABEL};
28use servers::prometheus;
29use session::context::QueryContextRef;
30use snafu::{OptionExt, ResultExt};
31
32use crate::error::{
33 CatalogSnafu, CollectRecordbatchSnafu, ExecLogicalPlanSnafu,
34 PrometheusLabelValuesQueryPlanSnafu, PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu,
35 Result, TableNotFoundSnafu,
36};
37use crate::instance::Instance;
38
39impl Instance {
40 #[tracing::instrument(skip_all)]
42 pub(crate) async fn handle_query_metric_names(
43 &self,
44 matchers: Vec<Matcher>,
45 ctx: &QueryContextRef,
46 ) -> Result<Vec<String>> {
47 let _timer = crate::metrics::PROMQL_QUERY_METRICS_ELAPSED
48 .with_label_values(&[ctx.get_db_string().as_str()])
49 .start_timer();
50
51 let table = self
52 .catalog_manager
53 .table(
54 ctx.current_catalog(),
55 INFORMATION_SCHEMA_NAME,
56 TABLES,
57 Some(ctx),
58 )
59 .await
60 .context(CatalogSnafu)?
61 .with_context(|| TableNotFoundSnafu {
62 table_name: "greptime.information_schema.tables",
63 })?;
64
65 let dataframe = self
66 .query_engine
67 .read_table(table)
68 .with_context(|_| ReadTableSnafu {
69 table_name: "greptime.information_schema.tables",
70 })?;
71
72 let logical_plan = prometheus::metric_name_matchers_to_plan(dataframe, matchers, ctx)
73 .context(PrometheusMetricNamesQueryPlanSnafu)?;
74
75 let results = self
76 .query_engine
77 .execute(logical_plan, ctx.clone())
78 .await
79 .context(ExecLogicalPlanSnafu)?;
80
81 let batches = match results.data {
82 OutputData::Stream(stream) => util::collect(stream)
83 .await
84 .context(CollectRecordbatchSnafu)?,
85 OutputData::RecordBatches(rbs) => rbs.take(),
86 _ => unreachable!("should not happen"),
87 };
88
89 let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
90
91 for batch in batches {
92 let names = batch.column(0);
94
95 for i in 0..names.len() {
96 let Value::String(name) = names.get(i) else {
97 unreachable!();
98 };
99
100 results.push(name.into_string());
101 }
102 }
103
104 Ok(results)
105 }
106
107 #[tracing::instrument(skip_all)]
109 pub(crate) async fn handle_query_label_values(
110 &self,
111 metric: String,
112 label_name: String,
113 matchers: Vec<Matcher>,
114 start: SystemTime,
115 end: SystemTime,
116 ctx: &QueryContextRef,
117 ) -> Result<Vec<String>> {
118 let table_schema = matchers
119 .iter()
120 .find_map(|m| {
121 if (m.name == SCHEMA_LABEL || m.name == DATABASE_LABEL) && m.op == MatchOp::Equal {
122 Some(m.value.clone())
123 } else {
124 None
125 }
126 })
127 .unwrap_or_else(|| ctx.current_schema());
128
129 let table = self
130 .catalog_manager
131 .table(ctx.current_catalog(), &table_schema, &metric, Some(ctx))
132 .await
133 .context(CatalogSnafu)?
134 .with_context(|| TableNotFoundSnafu {
135 table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
136 })?;
137
138 let dataframe = self
139 .query_engine
140 .read_table(table.clone())
141 .with_context(|_| ReadTableSnafu {
142 table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
143 })?;
144
145 let scan_plan = dataframe.into_logical_plan();
146 let filter_conditions =
147 PromPlanner::matchers_to_expr(Matchers::new(matchers), scan_plan.schema())
148 .context(PrometheusLabelValuesQueryPlanSnafu)?;
149 let logical_plan = promql::label_values::rewrite_label_values_query(
150 table,
151 scan_plan,
152 filter_conditions,
153 label_name,
154 start,
155 end,
156 )
157 .context(PrometheusLabelValuesQueryPlanSnafu)?;
158
159 let results = self
160 .query_engine
161 .execute(logical_plan, ctx.clone())
162 .await
163 .context(ExecLogicalPlanSnafu)?;
164
165 let batches = match results.data {
166 OutputData::Stream(stream) => util::collect(stream)
167 .await
168 .context(CollectRecordbatchSnafu)?,
169 OutputData::RecordBatches(rbs) => rbs.take(),
170 _ => unreachable!("should not happen"),
171 };
172
173 let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
174 for batch in batches {
175 let names = batch.column(0);
177
178 for i in 0..names.len() {
179 results.push(names.get(i).to_string());
180 }
181 }
182
183 Ok(results)
184 }
185}