1use std::collections::{BTreeMap, HashMap, HashSet};
17use std::sync::Arc;
18
19use axum::extract::{Path, Query, State};
20use axum::{Extension, Form};
21use catalog::CatalogManagerRef;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::ErrorExt;
24use common_error::status_code::StatusCode;
25use common_query::{Output, OutputData};
26use common_recordbatch::RecordBatches;
27use common_telemetry::{debug, tracing};
28use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
29use common_version::OwnedBuildInfo;
30use datatypes::prelude::ConcreteDataType;
31use datatypes::scalars::ScalarVector;
32use datatypes::vectors::Float64Vector;
33use futures::future::join_all;
34use futures::StreamExt;
35use itertools::Itertools;
36use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
37use promql_parser::parser::value::ValueType;
38use promql_parser::parser::{
39 AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
40 UnaryExpr, VectorSelector,
41};
42use query::parser::{PromQuery, QueryLanguageParser, DEFAULT_LOOKBACK_STRING};
43use query::promql::planner::normalize_matcher;
44use serde::de::{self, MapAccess, Visitor};
45use serde::{Deserialize, Serialize};
46use serde_json::Value;
47use session::context::{QueryContext, QueryContextRef};
48use snafu::{Location, OptionExt, ResultExt};
49use store_api::metric_engine_consts::{
50 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY,
51};
52
53pub use super::result::prometheus_resp::PrometheusJsonResponse;
54use crate::error::{
55 CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result,
56 TableNotFoundSnafu, UnexpectedResultSnafu,
57};
58use crate::http::header::collect_plan_metrics;
59use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
60use crate::prometheus_handler::PrometheusHandlerRef;
61
/// One series of a vector-shaped query result: a label set plus at most one sample.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    /// Label name/value pairs identifying the series (ordered by key via `BTreeMap`).
    pub metric: BTreeMap<String, String>,
    /// The single sample of the series; omitted from serialized output when `None`.
    // NOTE(review): presumably `(timestamp, value)` as in the Prometheus HTTP API — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}
