1use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use axum::extract::{Path, Query, State};
20use axum::{Extension, Form};
21use catalog::CatalogManagerRef;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::ErrorExt;
24use common_error::status_code::StatusCode;
25use common_query::{Output, OutputData};
26use common_recordbatch::RecordBatches;
27use common_telemetry::{debug, tracing};
28use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
29use common_version::OwnedBuildInfo;
30use datatypes::prelude::ConcreteDataType;
31use datatypes::scalars::ScalarVector;
32use datatypes::vectors::Float64Vector;
33use futures::future::join_all;
34use futures::StreamExt;
35use itertools::Itertools;
36use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
37use promql_parser::parser::value::ValueType;
38use promql_parser::parser::{
39 AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
40 UnaryExpr, VectorSelector,
41};
42use query::parser::{PromQuery, QueryLanguageParser, DEFAULT_LOOKBACK_STRING};
43use query::promql::planner::normalize_matcher;
44use serde::de::{self, MapAccess, Visitor};
45use serde::{Deserialize, Serialize};
46use serde_json::Value;
47use session::context::{QueryContext, QueryContextRef};
48use snafu::{Location, OptionExt, ResultExt};
49use store_api::metric_engine_consts::{
50 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY,
51};
52
53pub use super::result::prometheus_resp::PrometheusJsonResponse;
54use crate::error::{
55 CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result,
56 TableNotFoundSnafu, UnexpectedResultSnafu,
57};
58use crate::http::header::collect_plan_metrics;
59use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL};
60use crate::prometheus_handler::PrometheusHandlerRef;
61
62#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
64pub struct PromSeriesVector {
65 pub metric: HashMap<String, String>,
66 #[serde(skip_serializing_if = "Option::is_none")]
67 pub value: Option<(f64, String)>,
68}
69
70#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
72pub struct PromSeriesMatrix {
73 pub metric: HashMap<String, String>,
74 pub values: Vec<(f64, String)>,
75}
76
77#[derive(Debug, Serialize, Deserialize, PartialEq)]
79#[serde(untagged)]
80pub enum PromQueryResult {
81 Matrix(Vec<PromSeriesMatrix>),
82 Vector(Vec<PromSeriesVector>),
83 Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
84 String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
85}
86
87impl Default for PromQueryResult {
88 fn default() -> Self {
89 PromQueryResult::Matrix(Default::default())
90 }
91}
92
93#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
94pub struct PromData {
95 #[serde(rename = "resultType")]
96 pub result_type: String,
97 pub result: PromQueryResult,
98}
99
100#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
101#[serde(untagged)]
102pub enum PrometheusResponse {
103 PromData(PromData),
104 Labels(Vec<String>),
105 Series(Vec<HashMap<String, String>>),
106 LabelValues(Vec<String>),
107 FormatQuery(String),
108 BuildInfo(OwnedBuildInfo),
109 #[serde(skip_deserializing)]
110 ParseResult(promql_parser::parser::Expr),
111 #[default]
112 None,
113}
114
115impl PrometheusResponse {
116 fn append(&mut self, other: PrometheusResponse) {
120 match (self, other) {
121 (
122 PrometheusResponse::PromData(PromData {
123 result: PromQueryResult::Matrix(lhs),
124 ..
125 }),
126 PrometheusResponse::PromData(PromData {
127 result: PromQueryResult::Matrix(rhs),
128 ..
129 }),
130 ) => {
131 lhs.extend(rhs);
132 }
133
134 (
135 PrometheusResponse::PromData(PromData {
136 result: PromQueryResult::Vector(lhs),
137 ..
138 }),
139 PrometheusResponse::PromData(PromData {
140 result: PromQueryResult::Vector(rhs),
141 ..
142 }),
143 ) => {
144 lhs.extend(rhs);
145 }
146 _ => {
147 }
149 }
150 }
151
152 pub fn is_none(&self) -> bool {
153 matches!(self, PrometheusResponse::None)
154 }
155}
156
157#[derive(Debug, Default, Serialize, Deserialize)]
158pub struct FormatQuery {
159 query: Option<String>,
160}
161
162#[axum_macros::debug_handler]
163#[tracing::instrument(
164 skip_all,
165 fields(protocol = "prometheus", request_type = "format_query")
166)]
167pub async fn format_query(
168 State(_handler): State<PrometheusHandlerRef>,
169 Query(params): Query<InstantQuery>,
170 Extension(_query_ctx): Extension<QueryContext>,
171 Form(form_params): Form<InstantQuery>,
172) -> PrometheusJsonResponse {
173 let query = params.query.or(form_params.query).unwrap_or_default();
174 match promql_parser::parser::parse(&query) {
175 Ok(expr) => {
176 let pretty = expr.prettify();
177 PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
178 }
179 Err(reason) => {
180 let err = InvalidQuerySnafu { reason }.build();
181 PrometheusJsonResponse::error(err.status_code(), err.output_msg())
182 }
183 }
184}
185
186#[derive(Debug, Default, Serialize, Deserialize)]
187pub struct BuildInfoQuery {}
188
189#[axum_macros::debug_handler]
190#[tracing::instrument(
191 skip_all,
192 fields(protocol = "prometheus", request_type = "build_info_query")
193)]
194pub async fn build_info_query() -> PrometheusJsonResponse {
195 let build_info = common_version::build_info().clone();
196 PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
197}
198
199#[derive(Debug, Default, Serialize, Deserialize)]
200pub struct InstantQuery {
201 query: Option<String>,
202 lookback: Option<String>,
203 time: Option<String>,
204 timeout: Option<String>,
205 db: Option<String>,
206}
207
208macro_rules! try_call_return_response {
211 ($handle: expr) => {
212 match $handle {
213 Ok(res) => res,
214 Err(err) => {
215 let msg = err.to_string();
216 return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
217 }
218 }
219 };
220}
221
222#[axum_macros::debug_handler]
223#[tracing::instrument(
224 skip_all,
225 fields(protocol = "prometheus", request_type = "instant_query")
226)]
227pub async fn instant_query(
228 State(handler): State<PrometheusHandlerRef>,
229 Query(params): Query<InstantQuery>,
230 Extension(mut query_ctx): Extension<QueryContext>,
231 Form(form_params): Form<InstantQuery>,
232) -> PrometheusJsonResponse {
233 let time = params
235 .time
236 .or(form_params.time)
237 .unwrap_or_else(current_time_rfc3339);
238 let prom_query = PromQuery {
239 query: params.query.or(form_params.query).unwrap_or_default(),
240 start: time.clone(),
241 end: time,
242 step: "1s".to_string(),
243 lookback: params
244 .lookback
245 .or(form_params.lookback)
246 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
247 };
248
249 let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
250
251 if let Some(db) = ¶ms.db {
253 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
254 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
255 }
256 let query_ctx = Arc::new(query_ctx);
257
258 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
259 .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
260 .start_timer();
261
262 if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
263 && !name_matchers.is_empty()
264 {
265 debug!("Find metric name matchers: {:?}", name_matchers);
266
267 let metric_names =
268 try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
269
270 debug!("Find metric names: {:?}", metric_names);
271
272 if metric_names.is_empty() {
273 let result_type = promql_expr.value_type();
274
275 return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
276 result_type: result_type.to_string(),
277 ..Default::default()
278 }));
279 }
280
281 let responses = join_all(metric_names.into_iter().map(|metric| {
282 let mut prom_query = prom_query.clone();
283 let mut promql_expr = promql_expr.clone();
284 let query_ctx = query_ctx.clone();
285 let handler = handler.clone();
286
287 async move {
288 update_metric_name_matcher(&mut promql_expr, &metric);
289 let new_query = promql_expr.to_string();
290 debug!(
291 "Updated promql, before: {}, after: {}",
292 &prom_query.query, new_query
293 );
294 prom_query.query = new_query;
295
296 do_instant_query(&handler, &prom_query, query_ctx).await
297 }
298 }))
299 .await;
300
301 responses
302 .into_iter()
303 .reduce(|mut acc, resp| {
304 acc.data.append(resp.data);
305 acc
306 })
307 .unwrap()
308 } else {
309 do_instant_query(&handler, &prom_query, query_ctx).await
310 }
311}
312
313async fn do_instant_query(
315 handler: &PrometheusHandlerRef,
316 prom_query: &PromQuery,
317 query_ctx: QueryContextRef,
318) -> PrometheusJsonResponse {
319 let result = handler.do_query(prom_query, query_ctx).await;
320 let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
321 Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
322 Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
323 };
324 PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
325}
326
327#[derive(Debug, Default, Serialize, Deserialize)]
328pub struct RangeQuery {
329 query: Option<String>,
330 start: Option<String>,
331 end: Option<String>,
332 step: Option<String>,
333 lookback: Option<String>,
334 timeout: Option<String>,
335 db: Option<String>,
336}
337
338#[axum_macros::debug_handler]
339#[tracing::instrument(
340 skip_all,
341 fields(protocol = "prometheus", request_type = "range_query")
342)]
343pub async fn range_query(
344 State(handler): State<PrometheusHandlerRef>,
345 Query(params): Query<RangeQuery>,
346 Extension(mut query_ctx): Extension<QueryContext>,
347 Form(form_params): Form<RangeQuery>,
348) -> PrometheusJsonResponse {
349 let prom_query = PromQuery {
350 query: params.query.or(form_params.query).unwrap_or_default(),
351 start: params.start.or(form_params.start).unwrap_or_default(),
352 end: params.end.or(form_params.end).unwrap_or_default(),
353 step: params.step.or(form_params.step).unwrap_or_default(),
354 lookback: params
355 .lookback
356 .or(form_params.lookback)
357 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
358 };
359
360 let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
361
362 if let Some(db) = ¶ms.db {
364 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
365 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
366 }
367 let query_ctx = Arc::new(query_ctx);
368 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
369 .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
370 .start_timer();
371
372 if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
373 && !name_matchers.is_empty()
374 {
375 debug!("Find metric name matchers: {:?}", name_matchers);
376
377 let metric_names =
378 try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
379
380 debug!("Find metric names: {:?}", metric_names);
381
382 if metric_names.is_empty() {
383 return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
384 result_type: ValueType::Matrix.to_string(),
385 ..Default::default()
386 }));
387 }
388
389 let responses = join_all(metric_names.into_iter().map(|metric| {
390 let mut prom_query = prom_query.clone();
391 let mut promql_expr = promql_expr.clone();
392 let query_ctx = query_ctx.clone();
393 let handler = handler.clone();
394
395 async move {
396 update_metric_name_matcher(&mut promql_expr, &metric);
397 let new_query = promql_expr.to_string();
398 debug!(
399 "Updated promql, before: {}, after: {}",
400 &prom_query.query, new_query
401 );
402 prom_query.query = new_query;
403
404 do_range_query(&handler, &prom_query, query_ctx).await
405 }
406 }))
407 .await;
408
409 responses
411 .into_iter()
412 .reduce(|mut acc, resp| {
413 acc.data.append(resp.data);
414 acc
415 })
416 .unwrap()
417 } else {
418 do_range_query(&handler, &prom_query, query_ctx).await
419 }
420}
421
422async fn do_range_query(
424 handler: &PrometheusHandlerRef,
425 prom_query: &PromQuery,
426 query_ctx: QueryContextRef,
427) -> PrometheusJsonResponse {
428 let result = handler.do_query(prom_query, query_ctx).await;
429 let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
430 Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
431 Ok((metric_name, _)) => metric_name.unwrap_or_default(),
432 };
433 PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
434}
435
436#[derive(Debug, Default, Serialize)]
437struct Matches(Vec<String>);
438
439#[derive(Debug, Default, Serialize, Deserialize)]
440pub struct LabelsQuery {
441 start: Option<String>,
442 end: Option<String>,
443 lookback: Option<String>,
444 #[serde(flatten)]
445 matches: Matches,
446 db: Option<String>,
447}
448
449impl<'de> Deserialize<'de> for Matches {
451 fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
452 where
453 D: de::Deserializer<'de>,
454 {
455 struct MatchesVisitor;
456
457 impl<'d> Visitor<'d> for MatchesVisitor {
458 type Value = Vec<String>;
459
460 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
461 formatter.write_str("a string")
462 }
463
464 fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
465 where
466 M: MapAccess<'d>,
467 {
468 let mut matches = Vec::new();
469 while let Some((key, value)) = access.next_entry::<String, String>()? {
470 if key == "match[]" {
471 matches.push(value);
472 }
473 }
474 Ok(matches)
475 }
476 }
477 Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
478 }
479}
480
481#[axum_macros::debug_handler]
482#[tracing::instrument(
483 skip_all,
484 fields(protocol = "prometheus", request_type = "labels_query")
485)]
486pub async fn labels_query(
487 State(handler): State<PrometheusHandlerRef>,
488 Query(params): Query<LabelsQuery>,
489 Extension(mut query_ctx): Extension<QueryContext>,
490 Form(form_params): Form<LabelsQuery>,
491) -> PrometheusJsonResponse {
492 let (catalog, schema) = get_catalog_schema(¶ms.db, &query_ctx);
493 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
494 let query_ctx = Arc::new(query_ctx);
495
496 let mut queries = params.matches.0;
497 if queries.is_empty() {
498 queries = form_params.matches.0;
499 }
500
501 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
502 .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
503 .start_timer();
504
505 let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
507 {
508 Ok(labels) => labels,
509 Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
510 };
511 let _ = labels.insert(METRIC_NAME.to_string());
513
514 if queries.is_empty() {
516 let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
517 labels_vec.sort_unstable();
518 return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
519 }
520
521 let start = params
523 .start
524 .or(form_params.start)
525 .unwrap_or_else(yesterday_rfc3339);
526 let end = params
527 .end
528 .or(form_params.end)
529 .unwrap_or_else(current_time_rfc3339);
530 let lookback = params
531 .lookback
532 .or(form_params.lookback)
533 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
534
535 let mut fetched_labels = HashSet::new();
536 let _ = fetched_labels.insert(METRIC_NAME.to_string());
537
538 let mut merge_map = HashMap::new();
539 for query in queries {
540 let prom_query = PromQuery {
541 query,
542 start: start.clone(),
543 end: end.clone(),
544 step: DEFAULT_LOOKBACK_STRING.to_string(),
545 lookback: lookback.clone(),
546 };
547
548 let result = handler.do_query(&prom_query, query_ctx.clone()).await;
549 if let Err(err) =
550 retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
551 .await
552 {
553 if err.status_code() != StatusCode::TableNotFound
555 && err.status_code() != StatusCode::TableColumnNotFound
556 {
557 return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
558 }
559 }
560 }
561
562 fetched_labels.retain(|l| labels.contains(l));
564 let _ = labels.insert(METRIC_NAME.to_string());
565
566 let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
567 sorted_labels.sort();
568 let merge_map = merge_map
569 .into_iter()
570 .map(|(k, v)| (k, Value::from(v)))
571 .collect();
572 let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
573 resp.resp_metrics = merge_map;
574 resp
575}
576
577async fn get_all_column_names(
579 catalog: &str,
580 schema: &str,
581 manager: &CatalogManagerRef,
582) -> std::result::Result<HashSet<String>, catalog::error::Error> {
583 let table_names = manager.table_names(catalog, schema, None).await?;
584
585 let mut labels = HashSet::new();
586 for table_name in table_names {
587 let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
588 continue;
589 };
590 for column in table.primary_key_columns() {
591 if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
592 && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
593 {
594 labels.insert(column.name);
595 }
596 }
597 }
598
599 Ok(labels)
600}
601
602async fn retrieve_series_from_query_result(
603 result: Result<Output>,
604 series: &mut Vec<HashMap<String, String>>,
605 query_ctx: &QueryContext,
606 table_name: &str,
607 manager: &CatalogManagerRef,
608 metrics: &mut HashMap<String, u64>,
609) -> Result<()> {
610 let result = result?;
611
612 let table = manager
614 .table(
615 query_ctx.current_catalog(),
616 &query_ctx.current_schema(),
617 table_name,
618 Some(query_ctx),
619 )
620 .await
621 .context(CatalogSnafu)?
622 .with_context(|| TableNotFoundSnafu {
623 catalog: query_ctx.current_catalog(),
624 schema: query_ctx.current_schema(),
625 table: table_name,
626 })?;
627 let tag_columns = table
628 .primary_key_columns()
629 .map(|c| c.name)
630 .collect::<HashSet<_>>();
631
632 match result.data {
633 OutputData::RecordBatches(batches) => {
634 record_batches_to_series(batches, series, table_name, &tag_columns)
635 }
636 OutputData::Stream(stream) => {
637 let batches = RecordBatches::try_collect(stream)
638 .await
639 .context(CollectRecordbatchSnafu)?;
640 record_batches_to_series(batches, series, table_name, &tag_columns)
641 }
642 OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
643 reason: "expected data result, but got affected rows".to_string(),
644 location: Location::default(),
645 }),
646 }?;
647
648 if let Some(ref plan) = result.meta.plan {
649 collect_plan_metrics(plan, &mut [metrics]);
650 }
651 Ok(())
652}
653
654async fn retrieve_labels_name_from_query_result(
656 result: Result<Output>,
657 labels: &mut HashSet<String>,
658 metrics: &mut HashMap<String, u64>,
659) -> Result<()> {
660 let result = result?;
661 match result.data {
662 OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
663 OutputData::Stream(stream) => {
664 let batches = RecordBatches::try_collect(stream)
665 .await
666 .context(CollectRecordbatchSnafu)?;
667 record_batches_to_labels_name(batches, labels)
668 }
669 OutputData::AffectedRows(_) => UnexpectedResultSnafu {
670 reason: "expected data result, but got affected rows".to_string(),
671 }
672 .fail(),
673 }?;
674 if let Some(ref plan) = result.meta.plan {
675 collect_plan_metrics(plan, &mut [metrics]);
676 }
677 Ok(())
678}
679
680fn record_batches_to_series(
681 batches: RecordBatches,
682 series: &mut Vec<HashMap<String, String>>,
683 table_name: &str,
684 tag_columns: &HashSet<String>,
685) -> Result<()> {
686 for batch in batches.iter() {
687 let projection = batch
689 .schema
690 .column_schemas()
691 .iter()
692 .enumerate()
693 .filter_map(|(idx, col)| {
694 if tag_columns.contains(&col.name) {
695 Some(idx)
696 } else {
697 None
698 }
699 })
700 .collect::<Vec<_>>();
701 let batch = batch
702 .try_project(&projection)
703 .context(CollectRecordbatchSnafu)?;
704
705 for row in batch.rows() {
706 let mut element: HashMap<String, String> = row
707 .iter()
708 .enumerate()
709 .map(|(idx, column)| {
710 let column_name = batch.schema.column_name_by_index(idx);
711 (column_name.to_string(), column.to_string())
712 })
713 .collect();
714 let _ = element.insert("__name__".to_string(), table_name.to_string());
715 series.push(element);
716 }
717 }
718 Ok(())
719}
720
721fn record_batches_to_labels_name(
723 batches: RecordBatches,
724 labels: &mut HashSet<String>,
725) -> Result<()> {
726 let mut column_indices = Vec::new();
727 let mut field_column_indices = Vec::new();
728 for (i, column) in batches.schema().column_schemas().iter().enumerate() {
729 if let ConcreteDataType::Float64(_) = column.data_type {
730 field_column_indices.push(i);
731 }
732 column_indices.push(i);
733 }
734
735 if field_column_indices.is_empty() {
736 return Err(Error::Internal {
737 err_msg: "no value column found".to_string(),
738 });
739 }
740
741 for batch in batches.iter() {
742 let names = column_indices
743 .iter()
744 .map(|c| batches.schema().column_name_by_index(*c).to_string())
745 .collect::<Vec<_>>();
746
747 let field_columns = field_column_indices
748 .iter()
749 .map(|i| {
750 batch
751 .column(*i)
752 .as_any()
753 .downcast_ref::<Float64Vector>()
754 .unwrap()
755 })
756 .collect::<Vec<_>>();
757
758 for row_index in 0..batch.num_rows() {
759 if field_columns
761 .iter()
762 .all(|c| c.get_data(row_index).is_none())
763 {
764 continue;
765 }
766
767 names.iter().for_each(|name| {
769 let _ = labels.insert(name.to_string());
770 });
771 return Ok(());
772 }
773 }
774 Ok(())
775}
776
777pub(crate) fn retrieve_metric_name_and_result_type(
778 promql: &str,
779) -> Result<(Option<String>, ValueType)> {
780 let promql_expr = promql_parser::parser::parse(promql)
781 .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
782 let metric_name = promql_expr_to_metric_name(&promql_expr);
783 let result_type = promql_expr.value_type();
784
785 Ok((metric_name, result_type))
786}
787
788pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
791 if let Some(db) = db {
792 parse_catalog_and_schema_from_db_string(db)
793 } else {
794 (
795 ctx.current_catalog().to_string(),
796 ctx.current_schema().to_string(),
797 )
798 }
799}
800
801pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
803 if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
804 ctx.set_current_catalog(catalog);
805 ctx.set_current_schema(schema);
806 }
807}
808
809fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
810 find_metric_name_and_matchers(expr, |name, matchers| {
811 name.clone().or(matchers
812 .find_matchers(METRIC_NAME)
813 .into_iter()
814 .next()
815 .map(|m| m.value))
816 })
817}
818
819fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
820where
821 F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
822{
823 match expr {
824 PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
825 PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
826 PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
827 find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
828 }
829 PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
830 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
831 PromqlExpr::NumberLiteral(_) => None,
832 PromqlExpr::StringLiteral(_) => None,
833 PromqlExpr::Extension(_) => None,
834 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
835 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
836 let VectorSelector { name, matchers, .. } = vs;
837
838 f(name, matchers)
839 }
840 PromqlExpr::Call(Call { args, .. }) => args
841 .args
842 .iter()
843 .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
844 }
845}
846
847fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
849 find_metric_name_and_matchers(expr, |name, matchers| {
850 if name.is_some() {
852 return None;
853 }
854
855 Some(matchers.find_matchers(METRIC_NAME))
857 })
858 .map(|matchers| {
859 matchers
860 .into_iter()
861 .filter(|m| !matches!(m.op, MatchOp::Equal))
862 .map(normalize_matcher)
863 .collect::<Vec<_>>()
864 })
865}
866
867fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
870 match expr {
871 PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
872 update_metric_name_matcher(expr, metric_name)
873 }
874 PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
875 PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
876 update_metric_name_matcher(lhs, metric_name);
877 update_metric_name_matcher(rhs, metric_name);
878 }
879 PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
880 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
881 update_metric_name_matcher(expr, metric_name)
882 }
883 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
884 if name.is_some() {
885 return;
886 }
887
888 for m in &mut matchers.matchers {
889 if m.name == METRIC_NAME {
890 m.op = MatchOp::Equal;
891 m.value = metric_name.to_string();
892 }
893 }
894 }
895 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
896 let VectorSelector { name, matchers, .. } = vs;
897 if name.is_some() {
898 return;
899 }
900
901 for m in &mut matchers.matchers {
902 if m.name == METRIC_NAME {
903 m.op = MatchOp::Equal;
904 m.value = metric_name.to_string();
905 }
906 }
907 }
908 PromqlExpr::Call(Call { args, .. }) => {
909 args.args.iter_mut().for_each(|e| {
910 update_metric_name_matcher(e, metric_name);
911 });
912 }
913 PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
914 }
915}
916
917#[derive(Debug, Default, Serialize, Deserialize)]
918pub struct LabelValueQuery {
919 start: Option<String>,
920 end: Option<String>,
921 lookback: Option<String>,
922 #[serde(flatten)]
923 matches: Matches,
924 db: Option<String>,
925}
926
927#[axum_macros::debug_handler]
928#[tracing::instrument(
929 skip_all,
930 fields(protocol = "prometheus", request_type = "label_values_query")
931)]
932pub async fn label_values_query(
933 State(handler): State<PrometheusHandlerRef>,
934 Path(label_name): Path<String>,
935 Extension(mut query_ctx): Extension<QueryContext>,
936 Query(params): Query<LabelValueQuery>,
937) -> PrometheusJsonResponse {
938 let (catalog, schema) = get_catalog_schema(¶ms.db, &query_ctx);
939 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
940 let query_ctx = Arc::new(query_ctx);
941
942 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
943 .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
944 .start_timer();
945
946 if label_name == METRIC_NAME_LABEL {
947 let catalog_manager = handler.catalog_manager();
948 let mut tables_stream = catalog_manager.tables(&catalog, &schema, Some(&query_ctx));
949 let mut table_names = Vec::new();
950 while let Some(table) = tables_stream.next().await {
951 match table {
953 Ok(table) => {
954 if table
955 .table_info()
956 .meta
957 .options
958 .extra_options
959 .contains_key(PHYSICAL_TABLE_METADATA_KEY)
960 {
961 continue;
962 }
963
964 table_names.push(table.table_info().name.clone());
965 }
966 Err(e) => {
967 return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
968 }
969 }
970 }
971 table_names.sort_unstable();
972 return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
973 } else if label_name == FIELD_NAME_LABEL {
974 let field_columns =
975 match retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0)
976 .await
977 {
978 Ok(table_names) => table_names,
979 Err(e) => {
980 return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
981 }
982 };
983 let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
984 field_columns.sort_unstable();
985 return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
986 }
987
988 let queries = params.matches.0;
989 if queries.is_empty() {
990 return PrometheusJsonResponse::error(
991 StatusCode::InvalidArguments,
992 "match[] parameter is required",
993 );
994 }
995
996 let start = params.start.unwrap_or_else(yesterday_rfc3339);
997 let end = params.end.unwrap_or_else(current_time_rfc3339);
998 let mut label_values = HashSet::new();
999
1000 let start = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&start)
1001 .context(ParseTimestampSnafu { timestamp: &start }));
1002 let end = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&end)
1003 .context(ParseTimestampSnafu { timestamp: &end }));
1004
1005 for query in queries {
1006 let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
1007 let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
1008 return PrometheusJsonResponse::error(
1009 StatusCode::InvalidArguments,
1010 "expected vector selector",
1011 );
1012 };
1013 let Some(name) = take_metric_name(&mut vector_selector) else {
1014 return PrometheusJsonResponse::error(
1015 StatusCode::InvalidArguments,
1016 "expected metric name",
1017 );
1018 };
1019 let VectorSelector { matchers, .. } = vector_selector;
1020 let matchers = matchers.matchers;
1022 let result = handler
1023 .query_label_values(
1024 name,
1025 label_name.to_string(),
1026 matchers,
1027 start,
1028 end,
1029 &query_ctx,
1030 )
1031 .await;
1032
1033 match result {
1034 Ok(result) => {
1035 label_values.extend(result.into_iter());
1036 }
1037 Err(err) => {
1038 if err.status_code() != StatusCode::TableNotFound
1040 && err.status_code() != StatusCode::TableColumnNotFound
1041 {
1042 return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
1043 }
1044 }
1045 }
1046 }
1047
1048 let mut label_values: Vec<_> = label_values.into_iter().collect();
1049 label_values.sort_unstable();
1050 PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
1051}
1052
1053fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1056 if let Some(name) = selector.name.take() {
1057 return Some(name);
1058 }
1059
1060 let (pos, matcher) = selector
1061 .matchers
1062 .matchers
1063 .iter()
1064 .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1065 let name = matcher.value.clone();
1066 selector.matchers.matchers.remove(pos);
1068
1069 Some(name)
1070}
1071
1072async fn retrieve_field_names(
1073 query_ctx: &QueryContext,
1074 manager: CatalogManagerRef,
1075 matches: Vec<String>,
1076) -> Result<HashSet<String>> {
1077 let mut field_columns = HashSet::new();
1078 let catalog = query_ctx.current_catalog();
1079 let schema = query_ctx.current_schema();
1080
1081 if matches.is_empty() {
1082 while let Some(table) = manager
1084 .tables(catalog, &schema, Some(query_ctx))
1085 .next()
1086 .await
1087 {
1088 let table = table.context(CatalogSnafu)?;
1089 for column in table.field_columns() {
1090 field_columns.insert(column.name);
1091 }
1092 }
1093 return Ok(field_columns);
1094 }
1095
1096 for table_name in matches {
1097 let table = manager
1098 .table(catalog, &schema, &table_name, Some(query_ctx))
1099 .await
1100 .context(CatalogSnafu)?
1101 .with_context(|| TableNotFoundSnafu {
1102 catalog: catalog.to_string(),
1103 schema: schema.to_string(),
1104 table: table_name.to_string(),
1105 })?;
1106
1107 for column in table.field_columns() {
1108 field_columns.insert(column.name);
1109 }
1110 }
1111 Ok(field_columns)
1112}
1113
1114fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1118 let promql_expr = promql_parser::parser::parse(query).ok()?;
1119
1120 struct MetricNameVisitor {
1121 metric_name: Option<String>,
1122 }
1123
1124 impl promql_parser::util::ExprVisitor for MetricNameVisitor {
1125 type Error = ();
1126
1127 fn pre_visit(&mut self, plan: &PromqlExpr) -> std::result::Result<bool, Self::Error> {
1128 let query_metric_name = match plan {
1129 PromqlExpr::VectorSelector(vs) => vs
1130 .matchers
1131 .find_matchers(METRIC_NAME)
1132 .into_iter()
1133 .next()
1134 .map(|m| m.value)
1135 .or_else(|| vs.name.clone()),
1136 PromqlExpr::MatrixSelector(ms) => ms
1137 .vs
1138 .matchers
1139 .find_matchers(METRIC_NAME)
1140 .into_iter()
1141 .next()
1142 .map(|m| m.value)
1143 .or_else(|| ms.vs.name.clone()),
1144 _ => return Ok(true),
1145 };
1146
1147 if self.metric_name.is_some() && query_metric_name.is_some() {
1149 self.metric_name = Some(String::new());
1150 } else {
1151 self.metric_name = query_metric_name.or_else(|| self.metric_name.clone());
1152 }
1153
1154 Ok(true)
1155 }
1156 }
1157
1158 let mut visitor = MetricNameVisitor { metric_name: None };
1159 promql_parser::util::walk_expr(&mut visitor, &promql_expr).ok()?;
1160 visitor.metric_name
1161}
1162
1163#[derive(Debug, Default, Serialize, Deserialize)]
1164pub struct SeriesQuery {
1165 start: Option<String>,
1166 end: Option<String>,
1167 lookback: Option<String>,
1168 #[serde(flatten)]
1169 matches: Matches,
1170 db: Option<String>,
1171}
1172
1173#[axum_macros::debug_handler]
1174#[tracing::instrument(
1175 skip_all,
1176 fields(protocol = "prometheus", request_type = "series_query")
1177)]
1178pub async fn series_query(
1179 State(handler): State<PrometheusHandlerRef>,
1180 Query(params): Query<SeriesQuery>,
1181 Extension(mut query_ctx): Extension<QueryContext>,
1182 Form(form_params): Form<SeriesQuery>,
1183) -> PrometheusJsonResponse {
1184 let mut queries: Vec<String> = params.matches.0;
1185 if queries.is_empty() {
1186 queries = form_params.matches.0;
1187 }
1188 if queries.is_empty() {
1189 return PrometheusJsonResponse::error(
1190 StatusCode::Unsupported,
1191 "match[] parameter is required",
1192 );
1193 }
1194 let start = params
1195 .start
1196 .or(form_params.start)
1197 .unwrap_or_else(yesterday_rfc3339);
1198 let end = params
1199 .end
1200 .or(form_params.end)
1201 .unwrap_or_else(current_time_rfc3339);
1202 let lookback = params
1203 .lookback
1204 .or(form_params.lookback)
1205 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
1206
1207 if let Some(db) = ¶ms.db {
1209 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
1210 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
1211 }
1212 let query_ctx = Arc::new(query_ctx);
1213
1214 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
1215 .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
1216 .start_timer();
1217
1218 let mut series = Vec::new();
1219 let mut merge_map = HashMap::new();
1220 for query in queries {
1221 let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
1222 let prom_query = PromQuery {
1223 query,
1224 start: start.clone(),
1225 end: end.clone(),
1226 step: DEFAULT_LOOKBACK_STRING.to_string(),
1228 lookback: lookback.clone(),
1229 };
1230 let result = handler.do_query(&prom_query, query_ctx.clone()).await;
1231
1232 if let Err(err) = retrieve_series_from_query_result(
1233 result,
1234 &mut series,
1235 &query_ctx,
1236 &table_name,
1237 &handler.catalog_manager(),
1238 &mut merge_map,
1239 )
1240 .await
1241 {
1242 return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
1243 }
1244 }
1245 let merge_map = merge_map
1246 .into_iter()
1247 .map(|(k, v)| (k, Value::from(v)))
1248 .collect();
1249 let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
1250 resp.resp_metrics = merge_map;
1251 resp
1252}
1253
1254#[derive(Debug, Default, Serialize, Deserialize)]
1255pub struct ParseQuery {
1256 query: Option<String>,
1257 db: Option<String>,
1258}
1259
1260#[axum_macros::debug_handler]
1261#[tracing::instrument(
1262 skip_all,
1263 fields(protocol = "prometheus", request_type = "parse_query")
1264)]
1265pub async fn parse_query(
1266 State(_handler): State<PrometheusHandlerRef>,
1267 Query(params): Query<ParseQuery>,
1268 Extension(_query_ctx): Extension<QueryContext>,
1269 Form(form_params): Form<ParseQuery>,
1270) -> PrometheusJsonResponse {
1271 if let Some(query) = params.query.or(form_params.query) {
1272 let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1273 PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1274 } else {
1275 PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1276 }
1277}