1use std::collections::{BTreeMap, HashMap, HashSet};
17use std::sync::Arc;
18
19use axum::extract::{Path, Query, State};
20use axum::{Extension, Form};
21use catalog::CatalogManagerRef;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::ErrorExt;
24use common_error::status_code::StatusCode;
25use common_query::{Output, OutputData};
26use common_recordbatch::RecordBatches;
27use common_telemetry::{debug, tracing};
28use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
29use common_version::OwnedBuildInfo;
30use datatypes::prelude::ConcreteDataType;
31use datatypes::scalars::ScalarVector;
32use datatypes::vectors::Float64Vector;
33use futures::StreamExt;
34use futures::future::join_all;
35use itertools::Itertools;
36use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
37use promql_parser::parser::token::{self};
38use promql_parser::parser::value::ValueType;
39use promql_parser::parser::{
40 AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, LabelModifier, MatrixSelector, ParenExpr,
41 SubqueryExpr, UnaryExpr, VectorSelector,
42};
43use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser};
44use query::promql::planner::normalize_matcher;
45use serde::de::{self, MapAccess, Visitor};
46use serde::{Deserialize, Serialize};
47use serde_json::Value;
48use session::context::{QueryContext, QueryContextRef};
49use snafu::{Location, OptionExt, ResultExt};
50use store_api::metric_engine_consts::{
51 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
52};
53
54pub use super::result::prometheus_resp::PrometheusJsonResponse;
55use crate::error::{
56 CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result,
57 TableNotFoundSnafu, UnexpectedResultSnafu,
58};
59use crate::http::header::collect_plan_metrics;
60use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
61use crate::prometheus_handler::PrometheusHandlerRef;
62
/// One series of an instant-query (`vector`) result, in the Prometheus
/// HTTP API wire format.
///
/// `metric` is the label set (a `BTreeMap` keeps serialization order
/// deterministic); `value` is the optional `(timestamp, sample)` pair with
/// the sample rendered as a string, per Prometheus JSON conventions.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    pub metric: BTreeMap<String, String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}
70
/// One series of a range-query (`matrix`) result, in the Prometheus
/// HTTP API wire format.
///
/// `metric` is the label set; `values` holds the `(timestamp, sample)`
/// pairs, with each sample rendered as a string.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    pub metric: BTreeMap<String, String>,
    pub values: Vec<(f64, String)>,
}
77
/// The `result` payload of a Prometheus query response; the variant
/// corresponds to the reported `resultType`.
///
/// `untagged` makes each variant serialize as its bare content, matching
/// the Prometheus API (the type is conveyed separately via `resultType`).
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    Matrix(Vec<PromSeriesMatrix>),
    Vector(Vec<PromSeriesVector>),
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}
87
88impl Default for PromQueryResult {
89 fn default() -> Self {
90 PromQueryResult::Matrix(Default::default())
91 }
92}
93
/// The `data` object of a Prometheus query response: the `resultType`
/// string (`matrix`, `vector`, `scalar`, or `string`) plus the result
/// payload itself.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    #[serde(rename = "resultType")]
    pub result_type: String,
    pub result: PromQueryResult,
}
100
/// The top-level body of any Prometheus API endpoint handled by this
/// module; each variant maps to one endpoint's payload shape.
///
/// `untagged` lets each variant serialize as its bare content, as the
/// Prometheus API expects.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    PromData(PromData),
    Labels(Vec<String>),
    Series(Vec<HashMap<String, String>>),
    LabelValues(Vec<String>),
    FormatQuery(String),
    BuildInfo(OwnedBuildInfo),
    // The parser AST is serialize-only; it has no Deserialize impl.
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    #[default]
    None,
}
115
116impl PrometheusResponse {
117 fn append(&mut self, other: PrometheusResponse) {
121 match (self, other) {
122 (
123 PrometheusResponse::PromData(PromData {
124 result: PromQueryResult::Matrix(lhs),
125 ..
126 }),
127 PrometheusResponse::PromData(PromData {
128 result: PromQueryResult::Matrix(rhs),
129 ..
130 }),
131 ) => {
132 lhs.extend(rhs);
133 }
134
135 (
136 PrometheusResponse::PromData(PromData {
137 result: PromQueryResult::Vector(lhs),
138 ..
139 }),
140 PrometheusResponse::PromData(PromData {
141 result: PromQueryResult::Vector(rhs),
142 ..
143 }),
144 ) => {
145 lhs.extend(rhs);
146 }
147 _ => {
148 }
150 }
151 }
152
153 pub fn is_none(&self) -> bool {
154 matches!(self, PrometheusResponse::None)
155 }
156}
157
/// Parameters of the `/api/v1/format_query` endpoint: the PromQL
/// expression to pretty-print.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    query: Option<String>,
}
162
163#[axum_macros::debug_handler]
164#[tracing::instrument(
165 skip_all,
166 fields(protocol = "prometheus", request_type = "format_query")
167)]
168pub async fn format_query(
169 State(_handler): State<PrometheusHandlerRef>,
170 Query(params): Query<InstantQuery>,
171 Extension(_query_ctx): Extension<QueryContext>,
172 Form(form_params): Form<InstantQuery>,
173) -> PrometheusJsonResponse {
174 let query = params.query.or(form_params.query).unwrap_or_default();
175 match promql_parser::parser::parse(&query) {
176 Ok(expr) => {
177 let pretty = expr.prettify();
178 PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
179 }
180 Err(reason) => {
181 let err = InvalidQuerySnafu { reason }.build();
182 PrometheusJsonResponse::error(err.status_code(), err.output_msg())
183 }
184 }
185}
186
/// Parameters of the `/api/v1/status/buildinfo` endpoint (none).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}
189
190#[axum_macros::debug_handler]
191#[tracing::instrument(
192 skip_all,
193 fields(protocol = "prometheus", request_type = "build_info_query")
194)]
195pub async fn build_info_query() -> PrometheusJsonResponse {
196 let build_info = common_version::build_info().clone();
197 PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
198}
199
/// Parameters of the `/api/v1/query` (instant query) endpoint.
///
/// All fields are optional; `time` defaults to "now" and `lookback` to the
/// server-wide default. `db` selects the catalog/schema to query.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    query: Option<String>,
    lookback: Option<String>,
    time: Option<String>,
    timeout: Option<String>,
    db: Option<String>,
}
208
/// Evaluates a fallible expression; on `Err`, immediately returns an
/// `InvalidArguments` Prometheus error response from the enclosing handler.
/// Only usable inside functions returning `PrometheusJsonResponse`.
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                let msg = err.to_string();
                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
            }
        }
    };
}
222
223#[axum_macros::debug_handler]
224#[tracing::instrument(
225 skip_all,
226 fields(protocol = "prometheus", request_type = "instant_query")
227)]
228pub async fn instant_query(
229 State(handler): State<PrometheusHandlerRef>,
230 Query(params): Query<InstantQuery>,
231 Extension(mut query_ctx): Extension<QueryContext>,
232 Form(form_params): Form<InstantQuery>,
233) -> PrometheusJsonResponse {
234 let time = params
236 .time
237 .or(form_params.time)
238 .unwrap_or_else(current_time_rfc3339);
239 let prom_query = PromQuery {
240 query: params.query.or(form_params.query).unwrap_or_default(),
241 start: time.clone(),
242 end: time,
243 step: "1s".to_string(),
244 lookback: params
245 .lookback
246 .or(form_params.lookback)
247 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
248 };
249
250 let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
251
252 if let Some(db) = ¶ms.db {
254 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
255 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
256 }
257 let query_ctx = Arc::new(query_ctx);
258
259 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
260 .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
261 .start_timer();
262
263 if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
264 && !name_matchers.is_empty()
265 {
266 debug!("Find metric name matchers: {:?}", name_matchers);
267
268 let metric_names =
269 try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
270
271 debug!("Find metric names: {:?}", metric_names);
272
273 if metric_names.is_empty() {
274 let result_type = promql_expr.value_type();
275
276 return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
277 result_type: result_type.to_string(),
278 ..Default::default()
279 }));
280 }
281
282 let responses = join_all(metric_names.into_iter().map(|metric| {
283 let mut prom_query = prom_query.clone();
284 let mut promql_expr = promql_expr.clone();
285 let query_ctx = query_ctx.clone();
286 let handler = handler.clone();
287
288 async move {
289 update_metric_name_matcher(&mut promql_expr, &metric);
290 let new_query = promql_expr.to_string();
291 debug!(
292 "Updated promql, before: {}, after: {}",
293 &prom_query.query, new_query
294 );
295 prom_query.query = new_query;
296
297 do_instant_query(&handler, &prom_query, query_ctx).await
298 }
299 }))
300 .await;
301
302 responses
303 .into_iter()
304 .reduce(|mut acc, resp| {
305 acc.data.append(resp.data);
306 acc
307 })
308 .unwrap()
309 } else {
310 do_instant_query(&handler, &prom_query, query_ctx).await
311 }
312}
313
314async fn do_instant_query(
316 handler: &PrometheusHandlerRef,
317 prom_query: &PromQuery,
318 query_ctx: QueryContextRef,
319) -> PrometheusJsonResponse {
320 let result = handler.do_query(prom_query, query_ctx).await;
321 let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
322 Ok((metric_name, result_type)) => (metric_name, result_type),
323 Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
324 };
325 PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
326}
327
/// Parameters of the `/api/v1/query_range` endpoint.
///
/// `start`, `end`, and `step` delimit the evaluation grid; `lookback`
/// defaults to the server-wide default. `db` selects the catalog/schema.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    query: Option<String>,
    start: Option<String>,
    end: Option<String>,
    step: Option<String>,
    lookback: Option<String>,
    timeout: Option<String>,
    db: Option<String>,
}
338
339#[axum_macros::debug_handler]
340#[tracing::instrument(
341 skip_all,
342 fields(protocol = "prometheus", request_type = "range_query")
343)]
344pub async fn range_query(
345 State(handler): State<PrometheusHandlerRef>,
346 Query(params): Query<RangeQuery>,
347 Extension(mut query_ctx): Extension<QueryContext>,
348 Form(form_params): Form<RangeQuery>,
349) -> PrometheusJsonResponse {
350 let prom_query = PromQuery {
351 query: params.query.or(form_params.query).unwrap_or_default(),
352 start: params.start.or(form_params.start).unwrap_or_default(),
353 end: params.end.or(form_params.end).unwrap_or_default(),
354 step: params.step.or(form_params.step).unwrap_or_default(),
355 lookback: params
356 .lookback
357 .or(form_params.lookback)
358 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
359 };
360
361 let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
362
363 if let Some(db) = ¶ms.db {
365 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
366 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
367 }
368 let query_ctx = Arc::new(query_ctx);
369 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
370 .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
371 .start_timer();
372
373 if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
374 && !name_matchers.is_empty()
375 {
376 debug!("Find metric name matchers: {:?}", name_matchers);
377
378 let metric_names =
379 try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
380
381 debug!("Find metric names: {:?}", metric_names);
382
383 if metric_names.is_empty() {
384 return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
385 result_type: ValueType::Matrix.to_string(),
386 ..Default::default()
387 }));
388 }
389
390 let responses = join_all(metric_names.into_iter().map(|metric| {
391 let mut prom_query = prom_query.clone();
392 let mut promql_expr = promql_expr.clone();
393 let query_ctx = query_ctx.clone();
394 let handler = handler.clone();
395
396 async move {
397 update_metric_name_matcher(&mut promql_expr, &metric);
398 let new_query = promql_expr.to_string();
399 debug!(
400 "Updated promql, before: {}, after: {}",
401 &prom_query.query, new_query
402 );
403 prom_query.query = new_query;
404
405 do_range_query(&handler, &prom_query, query_ctx).await
406 }
407 }))
408 .await;
409
410 responses
412 .into_iter()
413 .reduce(|mut acc, resp| {
414 acc.data.append(resp.data);
415 acc
416 })
417 .unwrap()
418 } else {
419 do_range_query(&handler, &prom_query, query_ctx).await
420 }
421}
422
423async fn do_range_query(
425 handler: &PrometheusHandlerRef,
426 prom_query: &PromQuery,
427 query_ctx: QueryContextRef,
428) -> PrometheusJsonResponse {
429 let result = handler.do_query(prom_query, query_ctx).await;
430 let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
431 Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
432 Ok((metric_name, _)) => metric_name,
433 };
434 PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
435}
436
/// Collected values of the repeated `match[]` HTTP parameter
/// (deserialized via the custom visitor below).
#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);
439
/// Parameters of the `/api/v1/labels` endpoint.
///
/// `matches` is flattened so the repeated `match[]` keys are captured by
/// the custom `Matches` deserializer.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}
449
450impl<'de> Deserialize<'de> for Matches {
452 fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
453 where
454 D: de::Deserializer<'de>,
455 {
456 struct MatchesVisitor;
457
458 impl<'d> Visitor<'d> for MatchesVisitor {
459 type Value = Vec<String>;
460
461 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
462 formatter.write_str("a string")
463 }
464
465 fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
466 where
467 M: MapAccess<'d>,
468 {
469 let mut matches = Vec::new();
470 while let Some((key, value)) = access.next_entry::<String, String>()? {
471 if key == "match[]" {
472 matches.push(value);
473 }
474 }
475 Ok(matches)
476 }
477 }
478 Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
479 }
480}
481
/// Evaluates a fallible expression, mapping `Ok(v)` to `Some(v)`.
///
/// Table/column-not-found errors become `None` so that a matcher against a
/// missing metric degrades gracefully to "no data"; any other error aborts
/// the enclosing handler with a Prometheus error response. Only usable
/// inside functions returning `PrometheusJsonResponse`.
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err) => {
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    None
                } else {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
                }
            }
        }
    };
}
505
506#[axum_macros::debug_handler]
507#[tracing::instrument(
508 skip_all,
509 fields(protocol = "prometheus", request_type = "labels_query")
510)]
511pub async fn labels_query(
512 State(handler): State<PrometheusHandlerRef>,
513 Query(params): Query<LabelsQuery>,
514 Extension(mut query_ctx): Extension<QueryContext>,
515 Form(form_params): Form<LabelsQuery>,
516) -> PrometheusJsonResponse {
517 let (catalog, schema) = get_catalog_schema(¶ms.db, &query_ctx);
518 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
519 let query_ctx = Arc::new(query_ctx);
520
521 let mut queries = params.matches.0;
522 if queries.is_empty() {
523 queries = form_params.matches.0;
524 }
525
526 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
527 .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
528 .start_timer();
529
530 let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
532 {
533 Ok(labels) => labels,
534 Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
535 };
536 let _ = labels.insert(METRIC_NAME.to_string());
538
539 if queries.is_empty() {
541 let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
542 labels_vec.sort_unstable();
543 return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
544 }
545
546 let start = params
548 .start
549 .or(form_params.start)
550 .unwrap_or_else(yesterday_rfc3339);
551 let end = params
552 .end
553 .or(form_params.end)
554 .unwrap_or_else(current_time_rfc3339);
555 let lookback = params
556 .lookback
557 .or(form_params.lookback)
558 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
559
560 let mut fetched_labels = HashSet::new();
561 let _ = fetched_labels.insert(METRIC_NAME.to_string());
562
563 let mut merge_map = HashMap::new();
564 for query in queries {
565 let prom_query = PromQuery {
566 query,
567 start: start.clone(),
568 end: end.clone(),
569 step: DEFAULT_LOOKBACK_STRING.to_string(),
570 lookback: lookback.clone(),
571 };
572
573 let result = handler.do_query(&prom_query, query_ctx.clone()).await;
574 handle_schema_err!(
575 retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
576 .await
577 );
578 }
579
580 fetched_labels.retain(|l| labels.contains(l));
582 let _ = labels.insert(METRIC_NAME.to_string());
583
584 let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
585 sorted_labels.sort();
586 let merge_map = merge_map
587 .into_iter()
588 .map(|(k, v)| (k, Value::from(v)))
589 .collect();
590 let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
591 resp.resp_metrics = merge_map;
592 resp
593}
594
595async fn get_all_column_names(
597 catalog: &str,
598 schema: &str,
599 manager: &CatalogManagerRef,
600) -> std::result::Result<HashSet<String>, catalog::error::Error> {
601 let table_names = manager.table_names(catalog, schema, None).await?;
602
603 let mut labels = HashSet::new();
604 for table_name in table_names {
605 let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
606 continue;
607 };
608 for column in table.primary_key_columns() {
609 if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
610 && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
611 {
612 labels.insert(column.name);
613 }
614 }
615 }
616
617 Ok(labels)
618}
619
/// Converts one table's query output into Prometheus series metadata,
/// appending to `series`, and merges the plan's execution metrics into
/// `metrics`.
///
/// Looks the table up again to learn its tag columns, then projects each
/// record batch down to those columns. Fails if the table vanished or the
/// output is an affected-rows count rather than data.
async fn retrieve_series_from_query_result(
    result: Result<Output>,
    series: &mut Vec<HashMap<String, String>>,
    query_ctx: &QueryContext,
    table_name: &str,
    manager: &CatalogManagerRef,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;

    // Resolve the table to learn which columns are tags.
    let table = manager
        .table(
            query_ctx.current_catalog(),
            &query_ctx.current_schema(),
            table_name,
            Some(query_ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            catalog: query_ctx.current_catalog(),
            schema: query_ctx.current_schema(),
            table: table_name,
        })?;
    let tag_columns = table
        .primary_key_columns()
        .map(|c| c.name)
        .collect::<HashSet<_>>();

    match result.data {
        OutputData::RecordBatches(batches) => {
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        // Streamed output is collected in full before conversion.
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
            reason: "expected data result, but got affected rows".to_string(),
            location: Location::default(),
        }),
    }?;

    // Fold this query's plan metrics into the shared accumulator.
    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}
