1use std::collections::{BTreeMap, HashMap, HashSet};
17use std::sync::Arc;
18
19use axum::extract::{Path, Query, State};
20use axum::{Extension, Form};
21use catalog::CatalogManagerRef;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::ErrorExt;
24use common_error::status_code::StatusCode;
25use common_query::{Output, OutputData};
26use common_recordbatch::RecordBatches;
27use common_telemetry::{debug, tracing};
28use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
29use common_version::OwnedBuildInfo;
30use datatypes::prelude::ConcreteDataType;
31use datatypes::scalars::ScalarVector;
32use datatypes::vectors::Float64Vector;
33use futures::StreamExt;
34use futures::future::join_all;
35use itertools::Itertools;
36use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
37use promql_parser::parser::token::{self};
38use promql_parser::parser::value::ValueType;
39use promql_parser::parser::{
40 AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, LabelModifier, MatrixSelector, ParenExpr,
41 SubqueryExpr, UnaryExpr, VectorSelector,
42};
43use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser};
44use query::promql::planner::normalize_matcher;
45use serde::de::{self, MapAccess, Visitor};
46use serde::{Deserialize, Serialize};
47use serde_json::Value;
48use session::context::{QueryContext, QueryContextRef};
49use snafu::{Location, OptionExt, ResultExt};
50use store_api::metric_engine_consts::{
51 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
52};
53
54pub use super::result::prometheus_resp::PrometheusJsonResponse;
55use crate::error::{
56 CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result,
57 TableNotFoundSnafu, UnexpectedResultSnafu,
58};
59use crate::http::header::collect_plan_metrics;
60use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
61use crate::prometheus_handler::PrometheusHandlerRef;
62
63#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
65pub struct PromSeriesVector {
66 pub metric: BTreeMap<String, String>,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 pub value: Option<(f64, String)>,
69}
70
71#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
73pub struct PromSeriesMatrix {
74 pub metric: BTreeMap<String, String>,
75 pub values: Vec<(f64, String)>,
76}
77
78#[derive(Debug, Serialize, Deserialize, PartialEq)]
80#[serde(untagged)]
81pub enum PromQueryResult {
82 Matrix(Vec<PromSeriesMatrix>),
83 Vector(Vec<PromSeriesVector>),
84 Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
85 String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
86}
87
88impl Default for PromQueryResult {
89 fn default() -> Self {
90 PromQueryResult::Matrix(Default::default())
91 }
92}
93
94#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
95pub struct PromData {
96 #[serde(rename = "resultType")]
97 pub result_type: String,
98 pub result: PromQueryResult,
99}
100
101#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
102#[serde(untagged)]
103pub enum PrometheusResponse {
104 PromData(PromData),
105 Labels(Vec<String>),
106 Series(Vec<HashMap<String, String>>),
107 LabelValues(Vec<String>),
108 FormatQuery(String),
109 BuildInfo(OwnedBuildInfo),
110 #[serde(skip_deserializing)]
111 ParseResult(promql_parser::parser::Expr),
112 #[default]
113 None,
114}
115
116impl PrometheusResponse {
117 fn append(&mut self, other: PrometheusResponse) {
121 match (self, other) {
122 (
123 PrometheusResponse::PromData(PromData {
124 result: PromQueryResult::Matrix(lhs),
125 ..
126 }),
127 PrometheusResponse::PromData(PromData {
128 result: PromQueryResult::Matrix(rhs),
129 ..
130 }),
131 ) => {
132 lhs.extend(rhs);
133 }
134
135 (
136 PrometheusResponse::PromData(PromData {
137 result: PromQueryResult::Vector(lhs),
138 ..
139 }),
140 PrometheusResponse::PromData(PromData {
141 result: PromQueryResult::Vector(rhs),
142 ..
143 }),
144 ) => {
145 lhs.extend(rhs);
146 }
147 _ => {
148 }
150 }
151 }
152
153 pub fn is_none(&self) -> bool {
154 matches!(self, PrometheusResponse::None)
155 }
156}
157
158#[derive(Debug, Default, Serialize, Deserialize)]
159pub struct FormatQuery {
160 query: Option<String>,
161}
162
163#[axum_macros::debug_handler]
164#[tracing::instrument(
165 skip_all,
166 fields(protocol = "prometheus", request_type = "format_query")
167)]
168pub async fn format_query(
169 State(_handler): State<PrometheusHandlerRef>,
170 Query(params): Query<InstantQuery>,
171 Extension(_query_ctx): Extension<QueryContext>,
172 Form(form_params): Form<InstantQuery>,
173) -> PrometheusJsonResponse {
174 let query = params.query.or(form_params.query).unwrap_or_default();
175 match promql_parser::parser::parse(&query) {
176 Ok(expr) => {
177 let pretty = expr.prettify();
178 PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
179 }
180 Err(reason) => {
181 let err = InvalidQuerySnafu { reason }.build();
182 PrometheusJsonResponse::error(err.status_code(), err.output_msg())
183 }
184 }
185}
186
187#[derive(Debug, Default, Serialize, Deserialize)]
188pub struct BuildInfoQuery {}
189
190#[axum_macros::debug_handler]
191#[tracing::instrument(
192 skip_all,
193 fields(protocol = "prometheus", request_type = "build_info_query")
194)]
195pub async fn build_info_query() -> PrometheusJsonResponse {
196 let build_info = common_version::build_info().clone();
197 PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
198}
199
200#[derive(Debug, Default, Serialize, Deserialize)]
201pub struct InstantQuery {
202 query: Option<String>,
203 lookback: Option<String>,
204 time: Option<String>,
205 timeout: Option<String>,
206 db: Option<String>,
207}
208
209macro_rules! try_call_return_response {
212 ($handle: expr) => {
213 match $handle {
214 Ok(res) => res,
215 Err(err) => {
216 let msg = err.to_string();
217 return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
218 }
219 }
220 };
221}
222
223#[axum_macros::debug_handler]
224#[tracing::instrument(
225 skip_all,
226 fields(protocol = "prometheus", request_type = "instant_query")
227)]
228pub async fn instant_query(
229 State(handler): State<PrometheusHandlerRef>,
230 Query(params): Query<InstantQuery>,
231 Extension(mut query_ctx): Extension<QueryContext>,
232 Form(form_params): Form<InstantQuery>,
233) -> PrometheusJsonResponse {
234 let time = params
236 .time
237 .or(form_params.time)
238 .unwrap_or_else(current_time_rfc3339);
239 let prom_query = PromQuery {
240 query: params.query.or(form_params.query).unwrap_or_default(),
241 start: time.clone(),
242 end: time,
243 step: "1s".to_string(),
244 lookback: params
245 .lookback
246 .or(form_params.lookback)
247 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
248 alias: None,
249 };
250
251 let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
252
253 if let Some(db) = ¶ms.db {
255 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
256 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
257 }
258 let query_ctx = Arc::new(query_ctx);
259
260 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
261 .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
262 .start_timer();
263
264 if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
265 && !name_matchers.is_empty()
266 {
267 debug!("Find metric name matchers: {:?}", name_matchers);
268
269 let metric_names =
270 try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
271
272 debug!("Find metric names: {:?}", metric_names);
273
274 if metric_names.is_empty() {
275 let result_type = promql_expr.value_type();
276
277 return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
278 result_type: result_type.to_string(),
279 ..Default::default()
280 }));
281 }
282
283 let responses = join_all(metric_names.into_iter().map(|metric| {
284 let mut prom_query = prom_query.clone();
285 let mut promql_expr = promql_expr.clone();
286 let query_ctx = query_ctx.clone();
287 let handler = handler.clone();
288
289 async move {
290 update_metric_name_matcher(&mut promql_expr, &metric);
291 let new_query = promql_expr.to_string();
292 debug!(
293 "Updated promql, before: {}, after: {}",
294 &prom_query.query, new_query
295 );
296 prom_query.query = new_query;
297
298 do_instant_query(&handler, &prom_query, query_ctx).await
299 }
300 }))
301 .await;
302
303 responses
304 .into_iter()
305 .reduce(|mut acc, resp| {
306 acc.data.append(resp.data);
307 acc
308 })
309 .unwrap()
310 } else {
311 do_instant_query(&handler, &prom_query, query_ctx).await
312 }
313}
314
315async fn do_instant_query(
317 handler: &PrometheusHandlerRef,
318 prom_query: &PromQuery,
319 query_ctx: QueryContextRef,
320) -> PrometheusJsonResponse {
321 let result = handler.do_query(prom_query, query_ctx).await;
322 let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
323 Ok((metric_name, result_type)) => (metric_name, result_type),
324 Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
325 };
326 PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
327}
328
329#[derive(Debug, Default, Serialize, Deserialize)]
330pub struct RangeQuery {
331 query: Option<String>,
332 start: Option<String>,
333 end: Option<String>,
334 step: Option<String>,
335 lookback: Option<String>,
336 timeout: Option<String>,
337 db: Option<String>,
338}
339
340#[axum_macros::debug_handler]
341#[tracing::instrument(
342 skip_all,
343 fields(protocol = "prometheus", request_type = "range_query")
344)]
345pub async fn range_query(
346 State(handler): State<PrometheusHandlerRef>,
347 Query(params): Query<RangeQuery>,
348 Extension(mut query_ctx): Extension<QueryContext>,
349 Form(form_params): Form<RangeQuery>,
350) -> PrometheusJsonResponse {
351 let prom_query = PromQuery {
352 query: params.query.or(form_params.query).unwrap_or_default(),
353 start: params.start.or(form_params.start).unwrap_or_default(),
354 end: params.end.or(form_params.end).unwrap_or_default(),
355 step: params.step.or(form_params.step).unwrap_or_default(),
356 lookback: params
357 .lookback
358 .or(form_params.lookback)
359 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
360 alias: None,
361 };
362
363 let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
364
365 if let Some(db) = ¶ms.db {
367 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
368 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
369 }
370 let query_ctx = Arc::new(query_ctx);
371 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
372 .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
373 .start_timer();
374
375 if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
376 && !name_matchers.is_empty()
377 {
378 debug!("Find metric name matchers: {:?}", name_matchers);
379
380 let metric_names =
381 try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
382
383 debug!("Find metric names: {:?}", metric_names);
384
385 if metric_names.is_empty() {
386 return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
387 result_type: ValueType::Matrix.to_string(),
388 ..Default::default()
389 }));
390 }
391
392 let responses = join_all(metric_names.into_iter().map(|metric| {
393 let mut prom_query = prom_query.clone();
394 let mut promql_expr = promql_expr.clone();
395 let query_ctx = query_ctx.clone();
396 let handler = handler.clone();
397
398 async move {
399 update_metric_name_matcher(&mut promql_expr, &metric);
400 let new_query = promql_expr.to_string();
401 debug!(
402 "Updated promql, before: {}, after: {}",
403 &prom_query.query, new_query
404 );
405 prom_query.query = new_query;
406
407 do_range_query(&handler, &prom_query, query_ctx).await
408 }
409 }))
410 .await;
411
412 responses
414 .into_iter()
415 .reduce(|mut acc, resp| {
416 acc.data.append(resp.data);
417 acc
418 })
419 .unwrap()
420 } else {
421 do_range_query(&handler, &prom_query, query_ctx).await
422 }
423}
424
425async fn do_range_query(
427 handler: &PrometheusHandlerRef,
428 prom_query: &PromQuery,
429 query_ctx: QueryContextRef,
430) -> PrometheusJsonResponse {
431 let result = handler.do_query(prom_query, query_ctx).await;
432 let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
433 Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
434 Ok((metric_name, _)) => metric_name,
435 };
436 PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
437}
438
439#[derive(Debug, Default, Serialize)]
440struct Matches(Vec<String>);
441
442#[derive(Debug, Default, Serialize, Deserialize)]
443pub struct LabelsQuery {
444 start: Option<String>,
445 end: Option<String>,
446 lookback: Option<String>,
447 #[serde(flatten)]
448 matches: Matches,
449 db: Option<String>,
450}
451
452impl<'de> Deserialize<'de> for Matches {
454 fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
455 where
456 D: de::Deserializer<'de>,
457 {
458 struct MatchesVisitor;
459
460 impl<'d> Visitor<'d> for MatchesVisitor {
461 type Value = Vec<String>;
462
463 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
464 formatter.write_str("a string")
465 }
466
467 fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
468 where
469 M: MapAccess<'d>,
470 {
471 let mut matches = Vec::new();
472 while let Some((key, value)) = access.next_entry::<String, String>()? {
473 if key == "match[]" {
474 matches.push(value);
475 }
476 }
477 Ok(matches)
478 }
479 }
480 Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
481 }
482}
483
484macro_rules! handle_schema_err {
491 ($result:expr) => {
492 match $result {
493 Ok(v) => Some(v),
494 Err(err) => {
495 if err.status_code() == StatusCode::TableNotFound
496 || err.status_code() == StatusCode::TableColumnNotFound
497 {
498 None
500 } else {
501 return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
502 }
503 }
504 }
505 };
506}
507
508#[axum_macros::debug_handler]
509#[tracing::instrument(
510 skip_all,
511 fields(protocol = "prometheus", request_type = "labels_query")
512)]
513pub async fn labels_query(
514 State(handler): State<PrometheusHandlerRef>,
515 Query(params): Query<LabelsQuery>,
516 Extension(mut query_ctx): Extension<QueryContext>,
517 Form(form_params): Form<LabelsQuery>,
518) -> PrometheusJsonResponse {
519 let (catalog, schema) = get_catalog_schema(¶ms.db, &query_ctx);
520 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
521 let query_ctx = Arc::new(query_ctx);
522
523 let mut queries = params.matches.0;
524 if queries.is_empty() {
525 queries = form_params.matches.0;
526 }
527
528 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
529 .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
530 .start_timer();
531
532 let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
534 {
535 Ok(labels) => labels,
536 Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
537 };
538 let _ = labels.insert(METRIC_NAME.to_string());
540
541 if queries.is_empty() {
543 let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
544 labels_vec.sort_unstable();
545 return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
546 }
547
548 let start = params
550 .start
551 .or(form_params.start)
552 .unwrap_or_else(yesterday_rfc3339);
553 let end = params
554 .end
555 .or(form_params.end)
556 .unwrap_or_else(current_time_rfc3339);
557 let lookback = params
558 .lookback
559 .or(form_params.lookback)
560 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
561
562 let mut fetched_labels = HashSet::new();
563 let _ = fetched_labels.insert(METRIC_NAME.to_string());
564
565 let mut merge_map = HashMap::new();
566 for query in queries {
567 let prom_query = PromQuery {
568 query,
569 start: start.clone(),
570 end: end.clone(),
571 step: DEFAULT_LOOKBACK_STRING.to_string(),
572 lookback: lookback.clone(),
573 alias: None,
574 };
575
576 let result = handler.do_query(&prom_query, query_ctx.clone()).await;
577 handle_schema_err!(
578 retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
579 .await
580 );
581 }
582
583 fetched_labels.retain(|l| labels.contains(l));
585 let _ = labels.insert(METRIC_NAME.to_string());
586
587 let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
588 sorted_labels.sort();
589 let merge_map = merge_map
590 .into_iter()
591 .map(|(k, v)| (k, Value::from(v)))
592 .collect();
593 let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
594 resp.resp_metrics = merge_map;
595 resp
596}
597
598async fn get_all_column_names(
600 catalog: &str,
601 schema: &str,
602 manager: &CatalogManagerRef,
603) -> std::result::Result<HashSet<String>, catalog::error::Error> {
604 let table_names = manager.table_names(catalog, schema, None).await?;
605
606 let mut labels = HashSet::new();
607 for table_name in table_names {
608 let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
609 continue;
610 };
611 for column in table.primary_key_columns() {
612 if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
613 && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
614 {
615 labels.insert(column.name);
616 }
617 }
618 }
619
620 Ok(labels)
621}
622
623async fn retrieve_series_from_query_result(
624 result: Result<Output>,
625 series: &mut Vec<HashMap<String, String>>,
626 query_ctx: &QueryContext,
627 table_name: &str,
628 manager: &CatalogManagerRef,
629 metrics: &mut HashMap<String, u64>,
630) -> Result<()> {
631 let result = result?;
632
633 let table = manager
635 .table(
636 query_ctx.current_catalog(),
637 &query_ctx.current_schema(),
638 table_name,
639 Some(query_ctx),
640 )
641 .await
642 .context(CatalogSnafu)?
643 .with_context(|| TableNotFoundSnafu {
644 catalog: query_ctx.current_catalog(),
645 schema: query_ctx.current_schema(),
646 table: table_name,
647 })?;
648 let tag_columns = table
649 .primary_key_columns()
650 .map(|c| c.name)
651 .collect::<HashSet<_>>();
652
653 match result.data {
654 OutputData::RecordBatches(batches) => {
655 record_batches_to_series(batches, series, table_name, &tag_columns)
656 }
657 OutputData::Stream(stream) => {
658 let batches = RecordBatches::try_collect(stream)
659 .await
660 .context(CollectRecordbatchSnafu)?;
661 record_batches_to_series(batches, series, table_name, &tag_columns)
662 }
663 OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
664 reason: "expected data result, but got affected rows".to_string(),
665 location: Location::default(),
666 }),
667 }?;
668
669 if let Some(ref plan) = result.meta.plan {
670 collect_plan_metrics(plan, &mut [metrics]);
671 }
672 Ok(())
673}
674
675async fn retrieve_labels_name_from_query_result(
677 result: Result<Output>,
678 labels: &mut HashSet<String>,
679 metrics: &mut HashMap<String, u64>,
680) -> Result<()> {
681 let result = result?;
682 match result.data {
683 OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
684 OutputData::Stream(stream) => {
685 let batches = RecordBatches::try_collect(stream)
686 .await
687 .context(CollectRecordbatchSnafu)?;
688 record_batches_to_labels_name(batches, labels)
689 }
690 OutputData::AffectedRows(_) => UnexpectedResultSnafu {
691 reason: "expected data result, but got affected rows".to_string(),
692 }
693 .fail(),
694 }?;
695 if let Some(ref plan) = result.meta.plan {
696 collect_plan_metrics(plan, &mut [metrics]);
697 }
698 Ok(())
699}
700
701fn record_batches_to_series(
702 batches: RecordBatches,
703 series: &mut Vec<HashMap<String, String>>,
704 table_name: &str,
705 tag_columns: &HashSet<String>,
706) -> Result<()> {
707 for batch in batches.iter() {
708 let projection = batch
710 .schema
711 .column_schemas()
712 .iter()
713 .enumerate()
714 .filter_map(|(idx, col)| {
715 if tag_columns.contains(&col.name) {
716 Some(idx)
717 } else {
718 None
719 }
720 })
721 .collect::<Vec<_>>();
722 let batch = batch
723 .try_project(&projection)
724 .context(CollectRecordbatchSnafu)?;
725
726 for row in batch.rows() {
727 let mut element: HashMap<String, String> = row
728 .iter()
729 .enumerate()
730 .map(|(idx, column)| {
731 let column_name = batch.schema.column_name_by_index(idx);
732 (column_name.to_string(), column.to_string())
733 })
734 .collect();
735 let _ = element.insert("__name__".to_string(), table_name.to_string());
736 series.push(element);
737 }
738 }
739 Ok(())
740}
741
742fn record_batches_to_labels_name(
744 batches: RecordBatches,
745 labels: &mut HashSet<String>,
746) -> Result<()> {
747 let mut column_indices = Vec::new();
748 let mut field_column_indices = Vec::new();
749 for (i, column) in batches.schema().column_schemas().iter().enumerate() {
750 if let ConcreteDataType::Float64(_) = column.data_type {
751 field_column_indices.push(i);
752 }
753 column_indices.push(i);
754 }
755
756 if field_column_indices.is_empty() {
757 return Err(Error::Internal {
758 err_msg: "no value column found".to_string(),
759 });
760 }
761
762 for batch in batches.iter() {
763 let names = column_indices
764 .iter()
765 .map(|c| batches.schema().column_name_by_index(*c).to_string())
766 .collect::<Vec<_>>();
767
768 let field_columns = field_column_indices
769 .iter()
770 .map(|i| {
771 batch
772 .column(*i)
773 .as_any()
774 .downcast_ref::<Float64Vector>()
775 .unwrap()
776 })
777 .collect::<Vec<_>>();
778
779 for row_index in 0..batch.num_rows() {
780 if field_columns
782 .iter()
783 .all(|c| c.get_data(row_index).is_none())
784 {
785 continue;
786 }
787
788 names.iter().for_each(|name| {
790 let _ = labels.insert(name.clone());
791 });
792 return Ok(());
793 }
794 }
795 Ok(())
796}
797
798pub(crate) fn retrieve_metric_name_and_result_type(
799 promql: &str,
800) -> Result<(Option<String>, ValueType)> {
801 let promql_expr = promql_parser::parser::parse(promql)
802 .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
803 let metric_name = promql_expr_to_metric_name(&promql_expr);
804 let result_type = promql_expr.value_type();
805
806 Ok((metric_name, result_type))
807}
808
809pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
812 if let Some(db) = db {
813 parse_catalog_and_schema_from_db_string(db)
814 } else {
815 (
816 ctx.current_catalog().to_string(),
817 ctx.current_schema().clone(),
818 )
819 }
820}
821
822pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
824 if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
825 ctx.set_current_catalog(catalog);
826 ctx.set_current_schema(schema);
827 }
828}
829
830fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
831 let mut metric_names = HashSet::new();
832 collect_metric_names(expr, &mut metric_names);
833
834 if metric_names.len() == 1 {
836 metric_names.into_iter().next()
837 } else {
838 None
839 }
840}
841
842fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
844 match expr {
845 PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
846 match modifier {
847 Some(LabelModifier::Include(labels)) => {
848 if !labels.labels.contains(&METRIC_NAME.to_string()) {
849 metric_names.clear();
850 return;
851 }
852 }
853 Some(LabelModifier::Exclude(labels)) => {
854 if labels.labels.contains(&METRIC_NAME.to_string()) {
855 metric_names.clear();
856 return;
857 }
858 }
859 _ => {}
860 }
861 collect_metric_names(expr, metric_names)
862 }
863 PromqlExpr::Unary(UnaryExpr { .. }) => metric_names.clear(),
864 PromqlExpr::Binary(BinaryExpr { lhs, op, .. }) => {
865 if matches!(
866 op.id(),
867 token::T_LAND | token::T_LOR | token::T_LUNLESS ) {
871 collect_metric_names(lhs, metric_names)
872 } else {
873 metric_names.clear()
874 }
875 }
876 PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
877 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
878 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
879 if let Some(name) = name {
880 metric_names.insert(name.clone());
881 } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
882 metric_names.insert(matcher.value);
883 }
884 }
885 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
886 let VectorSelector { name, matchers, .. } = vs;
887 if let Some(name) = name {
888 metric_names.insert(name.clone());
889 } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
890 metric_names.insert(matcher.value);
891 }
892 }
893 PromqlExpr::Call(Call { args, .. }) => {
894 args.args
895 .iter()
896 .for_each(|e| collect_metric_names(e, metric_names));
897 }
898 PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
899 }
900}
901
902fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
903where
904 F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
905{
906 match expr {
907 PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
908 PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
909 PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
910 find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
911 }
912 PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
913 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
914 PromqlExpr::NumberLiteral(_) => None,
915 PromqlExpr::StringLiteral(_) => None,
916 PromqlExpr::Extension(_) => None,
917 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
918 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
919 let VectorSelector { name, matchers, .. } = vs;
920
921 f(name, matchers)
922 }
923 PromqlExpr::Call(Call { args, .. }) => args
924 .args
925 .iter()
926 .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
927 }
928}
929
930fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
932 find_metric_name_and_matchers(expr, |name, matchers| {
933 if name.is_some() {
935 return None;
936 }
937
938 Some(matchers.find_matchers(METRIC_NAME))
940 })
941 .map(|matchers| {
942 matchers
943 .into_iter()
944 .filter(|m| !matches!(m.op, MatchOp::Equal))
945 .map(normalize_matcher)
946 .collect::<Vec<_>>()
947 })
948}
949
950fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
953 match expr {
954 PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
955 update_metric_name_matcher(expr, metric_name)
956 }
957 PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
958 PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
959 update_metric_name_matcher(lhs, metric_name);
960 update_metric_name_matcher(rhs, metric_name);
961 }
962 PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
963 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
964 update_metric_name_matcher(expr, metric_name)
965 }
966 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
967 if name.is_some() {
968 return;
969 }
970
971 for m in &mut matchers.matchers {
972 if m.name == METRIC_NAME {
973 m.op = MatchOp::Equal;
974 m.value = metric_name.to_string();
975 }
976 }
977 }
978 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
979 let VectorSelector { name, matchers, .. } = vs;
980 if name.is_some() {
981 return;
982 }
983
984 for m in &mut matchers.matchers {
985 if m.name == METRIC_NAME {
986 m.op = MatchOp::Equal;
987 m.value = metric_name.to_string();
988 }
989 }
990 }
991 PromqlExpr::Call(Call { args, .. }) => {
992 args.args.iter_mut().for_each(|e| {
993 update_metric_name_matcher(e, metric_name);
994 });
995 }
996 PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
997 }
998}
999
1000#[derive(Debug, Default, Serialize, Deserialize)]
1001pub struct LabelValueQuery {
1002 start: Option<String>,
1003 end: Option<String>,
1004 lookback: Option<String>,
1005 #[serde(flatten)]
1006 matches: Matches,
1007 db: Option<String>,
1008 limit: Option<usize>,
1009}
1010
1011#[axum_macros::debug_handler]
1012#[tracing::instrument(
1013 skip_all,
1014 fields(protocol = "prometheus", request_type = "label_values_query")
1015)]
1016pub async fn label_values_query(
1017 State(handler): State<PrometheusHandlerRef>,
1018 Path(label_name): Path<String>,
1019 Extension(mut query_ctx): Extension<QueryContext>,
1020 Query(params): Query<LabelValueQuery>,
1021) -> PrometheusJsonResponse {
1022 let (catalog, schema) = get_catalog_schema(¶ms.db, &query_ctx);
1023 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
1024 let query_ctx = Arc::new(query_ctx);
1025
1026 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
1027 .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
1028 .start_timer();
1029
1030 if label_name == METRIC_NAME_LABEL {
1031 let catalog_manager = handler.catalog_manager();
1032
1033 let mut table_names = try_call_return_response!(
1034 retrieve_table_names(&query_ctx, catalog_manager, params.matches.0).await
1035 );
1036
1037 truncate_results(&mut table_names, params.limit);
1038 return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
1039 } else if label_name == FIELD_NAME_LABEL {
1040 let field_columns = handle_schema_err!(
1041 retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
1042 )
1043 .unwrap_or_default();
1044 let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
1045 field_columns.sort_unstable();
1046 truncate_results(&mut field_columns, params.limit);
1047 return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
1048 } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
1049 let catalog_manager = handler.catalog_manager();
1050
1051 let mut schema_names = try_call_return_response!(
1052 retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await
1053 );
1054 truncate_results(&mut schema_names, params.limit);
1055 return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(schema_names));
1056 }
1057
1058 let queries = params.matches.0;
1059 if queries.is_empty() {
1060 return PrometheusJsonResponse::error(
1061 StatusCode::InvalidArguments,
1062 "match[] parameter is required",
1063 );
1064 }
1065
1066 let start = params.start.unwrap_or_else(yesterday_rfc3339);
1067 let end = params.end.unwrap_or_else(current_time_rfc3339);
1068 let mut label_values = HashSet::new();
1069
1070 let start = try_call_return_response!(
1071 QueryLanguageParser::parse_promql_timestamp(&start)
1072 .context(ParseTimestampSnafu { timestamp: &start })
1073 );
1074 let end = try_call_return_response!(
1075 QueryLanguageParser::parse_promql_timestamp(&end)
1076 .context(ParseTimestampSnafu { timestamp: &end })
1077 );
1078
1079 for query in queries {
1080 let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
1081 let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
1082 return PrometheusJsonResponse::error(
1083 StatusCode::InvalidArguments,
1084 "expected vector selector",
1085 );
1086 };
1087 let Some(name) = take_metric_name(&mut vector_selector) else {
1088 return PrometheusJsonResponse::error(
1089 StatusCode::InvalidArguments,
1090 "expected metric name",
1091 );
1092 };
1093 let VectorSelector { matchers, .. } = vector_selector;
1094 let matchers = matchers.matchers;
1096 let result = handler
1097 .query_label_values(name, label_name.clone(), matchers, start, end, &query_ctx)
1098 .await;
1099 if let Some(result) = handle_schema_err!(result) {
1100 label_values.extend(result.into_iter());
1101 }
1102 }
1103
1104 let mut label_values: Vec<_> = label_values.into_iter().collect();
1105 label_values.sort_unstable();
1106 truncate_results(&mut label_values, params.limit);
1107
1108 PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
1109}
1110
1111fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
1112 if let Some(limit) = limit
1113 && limit > 0
1114 && label_values.len() >= limit
1115 {
1116 label_values.truncate(limit);
1117 }
1118}
1119
1120fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1123 if let Some(name) = selector.name.take() {
1124 return Some(name);
1125 }
1126
1127 let (pos, matcher) = selector
1128 .matchers
1129 .matchers
1130 .iter()
1131 .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1132 let name = matcher.value.clone();
1133 selector.matchers.matchers.remove(pos);
1135
1136 Some(name)
1137}
1138
1139async fn retrieve_table_names(
1140 query_ctx: &QueryContext,
1141 catalog_manager: CatalogManagerRef,
1142 matches: Vec<String>,
1143) -> Result<Vec<String>> {
1144 let catalog = query_ctx.current_catalog();
1145 let schema = query_ctx.current_schema();
1146
1147 let mut tables_stream = catalog_manager.tables(catalog, &schema, Some(query_ctx));
1148 let mut table_names = Vec::new();
1149
1150 let name_matcher = matches
1152 .first()
1153 .and_then(|matcher| promql_parser::parser::parse(matcher).ok())
1154 .and_then(|expr| {
1155 if let PromqlExpr::VectorSelector(vector_selector) = expr {
1156 let matchers = vector_selector.matchers.matchers;
1157 for matcher in matchers {
1158 if matcher.name == METRIC_NAME_LABEL {
1159 return Some(matcher);
1160 }
1161 }
1162
1163 None
1164 } else {
1165 None
1166 }
1167 });
1168
1169 while let Some(table) = tables_stream.next().await {
1170 let table = table.context(CatalogSnafu)?;
1171 if !table
1172 .table_info()
1173 .meta
1174 .options
1175 .extra_options
1176 .contains_key(LOGICAL_TABLE_METADATA_KEY)
1177 {
1178 continue;
1180 }
1181
1182 let table_name = &table.table_info().name;
1183
1184 if let Some(matcher) = &name_matcher {
1185 match &matcher.op {
1186 MatchOp::Equal => {
1187 if table_name == &matcher.value {
1188 table_names.push(table_name.clone());
1189 }
1190 }
1191 MatchOp::Re(reg) => {
1192 if reg.is_match(table_name) {
1193 table_names.push(table_name.clone());
1194 }
1195 }
1196 _ => {
1197 table_names.push(table_name.clone());
1200 }
1201 }
1202 } else {
1203 table_names.push(table_name.clone());
1204 }
1205 }
1206
1207 table_names.sort_unstable();
1208 Ok(table_names)
1209}
1210
1211async fn retrieve_field_names(
1212 query_ctx: &QueryContext,
1213 manager: CatalogManagerRef,
1214 matches: Vec<String>,
1215) -> Result<HashSet<String>> {
1216 let mut field_columns = HashSet::new();
1217 let catalog = query_ctx.current_catalog();
1218 let schema = query_ctx.current_schema();
1219
1220 if matches.is_empty() {
1221 while let Some(table) = manager
1223 .tables(catalog, &schema, Some(query_ctx))
1224 .next()
1225 .await
1226 {
1227 let table = table.context(CatalogSnafu)?;
1228 for column in table.field_columns() {
1229 field_columns.insert(column.name);
1230 }
1231 }
1232 return Ok(field_columns);
1233 }
1234
1235 for table_name in matches {
1236 let table = manager
1237 .table(catalog, &schema, &table_name, Some(query_ctx))
1238 .await
1239 .context(CatalogSnafu)?
1240 .with_context(|| TableNotFoundSnafu {
1241 catalog: catalog.to_string(),
1242 schema: schema.clone(),
1243 table: table_name.clone(),
1244 })?;
1245
1246 for column in table.field_columns() {
1247 field_columns.insert(column.name);
1248 }
1249 }
1250 Ok(field_columns)
1251}
1252
1253async fn retrieve_schema_names(
1254 query_ctx: &QueryContext,
1255 catalog_manager: CatalogManagerRef,
1256 matches: Vec<String>,
1257) -> Result<Vec<String>> {
1258 let mut schemas = Vec::new();
1259 let catalog = query_ctx.current_catalog();
1260
1261 let candidate_schemas = catalog_manager
1262 .schema_names(catalog, Some(query_ctx))
1263 .await
1264 .context(CatalogSnafu)?;
1265
1266 for schema in candidate_schemas {
1267 let mut found = true;
1268 for match_item in &matches {
1269 if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
1270 let exists = catalog_manager
1271 .table_exists(catalog, &schema, &table_name, Some(query_ctx))
1272 .await
1273 .context(CatalogSnafu)?;
1274 if !exists {
1275 found = false;
1276 break;
1277 }
1278 }
1279 }
1280
1281 if found {
1282 schemas.push(schema);
1283 }
1284 }
1285
1286 schemas.sort_unstable();
1287
1288 Ok(schemas)
1289}
1290
1291fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1296 let promql_expr = promql_parser::parser::parse(query).ok()?;
1297 promql_expr_to_metric_name(&promql_expr)
1298}
1299
1300#[derive(Debug, Default, Serialize, Deserialize)]
1301pub struct SeriesQuery {
1302 start: Option<String>,
1303 end: Option<String>,
1304 lookback: Option<String>,
1305 #[serde(flatten)]
1306 matches: Matches,
1307 db: Option<String>,
1308}
1309
1310#[axum_macros::debug_handler]
1311#[tracing::instrument(
1312 skip_all,
1313 fields(protocol = "prometheus", request_type = "series_query")
1314)]
1315pub async fn series_query(
1316 State(handler): State<PrometheusHandlerRef>,
1317 Query(params): Query<SeriesQuery>,
1318 Extension(mut query_ctx): Extension<QueryContext>,
1319 Form(form_params): Form<SeriesQuery>,
1320) -> PrometheusJsonResponse {
1321 let mut queries: Vec<String> = params.matches.0;
1322 if queries.is_empty() {
1323 queries = form_params.matches.0;
1324 }
1325 if queries.is_empty() {
1326 return PrometheusJsonResponse::error(
1327 StatusCode::Unsupported,
1328 "match[] parameter is required",
1329 );
1330 }
1331 let start = params
1332 .start
1333 .or(form_params.start)
1334 .unwrap_or_else(yesterday_rfc3339);
1335 let end = params
1336 .end
1337 .or(form_params.end)
1338 .unwrap_or_else(current_time_rfc3339);
1339 let lookback = params
1340 .lookback
1341 .or(form_params.lookback)
1342 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
1343
1344 if let Some(db) = ¶ms.db {
1346 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
1347 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
1348 }
1349 let query_ctx = Arc::new(query_ctx);
1350
1351 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
1352 .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
1353 .start_timer();
1354
1355 let mut series = Vec::new();
1356 let mut merge_map = HashMap::new();
1357 for query in queries {
1358 let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
1359 let prom_query = PromQuery {
1360 query,
1361 start: start.clone(),
1362 end: end.clone(),
1363 step: DEFAULT_LOOKBACK_STRING.to_string(),
1365 lookback: lookback.clone(),
1366 alias: None,
1367 };
1368 let result = handler.do_query(&prom_query, query_ctx.clone()).await;
1369
1370 handle_schema_err!(
1371 retrieve_series_from_query_result(
1372 result,
1373 &mut series,
1374 &query_ctx,
1375 &table_name,
1376 &handler.catalog_manager(),
1377 &mut merge_map,
1378 )
1379 .await
1380 );
1381 }
1382 let merge_map = merge_map
1383 .into_iter()
1384 .map(|(k, v)| (k, Value::from(v)))
1385 .collect();
1386 let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
1387 resp.resp_metrics = merge_map;
1388 resp
1389}
1390
1391#[derive(Debug, Default, Serialize, Deserialize)]
1392pub struct ParseQuery {
1393 query: Option<String>,
1394 db: Option<String>,
1395}
1396
1397#[axum_macros::debug_handler]
1398#[tracing::instrument(
1399 skip_all,
1400 fields(protocol = "prometheus", request_type = "parse_query")
1401)]
1402pub async fn parse_query(
1403 State(_handler): State<PrometheusHandlerRef>,
1404 Query(params): Query<ParseQuery>,
1405 Extension(_query_ctx): Extension<QueryContext>,
1406 Form(form_params): Form<ParseQuery>,
1407) -> PrometheusJsonResponse {
1408 if let Some(query) = params.query.or(form_params.query) {
1409 let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1410 PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1411 } else {
1412 PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1413 }
1414}
1415
1416#[cfg(test)]
1417mod tests {
1418 use promql_parser::parser::value::ValueType;
1419
1420 use super::*;
1421
1422 struct TestCase {
1423 name: &'static str,
1424 promql: &'static str,
1425 expected_metric: Option<&'static str>,
1426 expected_type: ValueType,
1427 should_error: bool,
1428 }
1429
1430 #[test]
1431 fn test_retrieve_metric_name_and_result_type() {
1432 let test_cases = &[
1433 TestCase {
1435 name: "simple metric",
1436 promql: "cpu_usage",
1437 expected_metric: Some("cpu_usage"),
1438 expected_type: ValueType::Vector,
1439 should_error: false,
1440 },
1441 TestCase {
1442 name: "metric with selector",
1443 promql: r#"cpu_usage{instance="localhost"}"#,
1444 expected_metric: Some("cpu_usage"),
1445 expected_type: ValueType::Vector,
1446 should_error: false,
1447 },
1448 TestCase {
1449 name: "metric with range selector",
1450 promql: "cpu_usage[5m]",
1451 expected_metric: Some("cpu_usage"),
1452 expected_type: ValueType::Matrix,
1453 should_error: false,
1454 },
1455 TestCase {
1456 name: "metric with __name__ matcher",
1457 promql: r#"{__name__="cpu_usage"}"#,
1458 expected_metric: Some("cpu_usage"),
1459 expected_type: ValueType::Vector,
1460 should_error: false,
1461 },
1462 TestCase {
1463 name: "metric with unary operator",
1464 promql: "-cpu_usage",
1465 expected_metric: None,
1466 expected_type: ValueType::Vector,
1467 should_error: false,
1468 },
1469 TestCase {
1471 name: "metric with aggregation",
1472 promql: "sum(cpu_usage)",
1473 expected_metric: Some("cpu_usage"),
1474 expected_type: ValueType::Vector,
1475 should_error: false,
1476 },
1477 TestCase {
1478 name: "complex aggregation",
1479 promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
1480 expected_metric: None,
1481 expected_type: ValueType::Vector,
1482 should_error: false,
1483 },
1484 TestCase {
1485 name: "complex aggregation",
1486 promql: r#"sum by (__name__) (cpu_usage{job="node"})"#,
1487 expected_metric: Some("cpu_usage"),
1488 expected_type: ValueType::Vector,
1489 should_error: false,
1490 },
1491 TestCase {
1492 name: "complex aggregation",
1493 promql: r#"sum without (instance) (cpu_usage{job="node"})"#,
1494 expected_metric: Some("cpu_usage"),
1495 expected_type: ValueType::Vector,
1496 should_error: false,
1497 },
1498 TestCase {
1500 name: "same metric addition",
1501 promql: "cpu_usage + cpu_usage",
1502 expected_metric: None,
1503 expected_type: ValueType::Vector,
1504 should_error: false,
1505 },
1506 TestCase {
1507 name: "metric with scalar addition",
1508 promql: r#"sum(rate(cpu_usage{job="node"}[5m])) + 100"#,
1509 expected_metric: None,
1510 expected_type: ValueType::Vector,
1511 should_error: false,
1512 },
1513 TestCase {
1515 name: "different metrics addition",
1516 promql: "cpu_usage + memory_usage",
1517 expected_metric: None,
1518 expected_type: ValueType::Vector,
1519 should_error: false,
1520 },
1521 TestCase {
1522 name: "different metrics subtraction",
1523 promql: "network_in - network_out",
1524 expected_metric: None,
1525 expected_type: ValueType::Vector,
1526 should_error: false,
1527 },
1528 TestCase {
1530 name: "unless with different metrics",
1531 promql: "cpu_usage unless memory_usage",
1532 expected_metric: Some("cpu_usage"),
1533 expected_type: ValueType::Vector,
1534 should_error: false,
1535 },
1536 TestCase {
1537 name: "unless with same metric",
1538 promql: "cpu_usage unless cpu_usage",
1539 expected_metric: Some("cpu_usage"),
1540 expected_type: ValueType::Vector,
1541 should_error: false,
1542 },
1543 TestCase {
1545 name: "basic subquery",
1546 promql: "cpu_usage[5m:1m]",
1547 expected_metric: Some("cpu_usage"),
1548 expected_type: ValueType::Matrix,
1549 should_error: false,
1550 },
1551 TestCase {
1552 name: "subquery with multiple metrics",
1553 promql: "(cpu_usage + memory_usage)[5m:1m]",
1554 expected_metric: None,
1555 expected_type: ValueType::Matrix,
1556 should_error: false,
1557 },
1558 TestCase {
1560 name: "scalar value",
1561 promql: "42",
1562 expected_metric: None,
1563 expected_type: ValueType::Scalar,
1564 should_error: false,
1565 },
1566 TestCase {
1567 name: "string literal",
1568 promql: r#""hello world""#,
1569 expected_metric: None,
1570 expected_type: ValueType::String,
1571 should_error: false,
1572 },
1573 TestCase {
1575 name: "invalid syntax",
1576 promql: "cpu_usage{invalid=",
1577 expected_metric: None,
1578 expected_type: ValueType::Vector,
1579 should_error: true,
1580 },
1581 TestCase {
1582 name: "empty query",
1583 promql: "",
1584 expected_metric: None,
1585 expected_type: ValueType::Vector,
1586 should_error: true,
1587 },
1588 TestCase {
1589 name: "malformed brackets",
1590 promql: "cpu_usage[5m",
1591 expected_metric: None,
1592 expected_type: ValueType::Vector,
1593 should_error: true,
1594 },
1595 ];
1596
1597 for test_case in test_cases {
1598 let result = retrieve_metric_name_and_result_type(test_case.promql);
1599
1600 if test_case.should_error {
1601 assert!(
1602 result.is_err(),
1603 "Test '{}' should have failed but succeeded with: {:?}",
1604 test_case.name,
1605 result
1606 );
1607 } else {
1608 let (metric_name, value_type) = result.unwrap_or_else(|e| {
1609 panic!(
1610 "Test '{}' should have succeeded but failed with error: {}",
1611 test_case.name, e
1612 )
1613 });
1614
1615 let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
1616 assert_eq!(
1617 metric_name, expected_metric_name,
1618 "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
1619 test_case.name, expected_metric_name, metric_name
1620 );
1621
1622 assert_eq!(
1623 value_type, test_case.expected_type,
1624 "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
1625 test_case.name, test_case.expected_type, value_type
1626 );
1627 }
1628 }
1629 }
1630}