frontend/instance/
promql.rs1use std::time::SystemTime;
16
17use catalog::information_schema::TABLES;
18use client::OutputData;
19use common_catalog::consts::INFORMATION_SCHEMA_NAME;
20use common_catalog::format_full_table_name;
21use common_recordbatch::util;
22use common_telemetry::tracing;
23use datatypes::prelude::Value;
24use promql_parser::label::{Matcher, Matchers};
25use query::promql;
26use query::promql::planner::PromPlanner;
27use servers::prometheus;
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30
31use crate::error::{
32 CatalogSnafu, CollectRecordbatchSnafu, ExecLogicalPlanSnafu,
33 PrometheusLabelValuesQueryPlanSnafu, PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu,
34 Result, TableNotFoundSnafu,
35};
36use crate::instance::Instance;
37
38impl Instance {
39 #[tracing::instrument(skip_all)]
41 pub(crate) async fn handle_query_metric_names(
42 &self,
43 matchers: Vec<Matcher>,
44 ctx: &QueryContextRef,
45 ) -> Result<Vec<String>> {
46 let _timer = crate::metrics::PROMQL_QUERY_METRICS_ELAPSED
47 .with_label_values(&[ctx.get_db_string().as_str()])
48 .start_timer();
49
50 let table = self
51 .catalog_manager
52 .table(
53 ctx.current_catalog(),
54 INFORMATION_SCHEMA_NAME,
55 TABLES,
56 Some(ctx),
57 )
58 .await
59 .context(CatalogSnafu)?
60 .with_context(|| TableNotFoundSnafu {
61 table_name: "greptime.information_schema.tables",
62 })?;
63
64 let dataframe = self
65 .query_engine
66 .read_table(table)
67 .with_context(|_| ReadTableSnafu {
68 table_name: "greptime.information_schema.tables",
69 })?;
70
71 let logical_plan = prometheus::metric_name_matchers_to_plan(dataframe, matchers, ctx)
72 .context(PrometheusMetricNamesQueryPlanSnafu)?;
73
74 let results = self
75 .query_engine
76 .execute(logical_plan, ctx.clone())
77 .await
78 .context(ExecLogicalPlanSnafu)?;
79
80 let batches = match results.data {
81 OutputData::Stream(stream) => util::collect(stream)
82 .await
83 .context(CollectRecordbatchSnafu)?,
84 OutputData::RecordBatches(rbs) => rbs.take(),
85 _ => unreachable!("should not happen"),
86 };
87
88 let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
89
90 for batch in batches {
91 let names = batch.column(0);
93
94 for i in 0..names.len() {
95 let Value::String(name) = names.get(i) else {
96 unreachable!();
97 };
98
99 results.push(name.into_string());
100 }
101 }
102
103 Ok(results)
104 }
105
106 #[tracing::instrument(skip_all)]
108 pub(crate) async fn handle_query_label_values(
109 &self,
110 metric: String,
111 label_name: String,
112 matchers: Vec<Matcher>,
113 start: SystemTime,
114 end: SystemTime,
115 ctx: &QueryContextRef,
116 ) -> Result<Vec<String>> {
117 let table_schema = ctx.current_schema();
118 let table = self
119 .catalog_manager
120 .table(ctx.current_catalog(), &table_schema, &metric, Some(ctx))
121 .await
122 .context(CatalogSnafu)?
123 .with_context(|| TableNotFoundSnafu {
124 table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
125 })?;
126
127 let dataframe = self
128 .query_engine
129 .read_table(table.clone())
130 .with_context(|_| ReadTableSnafu {
131 table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
132 })?;
133
134 let scan_plan = dataframe.into_logical_plan();
135 let filter_conditions =
136 PromPlanner::matchers_to_expr(Matchers::new(matchers), scan_plan.schema())
137 .context(PrometheusLabelValuesQueryPlanSnafu)?;
138 let logical_plan = promql::label_values::rewrite_label_values_query(
139 table,
140 scan_plan,
141 filter_conditions,
142 label_name,
143 start,
144 end,
145 )
146 .context(PrometheusLabelValuesQueryPlanSnafu)?;
147
148 let results = self
149 .query_engine
150 .execute(logical_plan, ctx.clone())
151 .await
152 .context(ExecLogicalPlanSnafu)?;
153
154 let batches = match results.data {
155 OutputData::Stream(stream) => util::collect(stream)
156 .await
157 .context(CollectRecordbatchSnafu)?,
158 OutputData::RecordBatches(rbs) => rbs.take(),
159 _ => unreachable!("should not happen"),
160 };
161
162 let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
163 for batch in batches {
164 let names = batch.column(0);
166
167 for i in 0..names.len() {
168 results.push(names.get(i).to_string());
169 }
170 }
171
172 Ok(results)
173 }
174}