671
/// Extracts label (column) names from one query's output into `labels`,
/// and merges the plan's execution metrics into `metrics`.
///
/// Fails if the output is an affected-rows count rather than data.
async fn retrieve_labels_name_from_query_result(
    result: Result<Output>,
    labels: &mut HashSet<String>,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;
    match result.data {
        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
        // Streamed output is collected in full before conversion.
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_labels_name(batches, labels)
        }
        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
            reason: "expected data result, but got affected rows".to_string(),
        }
        .fail(),
    }?;
    // Fold this query's plan metrics into the shared accumulator.
    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}
697
698fn record_batches_to_series(
699 batches: RecordBatches,
700 series: &mut Vec<HashMap<String, String>>,
701 table_name: &str,
702 tag_columns: &HashSet<String>,
703) -> Result<()> {
704 for batch in batches.iter() {
705 let projection = batch
707 .schema
708 .column_schemas()
709 .iter()
710 .enumerate()
711 .filter_map(|(idx, col)| {
712 if tag_columns.contains(&col.name) {
713 Some(idx)
714 } else {
715 None
716 }
717 })
718 .collect::<Vec<_>>();
719 let batch = batch
720 .try_project(&projection)
721 .context(CollectRecordbatchSnafu)?;
722
723 for row in batch.rows() {
724 let mut element: HashMap<String, String> = row
725 .iter()
726 .enumerate()
727 .map(|(idx, column)| {
728 let column_name = batch.schema.column_name_by_index(idx);
729 (column_name.to_string(), column.to_string())
730 })
731 .collect();
732 let _ = element.insert("__name__".to_string(), table_name.to_string());
733 series.push(element);
734 }
735 }
736 Ok(())
737}
738
739fn record_batches_to_labels_name(
741 batches: RecordBatches,
742 labels: &mut HashSet<String>,
743) -> Result<()> {
744 let mut column_indices = Vec::new();
745 let mut field_column_indices = Vec::new();
746 for (i, column) in batches.schema().column_schemas().iter().enumerate() {
747 if let ConcreteDataType::Float64(_) = column.data_type {
748 field_column_indices.push(i);
749 }
750 column_indices.push(i);
751 }
752
753 if field_column_indices.is_empty() {
754 return Err(Error::Internal {
755 err_msg: "no value column found".to_string(),
756 });
757 }
758
759 for batch in batches.iter() {
760 let names = column_indices
761 .iter()
762 .map(|c| batches.schema().column_name_by_index(*c).to_string())
763 .collect::<Vec<_>>();
764
765 let field_columns = field_column_indices
766 .iter()
767 .map(|i| {
768 batch
769 .column(*i)
770 .as_any()
771 .downcast_ref::<Float64Vector>()
772 .unwrap()
773 })
774 .collect::<Vec<_>>();
775
776 for row_index in 0..batch.num_rows() {
777 if field_columns
779 .iter()
780 .all(|c| c.get_data(row_index).is_none())
781 {
782 continue;
783 }
784
785 names.iter().for_each(|name| {
787 let _ = labels.insert(name.to_string());
788 });
789 return Ok(());
790 }
791 }
792 Ok(())
793}
794
795pub(crate) fn retrieve_metric_name_and_result_type(
796 promql: &str,
797) -> Result<(Option<String>, ValueType)> {
798 let promql_expr = promql_parser::parser::parse(promql)
799 .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
800 let metric_name = promql_expr_to_metric_name(&promql_expr);
801 let result_type = promql_expr.value_type();
802
803 Ok((metric_name, result_type))
804}
805
806pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
809 if let Some(db) = db {
810 parse_catalog_and_schema_from_db_string(db)
811 } else {
812 (
813 ctx.current_catalog().to_string(),
814 ctx.current_schema().to_string(),
815 )
816 }
817}
818
819pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
821 if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
822 ctx.set_current_catalog(catalog);
823 ctx.set_current_schema(schema);
824 }
825}
826
827fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
828 let mut metric_names = HashSet::new();
829 collect_metric_names(expr, &mut metric_names);
830
831 if metric_names.len() == 1 {
833 metric_names.into_iter().next()
834 } else {
835 None
836 }
837}
838
/// Accumulates the metric names referenced by `expr` into `metric_names`.
///
/// Clears the set — meaning "no determinable metric name" — whenever the
/// expression can change or drop `__name__`: unary expressions, non-set-op
/// binary expressions, or aggregations whose grouping discards `__name__`.
fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
            match modifier {
                // `by (...)` that omits __name__ drops the metric name.
                Some(LabelModifier::Include(labels)) => {
                    if !labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                // `without (...)` that names __name__ drops it as well.
                Some(LabelModifier::Exclude(labels)) => {
                    if labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                _ => {}
            }
            collect_metric_names(expr, metric_names)
        }
        // Unary negation drops __name__.
        PromqlExpr::Unary(UnaryExpr { .. }) => metric_names.clear(),
        PromqlExpr::Binary(BinaryExpr { lhs, op, .. }) => {
            // Set operators (and/or/unless) keep the left side's labels;
            // arithmetic/comparison operators drop __name__.
            if matches!(
                op.id(),
                token::T_LAND | token::T_LOR | token::T_LUNLESS ) {
                collect_metric_names(lhs, metric_names)
            } else {
                metric_names.clear()
            }
        }
        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            // Prefer the literal name; otherwise the first __name__ matcher.
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            args.args
                .iter()
                .for_each(|e| collect_metric_names(e, metric_names));
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}
898
/// Walks `expr` depth-first and applies `f` to each selector's
/// `(name, matchers)` pair, returning the first `Some` result.
///
/// Binary expressions try the left side first; function calls try their
/// arguments in order. Literals and extensions yield `None`.
fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
where
    F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
{
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
            find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
        }
        PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::NumberLiteral(_) => None,
        PromqlExpr::StringLiteral(_) => None,
        PromqlExpr::Extension(_) => None,
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;

            f(name, matchers)
        }
        PromqlExpr::Call(Call { args, .. }) => args
            .args
            .iter()
            .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
    }
}
926
927fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
929 find_metric_name_and_matchers(expr, |name, matchers| {
930 if name.is_some() {
932 return None;
933 }
934
935 Some(matchers.find_matchers(METRIC_NAME))
937 })
938 .map(|matchers| {
939 matchers
940 .into_iter()
941 .filter(|m| !matches!(m.op, MatchOp::Equal))
942 .map(normalize_matcher)
943 .collect::<Vec<_>>()
944 })
945}
946
947fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
950 match expr {
951 PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
952 update_metric_name_matcher(expr, metric_name)
953 }
954 PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
955 PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
956 update_metric_name_matcher(lhs, metric_name);
957 update_metric_name_matcher(rhs, metric_name);
958 }
959 PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
960 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
961 update_metric_name_matcher(expr, metric_name)
962 }
963 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
964 if name.is_some() {
965 return;
966 }
967
968 for m in &mut matchers.matchers {
969 if m.name == METRIC_NAME {
970 m.op = MatchOp::Equal;
971 m.value = metric_name.to_string();
972 }
973 }
974 }
975 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
976 let VectorSelector { name, matchers, .. } = vs;
977 if name.is_some() {
978 return;
979 }
980
981 for m in &mut matchers.matchers {
982 if m.name == METRIC_NAME {
983 m.op = MatchOp::Equal;
984 m.value = metric_name.to_string();
985 }
986 }
987 }
988 PromqlExpr::Call(Call { args, .. }) => {
989 args.args.iter_mut().for_each(|e| {
990 update_metric_name_matcher(e, metric_name);
991 });
992 }
993 PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
994 }
995}
996
/// Parameters of the `/api/v1/label/{label_name}/values` endpoint.
///
/// `matches` captures the repeated `match[]` keys; `limit` caps the number
/// of returned values (0 or absent means unlimited).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
    limit: Option<usize>,
}
1007
1008#[axum_macros::debug_handler]
1009#[tracing::instrument(
1010 skip_all,
1011 fields(protocol = "prometheus", request_type = "label_values_query")
1012)]
1013pub async fn label_values_query(
1014 State(handler): State<PrometheusHandlerRef>,
1015 Path(label_name): Path<String>,
1016 Extension(mut query_ctx): Extension<QueryContext>,
1017 Query(params): Query<LabelValueQuery>,
1018) -> PrometheusJsonResponse {
1019 let (catalog, schema) = get_catalog_schema(¶ms.db, &query_ctx);
1020 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
1021 let query_ctx = Arc::new(query_ctx);
1022
1023 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
1024 .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
1025 .start_timer();
1026
1027 if label_name == METRIC_NAME_LABEL {
1028 let catalog_manager = handler.catalog_manager();
1029
1030 let mut table_names = try_call_return_response!(
1031 retrieve_table_names(&query_ctx, catalog_manager, params.matches.0).await
1032 );
1033
1034 truncate_results(&mut table_names, params.limit);
1035 return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
1036 } else if label_name == FIELD_NAME_LABEL {
1037 let field_columns = handle_schema_err!(
1038 retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
1039 )
1040 .unwrap_or_default();
1041 let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
1042 field_columns.sort_unstable();
1043 truncate_results(&mut field_columns, params.limit);
1044 return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
1045 } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
1046 let catalog_manager = handler.catalog_manager();
1047
1048 let mut schema_names = try_call_return_response!(
1049 retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await
1050 );
1051 truncate_results(&mut schema_names, params.limit);
1052 return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(schema_names));
1053 }
1054
1055 let queries = params.matches.0;
1056 if queries.is_empty() {
1057 return PrometheusJsonResponse::error(
1058 StatusCode::InvalidArguments,
1059 "match[] parameter is required",
1060 );
1061 }
1062
1063 let start = params.start.unwrap_or_else(yesterday_rfc3339);
1064 let end = params.end.unwrap_or_else(current_time_rfc3339);
1065 let mut label_values = HashSet::new();
1066
1067 let start = try_call_return_response!(
1068 QueryLanguageParser::parse_promql_timestamp(&start)
1069 .context(ParseTimestampSnafu { timestamp: &start })
1070 );
1071 let end = try_call_return_response!(
1072 QueryLanguageParser::parse_promql_timestamp(&end)
1073 .context(ParseTimestampSnafu { timestamp: &end })
1074 );
1075
1076 for query in queries {
1077 let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
1078 let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
1079 return PrometheusJsonResponse::error(
1080 StatusCode::InvalidArguments,
1081 "expected vector selector",
1082 );
1083 };
1084 let Some(name) = take_metric_name(&mut vector_selector) else {
1085 return PrometheusJsonResponse::error(
1086 StatusCode::InvalidArguments,
1087 "expected metric name",
1088 );
1089 };
1090 let VectorSelector { matchers, .. } = vector_selector;
1091 let matchers = matchers.matchers;
1093 let result = handler
1094 .query_label_values(
1095 name,
1096 label_name.to_string(),
1097 matchers,
1098 start,
1099 end,
1100 &query_ctx,
1101 )
1102 .await;
1103 if let Some(result) = handle_schema_err!(result) {
1104 label_values.extend(result.into_iter());
1105 }
1106 }
1107
1108 let mut label_values: Vec<_> = label_values.into_iter().collect();
1109 label_values.sort_unstable();
1110 truncate_results(&mut label_values, params.limit);
1111
1112 PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
1113}
1114
/// Truncates `label_values` to at most `limit` entries.
///
/// A missing limit or a limit of zero means "unlimited", matching the
/// Prometheus HTTP API convention for the `limit` parameter.
fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
    match limit {
        Some(max) if max > 0 && label_values.len() >= max => label_values.truncate(max),
        _ => {}
    }
}
1123
1124fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1127 if let Some(name) = selector.name.take() {
1128 return Some(name);
1129 }
1130
1131 let (pos, matcher) = selector
1132 .matchers
1133 .matchers
1134 .iter()
1135 .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1136 let name = matcher.value.clone();
1137 selector.matchers.matchers.remove(pos);
1139
1140 Some(name)
1141}
1142
1143async fn retrieve_table_names(
1144 query_ctx: &QueryContext,
1145 catalog_manager: CatalogManagerRef,
1146 matches: Vec<String>,
1147) -> Result<Vec<String>> {
1148 let catalog = query_ctx.current_catalog();
1149 let schema = query_ctx.current_schema();
1150
1151 let mut tables_stream = catalog_manager.tables(catalog, &schema, Some(query_ctx));
1152 let mut table_names = Vec::new();
1153
1154 let name_matcher = matches
1156 .first()
1157 .and_then(|matcher| promql_parser::parser::parse(matcher).ok())
1158 .and_then(|expr| {
1159 if let PromqlExpr::VectorSelector(vector_selector) = expr {
1160 let matchers = vector_selector.matchers.matchers;
1161 for matcher in matchers {
1162 if matcher.name == METRIC_NAME_LABEL {
1163 return Some(matcher);
1164 }
1165 }
1166
1167 None
1168 } else {
1169 None
1170 }
1171 });
1172
1173 while let Some(table) = tables_stream.next().await {
1174 let table = table.context(CatalogSnafu)?;
1175 if !table
1176 .table_info()
1177 .meta
1178 .options
1179 .extra_options
1180 .contains_key(LOGICAL_TABLE_METADATA_KEY)
1181 {
1182 continue;
1184 }
1185
1186 let table_name = &table.table_info().name;
1187
1188 if let Some(matcher) = &name_matcher {
1189 match &matcher.op {
1190 MatchOp::Equal => {
1191 if table_name == &matcher.value {
1192 table_names.push(table_name.clone());
1193 }
1194 }
1195 MatchOp::Re(reg) => {
1196 if reg.is_match(table_name) {
1197 table_names.push(table_name.clone());
1198 }
1199 }
1200 _ => {
1201 table_names.push(table_name.clone());
1204 }
1205 }
1206 } else {
1207 table_names.push(table_name.clone());
1208 }
1209 }
1210
1211 table_names.sort_unstable();
1212 Ok(table_names)
1213}
1214
1215async fn retrieve_field_names(
1216 query_ctx: &QueryContext,
1217 manager: CatalogManagerRef,
1218 matches: Vec<String>,
1219) -> Result<HashSet<String>> {
1220 let mut field_columns = HashSet::new();
1221 let catalog = query_ctx.current_catalog();
1222 let schema = query_ctx.current_schema();
1223
1224 if matches.is_empty() {
1225 while let Some(table) = manager
1227 .tables(catalog, &schema, Some(query_ctx))
1228 .next()
1229 .await
1230 {
1231 let table = table.context(CatalogSnafu)?;
1232 for column in table.field_columns() {
1233 field_columns.insert(column.name);
1234 }
1235 }
1236 return Ok(field_columns);
1237 }
1238
1239 for table_name in matches {
1240 let table = manager
1241 .table(catalog, &schema, &table_name, Some(query_ctx))
1242 .await
1243 .context(CatalogSnafu)?
1244 .with_context(|| TableNotFoundSnafu {
1245 catalog: catalog.to_string(),
1246 schema: schema.to_string(),
1247 table: table_name.to_string(),
1248 })?;
1249
1250 for column in table.field_columns() {
1251 field_columns.insert(column.name);
1252 }
1253 }
1254 Ok(field_columns)
1255}
1256
1257async fn retrieve_schema_names(
1258 query_ctx: &QueryContext,
1259 catalog_manager: CatalogManagerRef,
1260 matches: Vec<String>,
1261) -> Result<Vec<String>> {
1262 let mut schemas = Vec::new();
1263 let catalog = query_ctx.current_catalog();
1264
1265 let candidate_schemas = catalog_manager
1266 .schema_names(catalog, Some(query_ctx))
1267 .await
1268 .context(CatalogSnafu)?;
1269
1270 for schema in candidate_schemas {
1271 let mut found = true;
1272 for match_item in &matches {
1273 if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
1274 let exists = catalog_manager
1275 .table_exists(catalog, &schema, &table_name, Some(query_ctx))
1276 .await
1277 .context(CatalogSnafu)?;
1278 if !exists {
1279 found = false;
1280 break;
1281 }
1282 }
1283 }
1284
1285 if found {
1286 schemas.push(schema);
1287 }
1288 }
1289
1290 schemas.sort_unstable();
1291
1292 Ok(schemas)
1293}
1294
1295fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1300 let promql_expr = promql_parser::parser::parse(query).ok()?;
1301 promql_expr_to_metric_name(&promql_expr)
1302}
1303
/// Parameters accepted by the Prometheus `/api/v1/series` endpoint,
/// deserialized from either the URL query string or the form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    // Range start; defaults to 24h ago (`yesterday_rfc3339`) when absent.
    start: Option<String>,
    // Range end; defaults to now (`current_time_rfc3339`) when absent.
    end: Option<String>,
    // Lookback delta override; defaults to DEFAULT_LOOKBACK_STRING.
    lookback: Option<String>,
    // Repeated `match[]` series selectors, captured via the flattened
    // `Matches` helper (its inner value is used as a Vec<String>).
    #[serde(flatten)]
    matches: Matches,
    // Target database string, parsed into (catalog, schema).
    db: Option<String>,
}
1313
1314#[axum_macros::debug_handler]
1315#[tracing::instrument(
1316 skip_all,
1317 fields(protocol = "prometheus", request_type = "series_query")
1318)]
1319pub async fn series_query(
1320 State(handler): State<PrometheusHandlerRef>,
1321 Query(params): Query<SeriesQuery>,
1322 Extension(mut query_ctx): Extension<QueryContext>,
1323 Form(form_params): Form<SeriesQuery>,
1324) -> PrometheusJsonResponse {
1325 let mut queries: Vec<String> = params.matches.0;
1326 if queries.is_empty() {
1327 queries = form_params.matches.0;
1328 }
1329 if queries.is_empty() {
1330 return PrometheusJsonResponse::error(
1331 StatusCode::Unsupported,
1332 "match[] parameter is required",
1333 );
1334 }
1335 let start = params
1336 .start
1337 .or(form_params.start)
1338 .unwrap_or_else(yesterday_rfc3339);
1339 let end = params
1340 .end
1341 .or(form_params.end)
1342 .unwrap_or_else(current_time_rfc3339);
1343 let lookback = params
1344 .lookback
1345 .or(form_params.lookback)
1346 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
1347
1348 if let Some(db) = ¶ms.db {
1350 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
1351 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
1352 }
1353 let query_ctx = Arc::new(query_ctx);
1354
1355 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
1356 .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
1357 .start_timer();
1358
1359 let mut series = Vec::new();
1360 let mut merge_map = HashMap::new();
1361 for query in queries {
1362 let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
1363 let prom_query = PromQuery {
1364 query,
1365 start: start.clone(),
1366 end: end.clone(),
1367 step: DEFAULT_LOOKBACK_STRING.to_string(),
1369 lookback: lookback.clone(),
1370 };
1371 let result = handler.do_query(&prom_query, query_ctx.clone()).await;
1372
1373 handle_schema_err!(
1374 retrieve_series_from_query_result(
1375 result,
1376 &mut series,
1377 &query_ctx,
1378 &table_name,
1379 &handler.catalog_manager(),
1380 &mut merge_map,
1381 )
1382 .await
1383 );
1384 }
1385 let merge_map = merge_map
1386 .into_iter()
1387 .map(|(k, v)| (k, Value::from(v)))
1388 .collect();
1389 let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
1390 resp.resp_metrics = merge_map;
1391 resp
1392}
1393
/// Parameters accepted by the Prometheus `/api/v1/parse_query` endpoint,
/// deserialized from either the URL query string or the form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    // PromQL expression to parse; required (error returned when absent).
    query: Option<String>,
    // Target database string; currently unused by the handler.
    db: Option<String>,
}
1399
1400#[axum_macros::debug_handler]
1401#[tracing::instrument(
1402 skip_all,
1403 fields(protocol = "prometheus", request_type = "parse_query")
1404)]
1405pub async fn parse_query(
1406 State(_handler): State<PrometheusHandlerRef>,
1407 Query(params): Query<ParseQuery>,
1408 Extension(_query_ctx): Extension<QueryContext>,
1409 Form(form_params): Form<ParseQuery>,
1410) -> PrometheusJsonResponse {
1411 if let Some(query) = params.query.or(form_params.query) {
1412 let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1413 PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1414 } else {
1415 PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1416 }
1417}
1418
#[cfg(test)]
mod tests {
    use promql_parser::parser::value::ValueType;