69
/// One series of a matrix-shaped query result: a label set plus a list of samples.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    /// Label name/value pairs identifying the series (ordered by key via `BTreeMap`).
    pub metric: BTreeMap<String, String>,
    /// All samples of the series.
    // NOTE(review): presumably `(timestamp, value)` pairs as in the Prometheus HTTP API — confirm.
    pub values: Vec<(f64, String)>,
}
76
/// Payload of a PromQL query result, one variant per result shape.
///
/// The enum is `untagged`, so no variant name appears in the serialized form.
// NOTE(review): `Scalar` and `String` carry the same payload type, so an untagged
// deserializer cannot tell them apart — confirm this is only relied on for serialization.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    /// A list of series, each with many samples.
    Matrix(Vec<PromSeriesMatrix>),
    /// A list of series, each with at most one sample.
    Vector(Vec<PromSeriesVector>),
    /// A single optional sample.
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    /// A single optional sample carried under the string result shape.
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}
86
87impl Default for PromQueryResult {
88 fn default() -> Self {
89 PromQueryResult::Matrix(Default::default())
90 }
91}
92
/// The `data` section of a query response: the result shape name plus the result itself.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    /// Name of the result shape; serialized under the key `resultType`.
    #[serde(rename = "resultType")]
    pub result_type: String,
    /// The query result payload.
    pub result: PromQueryResult,
}
99
/// Every payload shape the Prometheus-compatible endpoints in this module can return.
///
/// The enum is `untagged`, so each variant serializes as its bare payload.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    /// Result of a PromQL query.
    PromData(PromData),
    /// A list of label names.
    Labels(Vec<String>),
    /// A list of series, each described by its label map.
    Series(Vec<HashMap<String, String>>),
    /// A list of values of one label.
    LabelValues(Vec<String>),
    /// A prettified PromQL expression.
    FormatQuery(String),
    /// Build information of the running binary.
    BuildInfo(OwnedBuildInfo),
    /// A parsed PromQL expression; this variant is never produced by deserialization.
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    /// No payload; this is the `Default` value.
    #[default]
    None,
}
114
115impl PrometheusResponse {
116 fn append(&mut self, other: PrometheusResponse) {
120 match (self, other) {
121 (
122 PrometheusResponse::PromData(PromData {
123 result: PromQueryResult::Matrix(lhs),
124 ..
125 }),
126 PrometheusResponse::PromData(PromData {
127 result: PromQueryResult::Matrix(rhs),
128 ..
129 }),
130 ) => {
131 lhs.extend(rhs);
132 }
133
134 (
135 PrometheusResponse::PromData(PromData {
136 result: PromQueryResult::Vector(lhs),
137 ..
138 }),
139 PrometheusResponse::PromData(PromData {
140 result: PromQueryResult::Vector(rhs),
141 ..
142 }),
143 ) => {
144 lhs.extend(rhs);
145 }
146 _ => {
147 }
149 }
150 }
151
152 pub fn is_none(&self) -> bool {
153 matches!(self, PrometheusResponse::None)
154 }
155}
156
/// Request parameters carrying an optional PromQL expression to format.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    /// The PromQL expression text, if supplied.
    query: Option<String>,
}
161
162#[axum_macros::debug_handler]
163#[tracing::instrument(
164 skip_all,
165 fields(protocol = "prometheus", request_type = "format_query")
166)]
167pub async fn format_query(
168 State(_handler): State<PrometheusHandlerRef>,
169 Query(params): Query<InstantQuery>,
170 Extension(_query_ctx): Extension<QueryContext>,
171 Form(form_params): Form<InstantQuery>,
172) -> PrometheusJsonResponse {
173 let query = params.query.or(form_params.query).unwrap_or_default();
174 match promql_parser::parser::parse(&query) {
175 Ok(expr) => {
176 let pretty = expr.prettify();
177 PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
178 }
179 Err(reason) => {
180 let err = InvalidQuerySnafu { reason }.build();
181 PrometheusJsonResponse::error(err.status_code(), err.output_msg())
182 }
183 }
184}
185
/// Request parameters of the build-info endpoint; it has no fields.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}
188
189#[axum_macros::debug_handler]
190#[tracing::instrument(
191 skip_all,
192 fields(protocol = "prometheus", request_type = "build_info_query")
193)]
194pub async fn build_info_query() -> PrometheusJsonResponse {
195 let build_info = common_version::build_info().clone();
196 PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
197}
198
/// Request parameters of an instant query; every field is optional.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    /// The PromQL expression text.
    query: Option<String>,
    /// Lookback setting forwarded to the query as a string.
    lookback: Option<String>,
    /// Evaluation time as a string; the handler substitutes the current time when absent.
    time: Option<String>,
    /// Accepted from the request but not read by the code visible in this file.
    timeout: Option<String>,
    /// Database string, parsed into a catalog and a schema by the handler.
    db: Option<String>,
}
207
/// Unwraps a `Result` inside a handler.
///
/// Evaluates to the `Ok` value; on `Err` it returns from the enclosing function
/// with an error response whose status is always `StatusCode::InvalidArguments`
/// and whose message is the error's `to_string()` output.
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                return PrometheusJsonResponse::error(
                    StatusCode::InvalidArguments,
                    err.to_string(),
                );
            }
        }
    };
}
221
222#[axum_macros::debug_handler]
223#[tracing::instrument(
224 skip_all,
225 fields(protocol = "prometheus", request_type = "instant_query")
226)]
227pub async fn instant_query(
228 State(handler): State<PrometheusHandlerRef>,
229 Query(params): Query<InstantQuery>,
230 Extension(mut query_ctx): Extension<QueryContext>,
231 Form(form_params): Form<InstantQuery>,
232) -> PrometheusJsonResponse {
233 let time = params
235 .time
236 .or(form_params.time)
237 .unwrap_or_else(current_time_rfc3339);
238 let prom_query = PromQuery {
239 query: params.query.or(form_params.query).unwrap_or_default(),
240 start: time.clone(),
241 end: time,
242 step: "1s".to_string(),
243 lookback: params
244 .lookback
245 .or(form_params.lookback)
246 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
247 };
248
249 let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
250
251 if let Some(db) = ¶ms.db {
253 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
254 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
255 }
256 let query_ctx = Arc::new(query_ctx);
257
258 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
259 .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
260 .start_timer();
261
262 if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
263 && !name_matchers.is_empty()
264 {
265 debug!("Find metric name matchers: {:?}", name_matchers);
266
267 let metric_names =
268 try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
269
270 debug!("Find metric names: {:?}", metric_names);
271
272 if metric_names.is_empty() {
273 let result_type = promql_expr.value_type();
274
275 return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
276 result_type: result_type.to_string(),
277 ..Default::default()
278 }));
279 }
280
281 let responses = join_all(metric_names.into_iter().map(|metric| {
282 let mut prom_query = prom_query.clone();
283 let mut promql_expr = promql_expr.clone();
284 let query_ctx = query_ctx.clone();
285 let handler = handler.clone();
286
287 async move {
288 update_metric_name_matcher(&mut promql_expr, &metric);
289 let new_query = promql_expr.to_string();
290 debug!(
291 "Updated promql, before: {}, after: {}",
292 &prom_query.query, new_query
293 );
294 prom_query.query = new_query;
295
296 do_instant_query(&handler, &prom_query, query_ctx).await
297 }
298 }))
299 .await;
300
301 responses
302 .into_iter()
303 .reduce(|mut acc, resp| {
304 acc.data.append(resp.data);
305 acc
306 })
307 .unwrap()
308 } else {
309 do_instant_query(&handler, &prom_query, query_ctx).await
310 }
311}
312
313async fn do_instant_query(
315 handler: &PrometheusHandlerRef,
316 prom_query: &PromQuery,
317 query_ctx: QueryContextRef,
318) -> PrometheusJsonResponse {
319 let result = handler.do_query(prom_query, query_ctx).await;
320 let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
321 Ok((metric_name, result_type)) => (metric_name, result_type),
322 Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
323 };
324 PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
325}
326
/// Request parameters of a range query; every field is optional.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    /// The PromQL expression text.
    query: Option<String>,
    /// Start of the range as a string; the handler uses an empty string when absent.
    start: Option<String>,
    /// End of the range as a string; the handler uses an empty string when absent.
    end: Option<String>,
    /// Step as a string; the handler uses an empty string when absent.
    step: Option<String>,
    /// Lookback setting forwarded to the query as a string.
    lookback: Option<String>,
    /// Accepted from the request but not read by the code visible in this file.
    timeout: Option<String>,
    /// Database string, parsed into a catalog and a schema by the handler.
    db: Option<String>,
}
337
338#[axum_macros::debug_handler]
339#[tracing::instrument(
340 skip_all,
341 fields(protocol = "prometheus", request_type = "range_query")
342)]
343pub async fn range_query(
344 State(handler): State<PrometheusHandlerRef>,
345 Query(params): Query<RangeQuery>,
346 Extension(mut query_ctx): Extension<QueryContext>,
347 Form(form_params): Form<RangeQuery>,
348) -> PrometheusJsonResponse {
349 let prom_query = PromQuery {
350 query: params.query.or(form_params.query).unwrap_or_default(),
351 start: params.start.or(form_params.start).unwrap_or_default(),
352 end: params.end.or(form_params.end).unwrap_or_default(),
353 step: params.step.or(form_params.step).unwrap_or_default(),
354 lookback: params
355 .lookback
356 .or(form_params.lookback)
357 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
358 };
359
360 let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
361
362 if let Some(db) = ¶ms.db {
364 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
365 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
366 }
367 let query_ctx = Arc::new(query_ctx);
368 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
369 .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
370 .start_timer();
371
372 if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
373 && !name_matchers.is_empty()
374 {
375 debug!("Find metric name matchers: {:?}", name_matchers);
376
377 let metric_names =
378 try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
379
380 debug!("Find metric names: {:?}", metric_names);
381
382 if metric_names.is_empty() {
383 return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
384 result_type: ValueType::Matrix.to_string(),
385 ..Default::default()
386 }));
387 }
388
389 let responses = join_all(metric_names.into_iter().map(|metric| {
390 let mut prom_query = prom_query.clone();
391 let mut promql_expr = promql_expr.clone();
392 let query_ctx = query_ctx.clone();
393 let handler = handler.clone();
394
395 async move {
396 update_metric_name_matcher(&mut promql_expr, &metric);
397 let new_query = promql_expr.to_string();
398 debug!(
399 "Updated promql, before: {}, after: {}",
400 &prom_query.query, new_query
401 );
402 prom_query.query = new_query;
403
404 do_range_query(&handler, &prom_query, query_ctx).await
405 }
406 }))
407 .await;
408
409 responses
411 .into_iter()
412 .reduce(|mut acc, resp| {
413 acc.data.append(resp.data);
414 acc
415 })
416 .unwrap()
417 } else {
418 do_range_query(&handler, &prom_query, query_ctx).await
419 }
420}
421
422async fn do_range_query(
424 handler: &PrometheusHandlerRef,
425 prom_query: &PromQuery,
426 query_ctx: QueryContextRef,
427) -> PrometheusJsonResponse {
428 let result = handler.do_query(prom_query, query_ctx).await;
429 let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
430 Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
431 Ok((metric_name, _)) => metric_name,
432 };
433 PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
434}
435
/// The values of all `match[]` request parameters.
///
/// `Deserialize` is implemented by hand for this type rather than derived.
#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);
438
/// Request parameters of the label-names endpoint; every field is optional.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    /// Start of the time range as a string.
    start: Option<String>,
    /// End of the time range as a string.
    end: Option<String>,
    /// Lookback setting forwarded to the query as a string.
    lookback: Option<String>,
    /// The `match[]` parameters; flattened, so they are read from the same map as the other fields.
    #[serde(flatten)]
    matches: Matches,
    /// Database string, parsed into a catalog and a schema by the handler.
    db: Option<String>,
}
448
449impl<'de> Deserialize<'de> for Matches {
451 fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
452 where
453 D: de::Deserializer<'de>,
454 {
455 struct MatchesVisitor;
456
457 impl<'d> Visitor<'d> for MatchesVisitor {
458 type Value = Vec<String>;
459
460 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
461 formatter.write_str("a string")
462 }
463
464 fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
465 where
466 M: MapAccess<'d>,
467 {
468 let mut matches = Vec::new();
469 while let Some((key, value)) = access.next_entry::<String, String>()? {
470 if key == "match[]" {
471 matches.push(value);
472 }
473 }
474 Ok(matches)
475 }
476 }
477 Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
478 }
479}
480
/// Unwraps a `Result` inside a handler while tolerating missing tables/columns.
///
/// * `Ok(v)` evaluates to `Some(v)`.
/// * An error whose status code is `TableNotFound` or `TableColumnNotFound`
///   evaluates to `None`.
/// * Any other error returns from the enclosing function with an error
///   response built from the error's status code and output message.
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err)
                if matches!(
                    err.status_code(),
                    StatusCode::TableNotFound | StatusCode::TableColumnNotFound
                ) =>
            {
                None
            }
            Err(err) => {
                return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
            }
        }
    };
}
504
505#[axum_macros::debug_handler]
506#[tracing::instrument(
507 skip_all,
508 fields(protocol = "prometheus", request_type = "labels_query")
509)]
510pub async fn labels_query(
511 State(handler): State<PrometheusHandlerRef>,
512 Query(params): Query<LabelsQuery>,
513 Extension(mut query_ctx): Extension<QueryContext>,
514 Form(form_params): Form<LabelsQuery>,
515) -> PrometheusJsonResponse {
516 let (catalog, schema) = get_catalog_schema(¶ms.db, &query_ctx);
517 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
518 let query_ctx = Arc::new(query_ctx);
519
520 let mut queries = params.matches.0;
521 if queries.is_empty() {
522 queries = form_params.matches.0;
523 }
524
525 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
526 .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
527 .start_timer();
528
529 let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
531 {
532 Ok(labels) => labels,
533 Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
534 };
535 let _ = labels.insert(METRIC_NAME.to_string());
537
538 if queries.is_empty() {
540 let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
541 labels_vec.sort_unstable();
542 return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
543 }
544
545 let start = params
547 .start
548 .or(form_params.start)
549 .unwrap_or_else(yesterday_rfc3339);
550 let end = params
551 .end
552 .or(form_params.end)
553 .unwrap_or_else(current_time_rfc3339);
554 let lookback = params
555 .lookback
556 .or(form_params.lookback)
557 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
558
559 let mut fetched_labels = HashSet::new();
560 let _ = fetched_labels.insert(METRIC_NAME.to_string());
561
562 let mut merge_map = HashMap::new();
563 for query in queries {
564 let prom_query = PromQuery {
565 query,
566 start: start.clone(),
567 end: end.clone(),
568 step: DEFAULT_LOOKBACK_STRING.to_string(),
569 lookback: lookback.clone(),
570 };
571
572 let result = handler.do_query(&prom_query, query_ctx.clone()).await;
573 handle_schema_err!(
574 retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
575 .await
576 );
577 }
578
579 fetched_labels.retain(|l| labels.contains(l));
581 let _ = labels.insert(METRIC_NAME.to_string());
582
583 let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
584 sorted_labels.sort();
585 let merge_map = merge_map
586 .into_iter()
587 .map(|(k, v)| (k, Value::from(v)))
588 .collect();
589 let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
590 resp.resp_metrics = merge_map;
591 resp
592}
593
594async fn get_all_column_names(
596 catalog: &str,
597 schema: &str,
598 manager: &CatalogManagerRef,
599) -> std::result::Result<HashSet<String>, catalog::error::Error> {
600 let table_names = manager.table_names(catalog, schema, None).await?;
601
602 let mut labels = HashSet::new();
603 for table_name in table_names {
604 let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
605 continue;
606 };
607 for column in table.primary_key_columns() {
608 if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
609 && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
610 {
611 labels.insert(column.name);
612 }
613 }
614 }
615
616 Ok(labels)
617}
618
619async fn retrieve_series_from_query_result(
620 result: Result<Output>,
621 series: &mut Vec<HashMap<String, String>>,
622 query_ctx: &QueryContext,
623 table_name: &str,
624 manager: &CatalogManagerRef,
625 metrics: &mut HashMap<String, u64>,
626) -> Result<()> {
627 let result = result?;
628
629 let table = manager
631 .table(
632 query_ctx.current_catalog(),
633 &query_ctx.current_schema(),
634 table_name,
635 Some(query_ctx),
636 )
637 .await
638 .context(CatalogSnafu)?
639 .with_context(|| TableNotFoundSnafu {
640 catalog: query_ctx.current_catalog(),
641 schema: query_ctx.current_schema(),
642 table: table_name,
643 })?;
644 let tag_columns = table
645 .primary_key_columns()
646 .map(|c| c.name)
647 .collect::<HashSet<_>>();
648
649 match result.data {
650 OutputData::RecordBatches(batches) => {
651 record_batches_to_series(batches, series, table_name, &tag_columns)
652 }
653 OutputData::Stream(stream) => {
654 let batches = RecordBatches::try_collect(stream)
655 .await
656 .context(CollectRecordbatchSnafu)?;
657 record_batches_to_series(batches, series, table_name, &tag_columns)
658 }
659 OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
660 reason: "expected data result, but got affected rows".to_string(),
661 location: Location::default(),
662 }),
663 }?;
664
665 if let Some(ref plan) = result.meta.plan {
666 collect_plan_metrics(plan, &mut [metrics]);
667 }
668 Ok(())
669}
670
671async fn retrieve_labels_name_from_query_result(
673 result: Result<Output>,
674 labels: &mut HashSet<String>,
675 metrics: &mut HashMap<String, u64>,
676) -> Result<()> {
677 let result = result?;
678 match result.data {
679 OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
680 OutputData::Stream(stream) => {
681 let batches = RecordBatches::try_collect(stream)
682 .await
683 .context(CollectRecordbatchSnafu)?;
684 record_batches_to_labels_name(batches, labels)
685 }
686 OutputData::AffectedRows(_) => UnexpectedResultSnafu {
687 reason: "expected data result, but got affected rows".to_string(),
688 }
689 .fail(),
690 }?;
691 if let Some(ref plan) = result.meta.plan {
692 collect_plan_metrics(plan, &mut [metrics]);
693 }
694 Ok(())
695}
696
697fn record_batches_to_series(
698 batches: RecordBatches,
699 series: &mut Vec<HashMap<String, String>>,
700 table_name: &str,
701 tag_columns: &HashSet<String>,
702) -> Result<()> {
703 for batch in batches.iter() {
704 let projection = batch
706 .schema
707 .column_schemas()
708 .iter()
709 .enumerate()
710 .filter_map(|(idx, col)| {
711 if tag_columns.contains(&col.name) {
712 Some(idx)
713 } else {
714 None
715 }
716 })
717 .collect::<Vec<_>>();
718 let batch = batch
719 .try_project(&projection)
720 .context(CollectRecordbatchSnafu)?;
721
722 for row in batch.rows() {
723 let mut element: HashMap<String, String> = row
724 .iter()
725 .enumerate()
726 .map(|(idx, column)| {
727 let column_name = batch.schema.column_name_by_index(idx);
728 (column_name.to_string(), column.to_string())
729 })
730 .collect();
731 let _ = element.insert("__name__".to_string(), table_name.to_string());
732 series.push(element);
733 }
734 }
735 Ok(())
736}
737
738fn record_batches_to_labels_name(
740 batches: RecordBatches,
741 labels: &mut HashSet<String>,
742) -> Result<()> {
743 let mut column_indices = Vec::new();
744 let mut field_column_indices = Vec::new();
745 for (i, column) in batches.schema().column_schemas().iter().enumerate() {
746 if let ConcreteDataType::Float64(_) = column.data_type {
747 field_column_indices.push(i);
748 }
749 column_indices.push(i);
750 }
751
752 if field_column_indices.is_empty() {
753 return Err(Error::Internal {
754 err_msg: "no value column found".to_string(),
755 });
756 }
757
758 for batch in batches.iter() {
759 let names = column_indices
760 .iter()
761 .map(|c| batches.schema().column_name_by_index(*c).to_string())
762 .collect::<Vec<_>>();
763
764 let field_columns = field_column_indices
765 .iter()
766 .map(|i| {
767 batch
768 .column(*i)
769 .as_any()
770 .downcast_ref::<Float64Vector>()
771 .unwrap()
772 })
773 .collect::<Vec<_>>();
774
775 for row_index in 0..batch.num_rows() {
776 if field_columns
778 .iter()
779 .all(|c| c.get_data(row_index).is_none())
780 {
781 continue;
782 }
783
784 names.iter().for_each(|name| {
786 let _ = labels.insert(name.to_string());
787 });
788 return Ok(());
789 }
790 }
791 Ok(())
792}
793
794pub(crate) fn retrieve_metric_name_and_result_type(
795 promql: &str,
796) -> Result<(Option<String>, ValueType)> {
797 let promql_expr = promql_parser::parser::parse(promql)
798 .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
799 let metric_name = promql_expr_to_metric_name(&promql_expr);
800 let result_type = promql_expr.value_type();
801
802 Ok((metric_name, result_type))
803}
804
805pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
808 if let Some(db) = db {
809 parse_catalog_and_schema_from_db_string(db)
810 } else {
811 (
812 ctx.current_catalog().to_string(),
813 ctx.current_schema().to_string(),
814 )
815 }
816}
817
818pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
820 if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
821 ctx.set_current_catalog(catalog);
822 ctx.set_current_schema(schema);
823 }
824}
825
826fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
827 let mut metric_names = HashSet::new();
828 collect_metric_names(expr, &mut metric_names);
829
830 if metric_names.len() == 1 {
832 metric_names.into_iter().next()
833 } else {
834 None
835 }
836}
837
838fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
840 match expr {
841 PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
842 collect_metric_names(expr, metric_names)
843 }
844 PromqlExpr::Unary(UnaryExpr { expr }) => collect_metric_names(expr, metric_names),
845 PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
846 collect_metric_names(lhs, metric_names);
847 collect_metric_names(rhs, metric_names);
848 }
849 PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
850 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
851 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
852 if let Some(name) = name {
853 metric_names.insert(name.clone());
854 } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
855 metric_names.insert(matcher.value);
856 }
857 }
858 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
859 let VectorSelector { name, matchers, .. } = vs;
860 if let Some(name) = name {
861 metric_names.insert(name.clone());
862 } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
863 metric_names.insert(matcher.value);
864 }
865 }
866 PromqlExpr::Call(Call { args, .. }) => {
867 args.args
868 .iter()
869 .for_each(|e| collect_metric_names(e, metric_names));
870 }
871 PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
872 }
873}
874
875fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
876where
877 F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
878{
879 match expr {
880 PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
881 PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
882 PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
883 find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
884 }
885 PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
886 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
887 PromqlExpr::NumberLiteral(_) => None,
888 PromqlExpr::StringLiteral(_) => None,
889 PromqlExpr::Extension(_) => None,
890 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
891 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
892 let VectorSelector { name, matchers, .. } = vs;
893
894 f(name, matchers)
895 }
896 PromqlExpr::Call(Call { args, .. }) => args
897 .args
898 .iter()
899 .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
900 }
901}
902
903fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
905 find_metric_name_and_matchers(expr, |name, matchers| {
906 if name.is_some() {
908 return None;
909 }
910
911 Some(matchers.find_matchers(METRIC_NAME))
913 })
914 .map(|matchers| {
915 matchers
916 .into_iter()
917 .filter(|m| !matches!(m.op, MatchOp::Equal))
918 .map(normalize_matcher)
919 .collect::<Vec<_>>()
920 })
921}
922
923fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
926 match expr {
927 PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
928 update_metric_name_matcher(expr, metric_name)
929 }
930 PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
931 PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
932 update_metric_name_matcher(lhs, metric_name);
933 update_metric_name_matcher(rhs, metric_name);
934 }
935 PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
936 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
937 update_metric_name_matcher(expr, metric_name)
938 }
939 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
940 if name.is_some() {
941 return;
942 }
943
944 for m in &mut matchers.matchers {
945 if m.name == METRIC_NAME {
946 m.op = MatchOp::Equal;
947 m.value = metric_name.to_string();
948 }
949 }
950 }
951 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
952 let VectorSelector { name, matchers, .. } = vs;
953 if name.is_some() {
954 return;
955 }
956
957 for m in &mut matchers.matchers {
958 if m.name == METRIC_NAME {
959 m.op = MatchOp::Equal;
960 m.value = metric_name.to_string();
961 }
962 }
963 }
964 PromqlExpr::Call(Call { args, .. }) => {
965 args.args.iter_mut().for_each(|e| {
966 update_metric_name_matcher(e, metric_name);
967 });
968 }
969 PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
970 }
971}
972
/// Request parameters of the label-values endpoint; every field is optional.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    /// Start of the time range as a string.
    start: Option<String>,
    /// End of the time range as a string.
    end: Option<String>,
    /// Accepted from the request; not read by the label-values handler in this file.
    lookback: Option<String>,
    /// The `match[]` parameters; flattened, so they are read from the same map as the other fields.
    #[serde(flatten)]
    matches: Matches,
    /// Database string, parsed into a catalog and a schema by the handler.
    db: Option<String>,
    /// Maximum number of values to return; the handler applies it after sorting.
    limit: Option<usize>,
}
983
984#[axum_macros::debug_handler]
985#[tracing::instrument(
986 skip_all,
987 fields(protocol = "prometheus", request_type = "label_values_query")
988)]
989pub async fn label_values_query(
990 State(handler): State<PrometheusHandlerRef>,
991 Path(label_name): Path<String>,
992 Extension(mut query_ctx): Extension<QueryContext>,
993 Query(params): Query<LabelValueQuery>,
994) -> PrometheusJsonResponse {
995 let (catalog, schema) = get_catalog_schema(¶ms.db, &query_ctx);
996 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
997 let query_ctx = Arc::new(query_ctx);
998
999 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
1000 .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
1001 .start_timer();
1002
1003 if label_name == METRIC_NAME_LABEL {
1004 let catalog_manager = handler.catalog_manager();
1005 let mut tables_stream = catalog_manager.tables(&catalog, &schema, Some(&query_ctx));
1006 let mut table_names = Vec::new();
1007 while let Some(table) = tables_stream.next().await {
1008 match table {
1010 Ok(table) => {
1011 if table
1012 .table_info()
1013 .meta
1014 .options
1015 .extra_options
1016 .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1017 {
1018 continue;
1019 }
1020
1021 table_names.push(table.table_info().name.clone());
1022 }
1023 Err(e) => {
1024 return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
1025 }
1026 }
1027 }
1028 table_names.sort_unstable();
1029 truncate_results(&mut table_names, params.limit);
1030 return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
1031 } else if label_name == FIELD_NAME_LABEL {
1032 let field_columns = handle_schema_err!(
1033 retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
1034 )
1035 .unwrap_or_default();
1036 let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
1037 field_columns.sort_unstable();
1038 truncate_results(&mut field_columns, params.limit);
1039 return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
1040 } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
1041 let catalog_manager = handler.catalog_manager();
1042
1043 match retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await {
1044 Ok(mut schema_names) => {
1045 truncate_results(&mut schema_names, params.limit);
1046 return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(
1047 schema_names,
1048 ));
1049 }
1050 Err(e) => {
1051 return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
1052 }
1053 }
1054 }
1055
1056 let queries = params.matches.0;
1057 if queries.is_empty() {
1058 return PrometheusJsonResponse::error(
1059 StatusCode::InvalidArguments,
1060 "match[] parameter is required",
1061 );
1062 }
1063
1064 let start = params.start.unwrap_or_else(yesterday_rfc3339);
1065 let end = params.end.unwrap_or_else(current_time_rfc3339);
1066 let mut label_values = HashSet::new();
1067
1068 let start = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&start)
1069 .context(ParseTimestampSnafu { timestamp: &start }));
1070 let end = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&end)
1071 .context(ParseTimestampSnafu { timestamp: &end }));
1072
1073 for query in queries {
1074 let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
1075 let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
1076 return PrometheusJsonResponse::error(
1077 StatusCode::InvalidArguments,
1078 "expected vector selector",
1079 );
1080 };
1081 let Some(name) = take_metric_name(&mut vector_selector) else {
1082 return PrometheusJsonResponse::error(
1083 StatusCode::InvalidArguments,
1084 "expected metric name",
1085 );
1086 };
1087 let VectorSelector { matchers, .. } = vector_selector;
1088 let matchers = matchers.matchers;
1090 let result = handler
1091 .query_label_values(
1092 name,
1093 label_name.to_string(),
1094 matchers,
1095 start,
1096 end,
1097 &query_ctx,
1098 )
1099 .await;
1100 if let Some(result) = handle_schema_err!(result) {
1101 label_values.extend(result.into_iter());
1102 }
1103 }
1104
1105 let mut label_values: Vec<_> = label_values.into_iter().collect();
1106 label_values.sort_unstable();
1107 truncate_results(&mut label_values, params.limit);
1108
1109 PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
1110}
1111
/// Caps `label_values` at `limit` entries, keeping the leading ones.
///
/// A `limit` of `None` or `Some(0)` means "no limit": the vector is left
/// untouched. A vector that already has `limit` or fewer entries is unchanged
/// as well.
fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
    match limit {
        // `Vec::truncate` is a no-op when the vector is already short enough.
        Some(max) if max > 0 => label_values.truncate(max),
        _ => {}
    }
}
1119
1120fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1123 if let Some(name) = selector.name.take() {
1124 return Some(name);
1125 }
1126
1127 let (pos, matcher) = selector
1128 .matchers
1129 .matchers
1130 .iter()
1131 .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1132 let name = matcher.value.clone();
1133 selector.matchers.matchers.remove(pos);
1135
1136 Some(name)
1137}
1138
1139async fn retrieve_field_names(
1140 query_ctx: &QueryContext,
1141 manager: CatalogManagerRef,
1142 matches: Vec<String>,
1143) -> Result<HashSet<String>> {
1144 let mut field_columns = HashSet::new();
1145 let catalog = query_ctx.current_catalog();
1146 let schema = query_ctx.current_schema();
1147
1148 if matches.is_empty() {
1149 while let Some(table) = manager
1151 .tables(catalog, &schema, Some(query_ctx))
1152 .next()
1153 .await
1154 {
1155 let table = table.context(CatalogSnafu)?;
1156 for column in table.field_columns() {
1157 field_columns.insert(column.name);
1158 }
1159 }
1160 return Ok(field_columns);
1161 }
1162
1163 for table_name in matches {
1164 let table = manager
1165 .table(catalog, &schema, &table_name, Some(query_ctx))
1166 .await
1167 .context(CatalogSnafu)?
1168 .with_context(|| TableNotFoundSnafu {
1169 catalog: catalog.to_string(),
1170 schema: schema.to_string(),
1171 table: table_name.to_string(),
1172 })?;
1173
1174 for column in table.field_columns() {
1175 field_columns.insert(column.name);
1176 }
1177 }
1178 Ok(field_columns)
1179}
1180
1181async fn retrieve_schema_names(
1182 query_ctx: &QueryContext,
1183 catalog_manager: CatalogManagerRef,
1184 matches: Vec<String>,
1185) -> Result<Vec<String>> {
1186 let mut schemas = Vec::new();
1187 let catalog = query_ctx.current_catalog();
1188
1189 let candidate_schemas = catalog_manager
1190 .schema_names(catalog, Some(query_ctx))
1191 .await
1192 .context(CatalogSnafu)?;
1193
1194 for schema in candidate_schemas {
1195 let mut found = true;
1196 for match_item in &matches {
1197 if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
1198 let exists = catalog_manager
1199 .table_exists(catalog, &schema, &table_name, Some(query_ctx))
1200 .await
1201 .context(CatalogSnafu)?;
1202 if !exists {
1203 found = false;
1204 break;
1205 }
1206 }
1207 }
1208
1209 if found {
1210 schemas.push(schema);
1211 }
1212 }
1213
1214 schemas.sort_unstable();
1215
1216 Ok(schemas)
1217}
1218
1219fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1224 let promql_expr = promql_parser::parser::parse(query).ok()?;
1225 promql_expr_to_metric_name(&promql_expr)
1226}
1227
/// Request parameters accepted by `series_query`, which reads them from both
/// the URL query string and the form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    /// Start of the time range; `series_query` falls back to
    /// `yesterday_rfc3339()` when it is absent.
    start: Option<String>,
    /// End of the time range; `series_query` falls back to
    /// `current_time_rfc3339()` when it is absent.
    end: Option<String>,
    /// Lookback value; `series_query` falls back to `DEFAULT_LOOKBACK_STRING`
    /// when it is absent.
    lookback: Option<String>,
    /// Series selectors. Flattened, so `Matches` deserializes from this
    /// struct's own key space rather than from a nested `matches` key.
    #[serde(flatten)]
    matches: Matches,
    /// Database string; `series_query` splits it with
    /// `parse_catalog_and_schema_from_db_string`.
    db: Option<String>,
}
1237
1238#[axum_macros::debug_handler]
1239#[tracing::instrument(
1240 skip_all,
1241 fields(protocol = "prometheus", request_type = "series_query")
1242)]
1243pub async fn series_query(
1244 State(handler): State<PrometheusHandlerRef>,
1245 Query(params): Query<SeriesQuery>,
1246 Extension(mut query_ctx): Extension<QueryContext>,
1247 Form(form_params): Form<SeriesQuery>,
1248) -> PrometheusJsonResponse {
1249 let mut queries: Vec<String> = params.matches.0;
1250 if queries.is_empty() {
1251 queries = form_params.matches.0;
1252 }
1253 if queries.is_empty() {
1254 return PrometheusJsonResponse::error(
1255 StatusCode::Unsupported,
1256 "match[] parameter is required",
1257 );
1258 }
1259 let start = params
1260 .start
1261 .or(form_params.start)
1262 .unwrap_or_else(yesterday_rfc3339);
1263 let end = params
1264 .end
1265 .or(form_params.end)
1266 .unwrap_or_else(current_time_rfc3339);
1267 let lookback = params
1268 .lookback
1269 .or(form_params.lookback)
1270 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
1271
1272 if let Some(db) = ¶ms.db {
1274 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
1275 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
1276 }
1277 let query_ctx = Arc::new(query_ctx);
1278
1279 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
1280 .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
1281 .start_timer();
1282
1283 let mut series = Vec::new();
1284 let mut merge_map = HashMap::new();
1285 for query in queries {
1286 let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
1287 let prom_query = PromQuery {
1288 query,
1289 start: start.clone(),
1290 end: end.clone(),
1291 step: DEFAULT_LOOKBACK_STRING.to_string(),
1293 lookback: lookback.clone(),
1294 };
1295 let result = handler.do_query(&prom_query, query_ctx.clone()).await;
1296
1297 handle_schema_err!(
1298 retrieve_series_from_query_result(
1299 result,
1300 &mut series,
1301 &query_ctx,
1302 &table_name,
1303 &handler.catalog_manager(),
1304 &mut merge_map,
1305 )
1306 .await
1307 );
1308 }
1309 let merge_map = merge_map
1310 .into_iter()
1311 .map(|(k, v)| (k, Value::from(v)))
1312 .collect();
1313 let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
1314 resp.resp_metrics = merge_map;
1315 resp
1316}
1317
/// Request parameters accepted by `parse_query`, which reads them from both
/// the URL query string and the form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    /// PromQL text to parse; `parse_query` responds with an error when it is
    /// missing from both the query string and the form body.
    query: Option<String>,
    /// Database string. `parse_query` does not read this field.
    db: Option<String>,
}
1323
1324#[axum_macros::debug_handler]
1325#[tracing::instrument(
1326 skip_all,
1327 fields(protocol = "prometheus", request_type = "parse_query")
1328)]
1329pub async fn parse_query(
1330 State(_handler): State<PrometheusHandlerRef>,
1331 Query(params): Query<ParseQuery>,
1332 Extension(_query_ctx): Extension<QueryContext>,
1333 Form(form_params): Form<ParseQuery>,
1334) -> PrometheusJsonResponse {
1335 if let Some(query) = params.query.or(form_params.query) {
1336 let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1337 PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1338 } else {
1339 PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1340 }
1341}
1342
#[cfg(test)]
mod tests {
    use promql_parser::parser::value::ValueType;

