use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use axum::extract::{Path, Query, State};
use axum::{Extension, Form};
use catalog::CatalogManagerRef;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::RecordBatches;
use common_telemetry::{debug, tracing};
use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
use common_version::OwnedBuildInfo;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::Float64Vector;
use futures::future::join_all;
use futures::StreamExt;
use itertools::Itertools;
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::value::ValueType;
use promql_parser::parser::{
    AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
    UnaryExpr, VectorSelector,
};
use query::parser::{PromQuery, QueryLanguageParser, DEFAULT_LOOKBACK_STRING};
use query::promql::planner::normalize_matcher;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use session::context::{QueryContext, QueryContextRef};
use snafu::{Location, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY,
};

pub use super::result::prometheus_resp::PrometheusJsonResponse;
use crate::error::{
    CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result,
    TableNotFoundSnafu, UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL};
use crate::prometheus_handler::PrometheusHandlerRef;

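/// One series in an instant-query (`vector`) result: a label set plus an
/// optional `(timestamp, value)` sample.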
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    pub metric: BTreeMap<String, String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}

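/// One series in a range-query (`matrix`) result: a label set plus all of its
/// `(timestamp, value)` samples.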
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    pub metric: BTreeMap<String, String>,
    pub values: Vec<(f64, String)>,
}

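/// The `result` payload of a query response, serialized untagged so each
/// variant keeps the plain JSON shape expected by the Prometheus HTTP API.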
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    Matrix(Vec<PromSeriesMatrix>),
    Vector(Vec<PromSeriesVector>),
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}

impl Default for PromQueryResult {
    fn default() -> Self {
        PromQueryResult::Matrix(Default::default())
    }
}

#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    #[serde(rename = "resultType")]
    pub result_type: String,
    pub result: PromQueryResult,
}

#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    PromData(PromData),
    Labels(Vec<String>),
    Series(Vec<HashMap<String, String>>),
    LabelValues(Vec<String>),
    FormatQuery(String),
    BuildInfo(OwnedBuildInfo),
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    #[default]
    None,
}

impl PrometheusResponse {
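    /// Merges `other` into `self` by concatenating matrix-with-matrix or
    /// vector-with-vector results; any other combination is left unchanged.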
    fn append(&mut self, other: PrometheusResponse) {
        match (self, other) {
            (
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Matrix(lhs),
                    ..
                }),
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Matrix(rhs),
                    ..
                }),
            ) => {
                lhs.extend(rhs);
            }

            (
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Vector(lhs),
                    ..
                }),
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Vector(rhs),
                    ..
                }),
            ) => {
                lhs.extend(rhs);
            }
            _ => {}
        }
    }

    pub fn is_none(&self) -> bool {
        matches!(self, PrometheusResponse::None)
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    query: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "format_query")
)]
pub async fn format_query(
    State(_handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(_query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    let query = params.query.or(form_params.query).unwrap_or_default();
    match promql_parser::parser::parse(&query) {
        Ok(expr) => {
            let pretty = expr.prettify();
            PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
        }
        Err(reason) => {
            let err = InvalidQuerySnafu { reason }.build();
            PrometheusJsonResponse::error(err.status_code(), err.output_msg())
        }
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "build_info_query")
)]
pub async fn build_info_query() -> PrometheusJsonResponse {
    let build_info = common_version::build_info().clone();
    PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    query: Option<String>,
    lookback: Option<String>,
    time: Option<String>,
    timeout: Option<String>,
    db: Option<String>,
}

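/// Unwraps a `Result`; on error, returns early from the enclosing handler with
/// an `InvalidArguments` Prometheus error response.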
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                let msg = err.to_string();
                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
            }
        }
    };
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "instant_query")
)]
pub async fn instant_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    let time = params
        .time
        .or(form_params.time)
        .unwrap_or_else(current_time_rfc3339);
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: time.clone(),
        end: time,
        step: "1s".to_string(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            let result_type = promql_expr.value_type();

            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_instant_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_instant_query(&handler, &prom_query, query_ctx).await
    }
}

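/// Runs a single instant query through the handler and converts the output
/// into a Prometheus JSON response.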
async fn do_instant_query(
    handler: &PrometheusHandlerRef,
    prom_query: &PromQuery,
    query_ctx: QueryContextRef,
) -> PrometheusJsonResponse {
    let result = handler.do_query(prom_query, query_ctx).await;
    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
        Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
    };
    PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    query: Option<String>,
    start: Option<String>,
    end: Option<String>,
    step: Option<String>,
    lookback: Option<String>,
    timeout: Option<String>,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "range_query")
)]
pub async fn range_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<RangeQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<RangeQuery>,
) -> PrometheusJsonResponse {
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: params.start.or(form_params.start).unwrap_or_default(),
        end: params.end.or(form_params.end).unwrap_or_default(),
        step: params.step.or(form_params.step).unwrap_or_default(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: ValueType::Matrix.to_string(),
                ..Default::default()
            }));
        }

        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_range_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_range_query(&handler, &prom_query, query_ctx).await
    }
}

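/// Runs a single range query through the handler and converts the output into
/// a Prometheus JSON response with a `matrix` result type.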
async fn do_range_query(
    handler: &PrometheusHandlerRef,
    prom_query: &PromQuery,
    query_ctx: QueryContextRef,
) -> PrometheusJsonResponse {
    let result = handler.do_query(prom_query, query_ctx).await;
    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
        Ok((metric_name, _)) => metric_name.unwrap_or_default(),
    };
    PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
}

#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}

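// Custom deserialization that collects every `match[]` entry of the request
// parameters into `Matches`.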
impl<'de> Deserialize<'de> for Matches {
    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        struct MatchesVisitor;

        impl<'d> Visitor<'d> for MatchesVisitor {
            type Value = Vec<String>;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a string")
            }

            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
            where
                M: MapAccess<'d>,
            {
                let mut matches = Vec::new();
                while let Some((key, value)) = access.next_entry::<String, String>()? {
                    if key == "match[]" {
                        matches.push(value);
                    }
                }
                Ok(matches)
            }
        }
        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
    }
}

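/// Unwraps a `Result` into `Some(v)`, treating `TableNotFound` and
/// `TableColumnNotFound` as `None` (the requested table or column simply does
/// not exist); any other error returns early with a Prometheus error response.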
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err) => {
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    None
                } else {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
                }
            }
        }
    };
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "labels_query")
)]
pub async fn labels_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<LabelsQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<LabelsQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    let mut queries = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
        .start_timer();

    let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
    {
        Ok(labels) => labels,
        Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
    };
    let _ = labels.insert(METRIC_NAME.to_string());

    if queries.is_empty() {
        let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
        labels_vec.sort_unstable();
        return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
    }

    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    let mut fetched_labels = HashSet::new();
    let _ = fetched_labels.insert(METRIC_NAME.to_string());

    let mut merge_map = HashMap::new();
    for query in queries {
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
        };

        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
        handle_schema_err!(
            retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
                .await
        );
    }

    fetched_labels.retain(|l| labels.contains(l));
    let _ = labels.insert(METRIC_NAME.to_string());

    let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
    sorted_labels.sort();
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
    resp.resp_metrics = merge_map;
    resp
}

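/// Returns the union of primary key (tag) column names across all tables in
/// the given catalog and schema, excluding the metric engine's internal table
/// id and tsid columns.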
async fn get_all_column_names(
    catalog: &str,
    schema: &str,
    manager: &CatalogManagerRef,
) -> std::result::Result<HashSet<String>, catalog::error::Error> {
    let table_names = manager.table_names(catalog, schema, None).await?;

    let mut labels = HashSet::new();
    for table_name in table_names {
        let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
            continue;
        };
        for column in table.primary_key_columns() {
            if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
                && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
            {
                labels.insert(column.name);
            }
        }
    }

    Ok(labels)
}

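/// Converts a query result into series entries (one label map per row,
/// restricted to the table's tag columns plus `__name__`) and appends them to
/// `series`; execution plan metrics are merged into `metrics`.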
async fn retrieve_series_from_query_result(
    result: Result<Output>,
    series: &mut Vec<HashMap<String, String>>,
    query_ctx: &QueryContext,
    table_name: &str,
    manager: &CatalogManagerRef,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;

    let table = manager
        .table(
            query_ctx.current_catalog(),
            &query_ctx.current_schema(),
            table_name,
            Some(query_ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            catalog: query_ctx.current_catalog(),
            schema: query_ctx.current_schema(),
            table: table_name,
        })?;
    let tag_columns = table
        .primary_key_columns()
        .map(|c| c.name)
        .collect::<HashSet<_>>();

    match result.data {
        OutputData::RecordBatches(batches) => {
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
            reason: "expected data result, but got affected rows".to_string(),
            location: Location::default(),
        }),
    }?;

    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}

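/// Collects label (column) names from a query result into `labels` and merges
/// execution plan metrics into `metrics`.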
async fn retrieve_labels_name_from_query_result(
    result: Result<Output>,
    labels: &mut HashSet<String>,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;
    match result.data {
        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_labels_name(batches, labels)
        }
        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
            reason: "expected data result, but got affected rows".to_string(),
        }
        .fail(),
    }?;
    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}

fn record_batches_to_series(
    batches: RecordBatches,
    series: &mut Vec<HashMap<String, String>>,
    table_name: &str,
    tag_columns: &HashSet<String>,
) -> Result<()> {
    for batch in batches.iter() {
        let projection = batch
            .schema
            .column_schemas()
            .iter()
            .enumerate()
            .filter_map(|(idx, col)| {
                if tag_columns.contains(&col.name) {
                    Some(idx)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        let batch = batch
            .try_project(&projection)
            .context(CollectRecordbatchSnafu)?;

        for row in batch.rows() {
            let mut element: HashMap<String, String> = row
                .iter()
                .enumerate()
                .map(|(idx, column)| {
                    let column_name = batch.schema.column_name_by_index(idx);
                    (column_name.to_string(), column.to_string())
                })
                .collect();
            let _ = element.insert("__name__".to_string(), table_name.to_string());
            series.push(element);
        }
    }
    Ok(())
}

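/// Inserts all column names of `batches` into `labels` once a row with at
/// least one non-null float64 value is found; errors if the batches contain no
/// float64 value column at all.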
fn record_batches_to_labels_name(
    batches: RecordBatches,
    labels: &mut HashSet<String>,
) -> Result<()> {
    let mut column_indices = Vec::new();
    let mut field_column_indices = Vec::new();
    for (i, column) in batches.schema().column_schemas().iter().enumerate() {
        if let ConcreteDataType::Float64(_) = column.data_type {
            field_column_indices.push(i);
        }
        column_indices.push(i);
    }

    if field_column_indices.is_empty() {
        return Err(Error::Internal {
            err_msg: "no value column found".to_string(),
        });
    }

    for batch in batches.iter() {
        let names = column_indices
            .iter()
            .map(|c| batches.schema().column_name_by_index(*c).to_string())
            .collect::<Vec<_>>();

        let field_columns = field_column_indices
            .iter()
            .map(|i| {
                batch
                    .column(*i)
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
            })
            .collect::<Vec<_>>();

        for row_index in 0..batch.num_rows() {
            if field_columns
                .iter()
                .all(|c| c.get_data(row_index).is_none())
            {
                continue;
            }

            names.iter().for_each(|name| {
                let _ = labels.insert(name.to_string());
            });
            return Ok(());
        }
    }
    Ok(())
}

pub(crate) fn retrieve_metric_name_and_result_type(
    promql: &str,
) -> Result<(Option<String>, ValueType)> {
    let promql_expr = promql_parser::parser::parse(promql)
        .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
    let metric_name = promql_expr_to_metric_name(&promql_expr);
    let result_type = promql_expr.value_type();

    Ok((metric_name, result_type))
}

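/// Resolves the catalog and schema to use: parsed from the `db` parameter when
/// present, otherwise taken from the current query context.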
pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
    if let Some(db) = db {
        parse_catalog_and_schema_from_db_string(db)
    } else {
        (
            ctx.current_catalog().to_string(),
            ctx.current_schema().to_string(),
        )
    }
}

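/// Updates the query context's catalog and schema when they differ from the
/// given values.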
pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
    if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
        ctx.set_current_catalog(catalog);
        ctx.set_current_schema(schema);
    }
}

fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
    find_metric_name_and_matchers(expr, |name, matchers| {
        name.clone().or(matchers
            .find_matchers(METRIC_NAME)
            .into_iter()
            .next()
            .map(|m| m.value))
    })
}

fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
where
    F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
{
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
            find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
        }
        PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::NumberLiteral(_) => None,
        PromqlExpr::StringLiteral(_) => None,
        PromqlExpr::Extension(_) => None,
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;

            f(name, matchers)
        }
        PromqlExpr::Call(Call { args, .. }) => args
            .args
            .iter()
            .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
    }
}

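/// Collects the `__name__` matchers from a selector that carries no literal
/// metric name, keeping only the non-equality matchers after normalization.
/// Returns `None` when no such selector is found.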
fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
    find_metric_name_and_matchers(expr, |name, matchers| {
        if name.is_some() {
            return None;
        }

        Some(matchers.find_matchers(METRIC_NAME))
    })
    .map(|matchers| {
        matchers
            .into_iter()
            .filter(|m| !matches!(m.op, MatchOp::Equal))
            .map(normalize_matcher)
            .collect::<Vec<_>>()
    })
}

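/// Rewrites every `__name__` matcher in the expression into an exact-equality
/// matcher against `metric_name`, leaving selectors that already carry a
/// literal metric name untouched.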
fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
            update_metric_name_matcher(expr, metric_name)
        }
        PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
            update_metric_name_matcher(lhs, metric_name);
            update_metric_name_matcher(rhs, metric_name);
        }
        PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
            update_metric_name_matcher(expr, metric_name)
        }
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            if name.is_some() {
                return;
            }

            for m in &mut matchers.matchers {
                if m.name == METRIC_NAME {
                    m.op = MatchOp::Equal;
                    m.value = metric_name.to_string();
                }
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if name.is_some() {
                return;
            }

            for m in &mut matchers.matchers {
                if m.name == METRIC_NAME {
                    m.op = MatchOp::Equal;
                    m.value = metric_name.to_string();
                }
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            args.args.iter_mut().for_each(|e| {
                update_metric_name_matcher(e, metric_name);
            });
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "label_values_query")
)]
pub async fn label_values_query(
    State(handler): State<PrometheusHandlerRef>,
    Path(label_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(params): Query<LabelValueQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
        .start_timer();

    if label_name == METRIC_NAME_LABEL {
        let catalog_manager = handler.catalog_manager();
        let mut tables_stream = catalog_manager.tables(&catalog, &schema, Some(&query_ctx));
        let mut table_names = Vec::new();
        while let Some(table) = tables_stream.next().await {
            match table {
                Ok(table) => {
                    // Skip physical tables of the metric engine.
                    if table
                        .table_info()
                        .meta
                        .options
                        .extra_options
                        .contains_key(PHYSICAL_TABLE_METADATA_KEY)
                    {
                        continue;
                    }

                    table_names.push(table.table_info().name.clone());
                }
                Err(e) => {
                    return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
                }
            }
        }
        table_names.sort_unstable();
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
    } else if label_name == FIELD_NAME_LABEL {
        let field_columns = handle_schema_err!(
            retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
        )
        .unwrap_or_default();
        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
        field_columns.sort_unstable();
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
    }

    let queries = params.matches.0;
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::InvalidArguments,
            "match[] parameter is required",
        );
    }

    let start = params.start.unwrap_or_else(yesterday_rfc3339);
    let end = params.end.unwrap_or_else(current_time_rfc3339);
    let mut label_values = HashSet::new();

    let start = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&start)
        .context(ParseTimestampSnafu { timestamp: &start }));
    let end = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&end)
        .context(ParseTimestampSnafu { timestamp: &end }));

    for query in queries {
        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected vector selector",
            );
        };
        let Some(name) = take_metric_name(&mut vector_selector) else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected metric name",
            );
        };
        let VectorSelector { matchers, .. } = vector_selector;
        let matchers = matchers.matchers;
        let result = handler
            .query_label_values(
                name,
                label_name.to_string(),
                matchers,
                start,
                end,
                &query_ctx,
            )
            .await;
        if let Some(result) = handle_schema_err!(result) {
            label_values.extend(result.into_iter());
        }
    }

    let mut label_values: Vec<_> = label_values.into_iter().collect();
    label_values.sort_unstable();
    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
}

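/// Takes the metric name out of a vector selector: either the literal selector
/// name or the value of an exact `__name__` matcher, which is removed from the
/// matcher list so it is not applied twice.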
fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
    if let Some(name) = selector.name.take() {
        return Some(name);
    }

    let (pos, matcher) = selector
        .matchers
        .matchers
        .iter()
        .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
    let name = matcher.value.clone();
    selector.matchers.matchers.remove(pos);

    Some(name)
}

async fn retrieve_field_names(
    query_ctx: &QueryContext,
    manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<HashSet<String>> {
    let mut field_columns = HashSet::new();
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    if matches.is_empty() {
        // No match[] given: collect field columns from every table in the schema.
        let mut tables_stream = manager.tables(catalog, &schema, Some(query_ctx));
        while let Some(table) = tables_stream.next().await {
            let table = table.context(CatalogSnafu)?;
            for column in table.field_columns() {
                field_columns.insert(column.name);
            }
        }
        return Ok(field_columns);
    }

    for table_name in matches {
        let table = manager
            .table(catalog, &schema, &table_name, Some(query_ctx))
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                catalog: catalog.to_string(),
                schema: schema.to_string(),
                table: table_name.to_string(),
            })?;

        for column in table.field_columns() {
            field_columns.insert(column.name);
        }
    }
    Ok(field_columns)
}

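/// Walks a PromQL expression and returns the metric name it references, an
/// empty string when more than one selector carries a metric name (ambiguous),
/// or `None` when no metric name is present.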
fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
    let promql_expr = promql_parser::parser::parse(query).ok()?;

    struct MetricNameVisitor {
        metric_name: Option<String>,
    }

    impl promql_parser::util::ExprVisitor for MetricNameVisitor {
        type Error = ();

        fn pre_visit(&mut self, plan: &PromqlExpr) -> std::result::Result<bool, Self::Error> {
            let query_metric_name = match plan {
                PromqlExpr::VectorSelector(vs) => vs
                    .matchers
                    .find_matchers(METRIC_NAME)
                    .into_iter()
                    .next()
                    .map(|m| m.value)
                    .or_else(|| vs.name.clone()),
                PromqlExpr::MatrixSelector(ms) => ms
                    .vs
                    .matchers
                    .find_matchers(METRIC_NAME)
                    .into_iter()
                    .next()
                    .map(|m| m.value)
                    .or_else(|| ms.vs.name.clone()),
                _ => return Ok(true),
            };

            // A second metric name makes the query ambiguous; fall back to an
            // empty name instead of picking one arbitrarily.
            if self.metric_name.is_some() && query_metric_name.is_some() {
                self.metric_name = Some(String::new());
            } else {
                self.metric_name = query_metric_name.or_else(|| self.metric_name.clone());
            }

            Ok(true)
        }
    }

    let mut visitor = MetricNameVisitor { metric_name: None };
    promql_parser::util::walk_expr(&mut visitor, &promql_expr).ok()?;
    visitor.metric_name
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "series_query")
)]
pub async fn series_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<SeriesQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SeriesQuery>,
) -> PrometheusJsonResponse {
    let mut queries: Vec<String> = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::Unsupported,
            "match[] parameter is required",
        );
    }
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
        .start_timer();

    let mut series = Vec::new();
    let mut merge_map = HashMap::new();
    for query in queries {
        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
        };
        let result = handler.do_query(&prom_query, query_ctx.clone()).await;

        handle_schema_err!(
            retrieve_series_from_query_result(
                result,
                &mut series,
                &query_ctx,
                &table_name,
                &handler.catalog_manager(),
                &mut merge_map,
            )
            .await
        );
    }
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
    resp.resp_metrics = merge_map;
    resp
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    query: Option<String>,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "parse_query")
)]
pub async fn parse_query(
    State(_handler): State<PrometheusHandlerRef>,
    Query(params): Query<ParseQuery>,
    Extension(_query_ctx): Extension<QueryContext>,
    Form(form_params): Form<ParseQuery>,
) -> PrometheusJsonResponse {
    if let Some(query) = params.query.or(form_params.query) {
        let ast = try_call_return_response!(promql_parser::parser::parse(&query));
        PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
    } else {
        PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
    }
}