    use super::*;

    /// One table-driven case for `retrieve_metric_name_and_result_type`.
    struct TestCase {
        name: &'static str,
        promql: &'static str,
        expected_metric: Option<&'static str>,
        expected_type: ValueType,
        should_error: bool,
    }

    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        // Note: case names are unique so a failing assertion identifies the
        // exact case (three cases previously shared "complex aggregation").
        let test_cases = &[
            TestCase {
                name: "simple metric",
                promql: "cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with selector",
                promql: r#"cpu_usage{instance="localhost"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with range selector",
                promql: "cpu_usage[5m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "metric with __name__ matcher",
                promql: r#"{__name__="cpu_usage"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with unary operator",
                promql: "-cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with aggregation",
                promql: "sum(cpu_usage)",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "aggregation grouped by plain label",
                promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "aggregation grouped by __name__",
                promql: r#"sum by (__name__) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "aggregation without plain label",
                promql: r#"sum without (instance) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "same metric addition",
                promql: "cpu_usage + cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with scalar addition",
                promql: r#"sum(rate(cpu_usage{job="node"}[5m])) + 100"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics addition",
                promql: "cpu_usage + memory_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics subtraction",
                promql: "network_in - network_out",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with different metrics",
                promql: "cpu_usage unless memory_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with same metric",
                promql: "cpu_usage unless cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "basic subquery",
                promql: "cpu_usage[5m:1m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "subquery with multiple metrics",
                promql: "(cpu_usage + memory_usage)[5m:1m]",
                expected_metric: None,
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "scalar value",
                promql: "42",
                expected_metric: None,
                expected_type: ValueType::Scalar,
                should_error: false,
            },
            TestCase {
                name: "string literal",
                promql: r#""hello world""#,
                expected_metric: None,
                expected_type: ValueType::String,
                should_error: false,
            },
            TestCase {
                name: "invalid syntax",
                promql: "cpu_usage{invalid=",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "empty query",
                promql: "",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "malformed brackets",
                promql: "cpu_usage[5m",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
        ];

        for test_case in test_cases {
            let result = retrieve_metric_name_and_result_type(test_case.promql);

            if test_case.should_error {
                assert!(
                    result.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    test_case.name,
                    result
                );
            } else {
                let (metric_name, value_type) = result.unwrap_or_else(|e| {
                    panic!(
                        "Test '{}' should have succeeded but failed with error: {}",
                        test_case.name, e
                    )
                });

                let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
                assert_eq!(
                    metric_name, expected_metric_name,
                    "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, expected_metric_name, metric_name
                );

                assert_eq!(
                    value_type, test_case.expected_type,
                    "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, test_case.expected_type, value_type
                );
            }
        }
    }
}