    use super::*;

    /// One table-driven scenario for `retrieve_metric_name_and_result_type`.
    struct TestCase {
        name: &'static str,
        promql: &'static str,
        expected_metric: Option<&'static str>,
        expected_type: ValueType,
        should_error: bool,
    }

    impl TestCase {
        /// A query that must be accepted, with the metric name and value type
        /// it is expected to yield.
        fn parses(
            name: &'static str,
            promql: &'static str,
            expected_metric: Option<&'static str>,
            expected_type: ValueType,
        ) -> Self {
            Self {
                name,
                promql,
                expected_metric,
                expected_type,
                should_error: false,
            }
        }

        /// A query that must be rejected; the expectation fields are unused.
        fn rejects(name: &'static str, promql: &'static str) -> Self {
            Self {
                name,
                promql,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            }
        }
    }

    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        let cpu = Some("cpu_usage");
        let cases = [
            // Plain selectors.
            TestCase::parses("simple metric", "cpu_usage", cpu, ValueType::Vector),
            TestCase::parses(
                "metric with selector",
                r#"cpu_usage{instance="localhost"}"#,
                cpu,
                ValueType::Vector,
            ),
            TestCase::parses(
                "metric with range selector",
                "cpu_usage[5m]",
                cpu,
                ValueType::Matrix,
            ),
            TestCase::parses(
                "metric with __name__ matcher",
                r#"{__name__="cpu_usage"}"#,
                cpu,
                ValueType::Vector,
            ),
            TestCase::parses(
                "metric with unary operator",
                "-cpu_usage",
                cpu,
                ValueType::Vector,
            ),
            // Aggregations.
            TestCase::parses(
                "metric with aggregation",
                "sum(cpu_usage)",
                cpu,
                ValueType::Vector,
            ),
            TestCase::parses(
                "complex aggregation",
                r#"sum by (instance) (cpu_usage{job="node"})"#,
                cpu,
                ValueType::Vector,
            ),
            // Binary expressions over a single metric.
            TestCase::parses(
                "same metric addition",
                "cpu_usage + cpu_usage",
                cpu,
                ValueType::Vector,
            ),
            TestCase::parses(
                "metric with scalar addition",
                r#"sum(rate(cpu_usage{job="node"}[5m])) by (instance) + 100"#,
                cpu,
                ValueType::Vector,
            ),
            // Binary expressions over two different metrics.
            TestCase::parses(
                "different metrics addition",
                "cpu_usage + memory_usage",
                None,
                ValueType::Vector,
            ),
            TestCase::parses(
                "different metrics subtraction",
                "network_in - network_out",
                None,
                ValueType::Vector,
            ),
            // Set operator `unless`.
            TestCase::parses(
                "unless with different metrics",
                "cpu_usage unless memory_usage",
                None,
                ValueType::Vector,
            ),
            TestCase::parses(
                "unless with same metric",
                "cpu_usage unless cpu_usage",
                cpu,
                ValueType::Vector,
            ),
            // Subqueries.
            TestCase::parses("basic subquery", "cpu_usage[5m:1m]", cpu, ValueType::Matrix),
            TestCase::parses(
                "subquery with multiple metrics",
                "(cpu_usage + memory_usage)[5m:1m]",
                None,
                ValueType::Matrix,
            ),
            // Literals.
            TestCase::parses("scalar value", "42", None, ValueType::Scalar),
            TestCase::parses(
                "string literal",
                r#""hello world""#,
                None,
                ValueType::String,
            ),
            // Inputs that must be rejected.
            TestCase::rejects("invalid syntax", "cpu_usage{invalid="),
            TestCase::rejects("empty query", ""),
            TestCase::rejects("malformed brackets", "cpu_usage[5m"),
        ];

        for case in &cases {
            let outcome = retrieve_metric_name_and_result_type(case.promql);

            if case.should_error {
                assert!(
                    outcome.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    case.name,
                    outcome
                );
                continue;
            }

            let (metric_name, value_type) = match outcome {
                Ok(pair) => pair,
                Err(e) => panic!(
                    "Test '{}' should have succeeded but failed with error: {}",
                    case.name, e
                ),
            };

            let expected_metric_name = case.expected_metric.map(str::to_string);
            assert_eq!(
                metric_name, expected_metric_name,
                "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                case.name, expected_metric_name, metric_name
            );

            assert_eq!(
                value_type, case.expected_type,
                "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                case.name, case.expected_type, value_type
            );
        }
    }
}