1use std::borrow::Borrow;
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::hash::{Hash, Hasher};
20use std::sync::Arc;
21
22use arrow::array::{Array, AsArray};
23use arrow::datatypes::{
24 Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
25 Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
26 UInt8Type, UInt16Type, UInt32Type, UInt64Type,
27};
28use arrow_schema::{DataType, IntervalUnit};
29use axum::extract::{Path, Query, State};
30use axum::{Extension, Form};
31use catalog::CatalogManagerRef;
32use common_catalog::parse_catalog_and_schema_from_db_string;
33use common_decimal::Decimal128;
34use common_error::ext::ErrorExt;
35use common_error::status_code::StatusCode;
36use common_query::{Output, OutputData};
37use common_recordbatch::{RecordBatch, RecordBatches};
38use common_telemetry::{debug, tracing};
39use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
40use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
41use common_version::OwnedBuildInfo;
42use datafusion_common::ScalarValue;
43use datatypes::prelude::ConcreteDataType;
44use datatypes::schema::{ColumnSchema, SchemaRef};
45use datatypes::types::jsonb_to_string;
46use futures::StreamExt;
47use futures::future::join_all;
48use itertools::Itertools;
49use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
50use promql_parser::parser::token::{self};
51use promql_parser::parser::value::ValueType;
52use promql_parser::parser::{
53 AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, LabelModifier, MatrixSelector, ParenExpr,
54 SubqueryExpr, UnaryExpr, VectorSelector,
55};
56use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser};
57use serde::de::{self, MapAccess, Visitor};
58use serde::{Deserialize, Serialize};
59use serde_json::Value;
60use session::context::{QueryContext, QueryContextRef};
61use snafu::{Location, OptionExt, ResultExt};
62use store_api::metric_engine_consts::{
63 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
64};
65
66pub use super::result::prometheus_resp::PrometheusJsonResponse;
67use crate::error::{
68 CatalogSnafu, CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error,
69 InvalidQuerySnafu, NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu,
70 UnexpectedResultSnafu,
71};
72use crate::http::header::collect_plan_metrics;
73use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL, is_database_selection_label};
74use crate::prometheus_handler::PrometheusHandlerRef;
75
/// One series of an instant-query result: a label set plus at most one
/// `(timestamp, value)` sample.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    // Label name -> label value; BTreeMap keeps output ordering stable.
    pub metric: BTreeMap<String, String>,
    // Single sample as (timestamp, value-rendered-as-string); omitted from
    // the JSON body entirely when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}
83
/// One series of a range-query result: a label set plus all of its
/// `(timestamp, value)` samples.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    // Label name -> label value; BTreeMap keeps output ordering stable.
    pub metric: BTreeMap<String, String>,
    // Samples as (timestamp, value-rendered-as-string) pairs.
    pub values: Vec<(f64, String)>,
}
90
/// The `result` payload of a Prometheus response; the variant mirrors the
/// sibling `resultType` field. `untagged` serialization emits only the inner
/// value, matching the Prometheus wire format.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    Matrix(Vec<PromSeriesMatrix>),
    Vector(Vec<PromSeriesVector>),
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}
100
101impl Default for PromQueryResult {
102 fn default() -> Self {
103 PromQueryResult::Matrix(Default::default())
104 }
105}
106
/// The `data` object of a successful Prometheus query response.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    // Rendered from promql's ValueType ("matrix", "vector", ...); serialized
    // under the camelCase name required by the Prometheus API.
    #[serde(rename = "resultType")]
    pub result_type: String,
    pub result: PromQueryResult,
}
113
/// Owned column-name key for series label maps. Wrapping an `Arc<String>`
/// makes clones cheap (refcount bump) when the template row is copied per row.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Column(Arc<String>);
118
119impl From<&str> for Column {
120 fn from(s: &str) -> Self {
121 Self(Arc::new(s.to_string()))
122 }
123}
124
/// Union of every payload shape the Prometheus-compatible endpoints return.
/// `untagged` so each variant serializes as its bare inner value.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    PromData(PromData),
    Labels(Vec<String>),
    Series(Vec<HashMap<Column, String>>),
    LabelValues(Vec<String>),
    FormatQuery(String),
    BuildInfo(OwnedBuildInfo),
    // The parser AST is serialize-only; it cannot be deserialized back.
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    #[default]
    None,
}
139
impl PrometheusResponse {
    /// Merges `other` into `self` by concatenating the series lists when both
    /// sides are `PromData` of the same result kind (matrix-with-matrix or
    /// vector-with-vector). Used to combine per-metric query responses.
    ///
    /// Any other combination is silently ignored and `self` is left unchanged.
    fn append(&mut self, other: PrometheusResponse) {
        match (self, other) {
            (
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Matrix(lhs),
                    ..
                }),
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Matrix(rhs),
                    ..
                }),
            ) => {
                lhs.extend(rhs);
            }

            (
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Vector(lhs),
                    ..
                }),
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Vector(rhs),
                    ..
                }),
            ) => {
                lhs.extend(rhs);
            }
            _ => {
                // Mismatched or non-mergeable variants: keep `self` as-is.
            }
        }
    }

    /// Returns `true` if this is the empty placeholder response.
    pub fn is_none(&self) -> bool {
        matches!(self, PrometheusResponse::None)
    }
}
181
/// Parameters of the format-query endpoint; `query` is the PromQL text to
/// pretty-print.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    query: Option<String>,
}
186
187#[axum_macros::debug_handler]
188#[tracing::instrument(
189 skip_all,
190 fields(protocol = "prometheus", request_type = "format_query")
191)]
192pub async fn format_query(
193 State(_handler): State<PrometheusHandlerRef>,
194 Query(params): Query<InstantQuery>,
195 Extension(_query_ctx): Extension<QueryContext>,
196 Form(form_params): Form<InstantQuery>,
197) -> PrometheusJsonResponse {
198 let query = params.query.or(form_params.query).unwrap_or_default();
199 match promql_parser::parser::parse(&query) {
200 Ok(expr) => {
201 let pretty = expr.prettify();
202 PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
203 }
204 Err(reason) => {
205 let err = InvalidQuerySnafu { reason }.build();
206 PrometheusJsonResponse::error(err.status_code(), err.output_msg())
207 }
208 }
209}
210
/// The build-info endpoint takes no parameters; this empty struct exists for
/// a uniform handler signature.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}
213
214#[axum_macros::debug_handler]
215#[tracing::instrument(
216 skip_all,
217 fields(protocol = "prometheus", request_type = "build_info_query")
218)]
219pub async fn build_info_query() -> PrometheusJsonResponse {
220 let build_info = common_version::build_info().clone();
221 PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
222}
223
/// Parameters of the instant-query endpoint; every field is optional and may
/// be supplied via query string or form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    query: Option<String>,
    lookback: Option<String>,
    // Evaluation timestamp; defaults to "now" when absent.
    time: Option<String>,
    // Accepted for API compatibility; not read by the handlers in this file.
    timeout: Option<String>,
    // Optional "catalog-schema" database override.
    db: Option<String>,
}
232
/// Evaluates a fallible expression inside a handler; on `Err`, immediately
/// returns an `InvalidArguments` JSON error response from the enclosing
/// function (analogous to `?` but producing a `PrometheusJsonResponse`).
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                let msg = err.to_string();
                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
            }
        }
    };
}
246
/// Handler for the Prometheus instant-query endpoint.
///
/// Parameters may come from the query string or a urlencoded form body (the
/// query string wins). When `find_metric_name_not_equal_matchers` reports
/// `__name__` matchers on the expression, the matching metric names are
/// resolved first and one rewritten query is issued per metric, with the
/// per-metric responses merged into a single response.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "instant_query")
)]
pub async fn instant_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    // Evaluation timestamp defaults to "now".
    let time = params
        .time
        .or(form_params.time)
        .unwrap_or_else(current_time_rfc3339);
    // An instant query is modeled as a degenerate range query with
    // start == end and a nominal 1s step.
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: time.clone(),
        end: time,
        step: "1s".to_string(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // An explicit `db` parameter overrides the session's catalog/schema.
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        // Expand the matchers into the concrete set of metric names.
        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            // Nothing matched: return an empty result of the proper type.
            let result_type = promql_expr.value_type();

            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        // Run one rewritten query per matched metric, concurrently.
        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                // Pin the expression to this concrete metric, then re-render
                // the query string from the rewritten AST.
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_instant_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Merge per-metric responses pairwise; `unwrap` is safe because
        // `metric_names` was checked non-empty above.
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_instant_query(&handler, &prom_query, query_ctx).await
    }
}
338
339async fn do_instant_query(
341 handler: &PrometheusHandlerRef,
342 prom_query: &PromQuery,
343 query_ctx: QueryContextRef,
344) -> PrometheusJsonResponse {
345 let result = handler.do_query(prom_query, query_ctx).await;
346 let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
347 Ok((metric_name, result_type)) => (metric_name, result_type),
348 Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
349 };
350 PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
351}
352
/// Parameters of the range-query endpoint; every field is optional and may
/// be supplied via query string or form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    query: Option<String>,
    start: Option<String>,
    end: Option<String>,
    step: Option<String>,
    lookback: Option<String>,
    // Accepted for API compatibility; not read by the handlers in this file.
    timeout: Option<String>,
    // Optional "catalog-schema" database override.
    db: Option<String>,
}
363
/// Handler for the Prometheus range-query endpoint.
///
/// Mirrors [`instant_query`]: parameters may come from the query string or a
/// form body (query string wins), and `__name__` matchers reported by
/// `find_metric_name_not_equal_matchers` are expanded into one query per
/// matched metric, merged into a single matrix response.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "range_query")
)]
pub async fn range_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<RangeQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<RangeQuery>,
) -> PrometheusJsonResponse {
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: params.start.or(form_params.start).unwrap_or_default(),
        end: params.end.or(form_params.end).unwrap_or_default(),
        step: params.step.or(form_params.step).unwrap_or_default(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // An explicit `db` parameter overrides the session's catalog/schema.
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        // Expand the matchers into the concrete set of metric names.
        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            // Range queries always produce a matrix, even when empty.
            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: ValueType::Matrix.to_string(),
                ..Default::default()
            }));
        }

        // Run one rewritten query per matched metric, concurrently.
        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                // Pin the expression to this concrete metric, then re-render
                // the query string from the rewritten AST.
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_range_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Merge per-metric responses pairwise; `unwrap` is safe because
        // `metric_names` was checked non-empty above.
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_range_query(&handler, &prom_query, query_ctx).await
    }
}
448
449async fn do_range_query(
451 handler: &PrometheusHandlerRef,
452 prom_query: &PromQuery,
453 query_ctx: QueryContextRef,
454) -> PrometheusJsonResponse {
455 let result = handler.do_query(prom_query, query_ctx).await;
456 let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
457 Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
458 Ok((metric_name, _)) => metric_name,
459 };
460 PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
461}
462
/// Collected values of repeated `match[]` parameters; see the hand-written
/// `Deserialize` impl below (derive cannot express a repeated bracketed key).
#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);
465
/// Parameters of the label-names endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    // `flatten` hands the whole parameter map to `Matches::deserialize`,
    // which extracts every repeated `match[]` entry.
    #[serde(flatten)]
    matches: Matches,
    // Optional "catalog-schema" database override.
    db: Option<String>,
}
475
/// Custom deserialization that gathers every repeated `match[]` key from the
/// flattened parameter map into a `Vec<String>`.
impl<'de> Deserialize<'de> for Matches {
    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        // Map visitor that keeps only the values of `match[]` entries.
        struct MatchesVisitor;

        impl<'d> Visitor<'d> for MatchesVisitor {
            type Value = Vec<String>;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a string")
            }

            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
            where
                M: MapAccess<'d>,
            {
                let mut matches = Vec::new();
                while let Some((key, value)) = access.next_entry::<String, String>()? {
                    // Keys other than "match[]" are consumed and ignored here.
                    if key == "match[]" {
                        matches.push(value);
                    }
                }
                Ok(matches)
            }
        }
        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
    }
}
507
/// Evaluates a `Result` while tolerating missing-table/column errors (a
/// `match[]` selector may name a metric that does not exist): those yield
/// `None`, any other error aborts the enclosing handler with a JSON error
/// response.
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err) => {
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    // Treat a nonexistent metric/column as "no data".
                    None
                } else {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
                }
            }
        }
    };
}
531
532#[axum_macros::debug_handler]
533#[tracing::instrument(
534 skip_all,
535 fields(protocol = "prometheus", request_type = "labels_query")
536)]
537pub async fn labels_query(
538 State(handler): State<PrometheusHandlerRef>,
539 Query(params): Query<LabelsQuery>,
540 Extension(mut query_ctx): Extension<QueryContext>,
541 Form(form_params): Form<LabelsQuery>,
542) -> PrometheusJsonResponse {
543 let (catalog, schema) = get_catalog_schema(¶ms.db, &query_ctx);
544 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
545 let query_ctx = Arc::new(query_ctx);
546
547 let mut queries = params.matches.0;
548 if queries.is_empty() {
549 queries = form_params.matches.0;
550 }
551
552 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
553 .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
554 .start_timer();
555
556 let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
558 {
559 Ok(labels) => labels,
560 Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
561 };
562 let _ = labels.insert(METRIC_NAME.to_string());
564
565 if queries.is_empty() {
567 let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
568 labels_vec.sort_unstable();
569 return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
570 }
571
572 let start = params
574 .start
575 .or(form_params.start)
576 .unwrap_or_else(yesterday_rfc3339);
577 let end = params
578 .end
579 .or(form_params.end)
580 .unwrap_or_else(current_time_rfc3339);
581 let lookback = params
582 .lookback
583 .or(form_params.lookback)
584 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
585
586 let mut fetched_labels = HashSet::new();
587 let _ = fetched_labels.insert(METRIC_NAME.to_string());
588
589 let mut merge_map = HashMap::new();
590 for query in queries {
591 let prom_query = PromQuery {
592 query,
593 start: start.clone(),
594 end: end.clone(),
595 step: DEFAULT_LOOKBACK_STRING.to_string(),
596 lookback: lookback.clone(),
597 alias: None,
598 };
599
600 let result = handler.do_query(&prom_query, query_ctx.clone()).await;
601 handle_schema_err!(
602 retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
603 .await
604 );
605 }
606
607 fetched_labels.retain(|l| labels.contains(l));
609 let _ = labels.insert(METRIC_NAME.to_string());
610
611 let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
612 sorted_labels.sort();
613 let merge_map = merge_map
614 .into_iter()
615 .map(|(k, v)| (k, Value::from(v)))
616 .collect();
617 let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
618 resp.resp_metrics = merge_map;
619 resp
620}
621
622async fn get_all_column_names(
624 catalog: &str,
625 schema: &str,
626 manager: &CatalogManagerRef,
627) -> std::result::Result<HashSet<String>, catalog::error::Error> {
628 let table_names = manager.table_names(catalog, schema, None).await?;
629
630 let mut labels = HashSet::new();
631 for table_name in table_names {
632 let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
633 continue;
634 };
635 for column in table.primary_key_columns() {
636 if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
637 && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
638 {
639 labels.insert(column.name);
640 }
641 }
642 }
643
644 Ok(labels)
645}
646
/// Converts one per-table series query result into label maps appended to
/// `series`, and folds the query plan's metrics into `metrics`.
///
/// The table is resolved first so that only its primary-key (tag) columns
/// end up in the emitted series.
async fn retrieve_series_from_query_result(
    result: Result<Output>,
    series: &mut Vec<HashMap<Column, String>>,
    query_ctx: &QueryContext,
    table_name: &str,
    manager: &CatalogManagerRef,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;

    // Resolve the table to learn its tag columns; a vanished table is an
    // error here (unlike in get_all_column_names).
    let table = manager
        .table(
            query_ctx.current_catalog(),
            &query_ctx.current_schema(),
            table_name,
            Some(query_ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            catalog: query_ctx.current_catalog(),
            schema: query_ctx.current_schema(),
            table: table_name,
        })?;
    let tag_columns = table
        .primary_key_columns()
        .map(|c| c.name)
        .collect::<HashSet<_>>();

    match result.data {
        OutputData::RecordBatches(batches) => {
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::Stream(stream) => {
            // Drain the stream into memory before conversion.
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        // A series query must produce rows, not a DML-style row count.
        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
            reason: "expected data result, but got affected rows".to_string(),
            location: Location::default(),
        }),
    }?;

    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}
698
/// Extracts label (column) names from one query result into `labels`, and
/// folds the query plan's metrics into `metrics`.
async fn retrieve_labels_name_from_query_result(
    result: Result<Output>,
    labels: &mut HashSet<String>,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;
    match result.data {
        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
        OutputData::Stream(stream) => {
            // Drain the stream into memory before conversion.
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_labels_name(batches, labels)
        }
        // A labels query must produce rows, not a DML-style row count.
        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
            reason: "expected data result, but got affected rows".to_string(),
        }
        .fail(),
    }?;
    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}
724
/// Projects each batch down to its tag columns and appends one label map per
/// row to `series`; the `__name__` label is filled from `table_name` by
/// [`RowWriter`].
fn record_batches_to_series(
    batches: RecordBatches,
    series: &mut Vec<HashMap<Column, String>>,
    table_name: &str,
    tag_columns: &HashSet<String>,
) -> Result<()> {
    for batch in batches.iter() {
        // Indices of the columns that are tags; everything else is dropped.
        let projection = batch
            .schema
            .column_schemas()
            .iter()
            .enumerate()
            .filter_map(|(idx, col)| {
                if tag_columns.contains(&col.name) {
                    Some(idx)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        let batch = batch
            .try_project(&projection)
            .context(CollectRecordbatchSnafu)?;

        let mut writer = RowWriter::new(&batch.schema, table_name);
        writer.write(batch, series)?;
    }
    Ok(())
}
755
/// Incrementally renders record-batch rows into per-series label maps.
struct RowWriter {
    // Prototype row: every projected column name mapped to `None`, plus
    // `__name__` pre-bound to the table name.
    template: HashMap<Column, Option<String>>,
    // The row currently being filled; `None` between rows.
    current: Option<HashMap<Column, Option<String>>>,
}
769
impl RowWriter {
    /// Creates a writer whose template holds every column of `schema`
    /// (initially unset) plus `__name__` pre-bound to `table`.
    fn new(schema: &SchemaRef, table: &str) -> Self {
        let mut template = schema
            .column_schemas()
            .iter()
            .map(|x| (x.name.as_str().into(), None))
            .collect::<HashMap<Column, Option<String>>>();
        template.insert("__name__".into(), Some(table.to_string()));
        Self {
            template,
            current: None,
        }
    }

    /// Records `value` for `column` in the current row, starting a new row
    /// from the template if none is in progress. Columns not present in the
    /// template are added on the fly.
    fn insert(&mut self, column: ColumnRef, value: impl ToString) {
        let current = self.current.get_or_insert_with(|| self.template.clone());
        // Probe with the borrowed key first (via the `Borrow<dyn AsColumnRef>`
        // trick below) to avoid allocating a `Column` for known names.
        match current.get_mut(&column as &dyn AsColumnRef) {
            Some(x) => {
                let _ = x.insert(value.to_string());
            }
            None => {
                let _ = current.insert(column.0.into(), Some(value.to_string()));
            }
        }
    }

    /// Renders a binary cell: JSON columns are decoded from JSONB to text,
    /// everything else is lowercase hex-encoded.
    fn insert_bytes(&mut self, column_schema: &ColumnSchema, bytes: &[u8]) -> Result<()> {
        let column_name = column_schema.name.as_str().into();

        if column_schema.data_type.is_json() {
            let s = jsonb_to_string(bytes).context(ConvertScalarValueSnafu)?;
            self.insert(column_name, s);
        } else {
            // Two lowercase hex digits per byte.
            let hex = bytes
                .iter()
                .map(|b| format!("{b:02x}"))
                .collect::<Vec<String>>()
                .join("");
            self.insert(column_name, hex);
        }
        Ok(())
    }

    /// Completes the current row, dropping columns that never received a
    /// value; returns an empty map if no row was started.
    fn finish(&mut self) -> HashMap<Column, String> {
        let Some(current) = self.current.take() else {
            return HashMap::new();
        };
        current
            .into_iter()
            .filter_map(|(k, v)| v.map(|v| (k, v)))
            .collect()
    }

    /// Renders every cell of `record_batch` to its string form, pushing one
    /// finished label map per row onto `series`.
    fn write(
        &mut self,
        record_batch: RecordBatch,
        series: &mut Vec<HashMap<Column, String>>,
    ) -> Result<()> {
        let schema = record_batch.schema.clone();
        let record_batch = record_batch.into_df_record_batch();
        for i in 0..record_batch.num_rows() {
            for (j, array) in record_batch.columns().iter().enumerate() {
                let column = schema.column_name_by_index(j).into();

                // NULL cells render as the literal string "Null".
                if array.is_null(i) {
                    self.insert(column, "Null");
                    continue;
                }

                // Dispatch on the arrow type to produce a string rendering.
                match array.data_type() {
                    DataType::Null => {
                        self.insert(column, "Null");
                    }
                    DataType::Boolean => {
                        let array = array.as_boolean();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt8 => {
                        let array = array.as_primitive::<UInt8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt16 => {
                        let array = array.as_primitive::<UInt16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt32 => {
                        let array = array.as_primitive::<UInt32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt64 => {
                        let array = array.as_primitive::<UInt64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int8 => {
                        let array = array.as_primitive::<Int8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int16 => {
                        let array = array.as_primitive::<Int16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int32 => {
                        let array = array.as_primitive::<Int32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int64 => {
                        let array = array.as_primitive::<Int64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float32 => {
                        let array = array.as_primitive::<Float32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float64 => {
                        let array = array.as_primitive::<Float64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
                        let v = datatypes::arrow_array::string_array_value(array, i);
                        self.insert(column, v);
                    }
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
                        // Binary needs the column schema to choose JSONB
                        // decoding vs hex encoding.
                        let v = datatypes::arrow_array::binary_array_value(array, i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::Date32 => {
                        let array = array.as_primitive::<Date32Type>();
                        let v = Date::new(array.value(i));
                        self.insert(column, v);
                    }
                    DataType::Date64 => {
                        let array = array.as_primitive::<Date64Type>();
                        // Date64 is milliseconds; convert to days.
                        // NOTE(review): `/` truncates toward zero, so negative
                        // (pre-1970) values land one day late — confirm inputs
                        // are non-negative or use `div_euclid`.
                        let v = Date::new((array.value(i) / 86_400_000) as i32);
                        self.insert(column, v);
                    }
                    DataType::Timestamp(_, _) => {
                        let v = datatypes::arrow_array::timestamp_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Time32(_) | DataType::Time64(_) => {
                        let v = datatypes::arrow_array::time_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Interval(interval_unit) => match interval_unit {
                        IntervalUnit::YearMonth => {
                            let array = array.as_primitive::<IntervalYearMonthType>();
                            let v: IntervalYearMonth = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::DayTime => {
                            let array = array.as_primitive::<IntervalDayTimeType>();
                            let v: IntervalDayTime = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::MonthDayNano => {
                            let array = array.as_primitive::<IntervalMonthDayNanoType>();
                            let v: IntervalMonthDayNano = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                    },
                    DataType::Duration(_) => {
                        let d = datatypes::arrow_array::duration_array_value(array, i);
                        self.insert(column, d);
                    }
                    DataType::List(_) => {
                        // Fall back to datafusion's scalar rendering.
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Struct(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Decimal128(precision, scale) => {
                        let array = array.as_primitive::<Decimal128Type>();
                        let v = Decimal128::new(array.value(i), *precision, *scale);
                        self.insert(column, v);
                    }
                    _ => {
                        return NotSupportedSnafu {
                            feat: format!("convert {} to http value", array.data_type()),
                        }
                        .fail();
                    }
                }
            }

            series.push(self.finish())
        }
        Ok(())
    }
}
976
/// Borrowed counterpart of [`Column`]: a plain `&str` wrapper used as a
/// zero-allocation map-lookup key.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ColumnRef<'a>(&'a str);
979
/// Cheap borrowing conversion; no allocation.
impl<'a> From<&'a str> for ColumnRef<'a> {
    fn from(s: &'a str) -> Self {
        Self(s)
    }
}
985
/// Unifies owned [`Column`] and borrowed [`ColumnRef`] so a
/// `HashMap<Column, _>` can be probed with either form (see the `Borrow`
/// impl further down).
trait AsColumnRef {
    // Returns the borrowed view of the column name.
    fn as_ref(&self) -> ColumnRef<'_>;
}
989
/// Owned keys borrow their inner string.
impl AsColumnRef for Column {
    fn as_ref(&self) -> ColumnRef<'_> {
        self.0.as_str().into()
    }
}
995
/// Borrowed keys are already in the canonical form; `Copy` makes this free.
impl AsColumnRef for ColumnRef<'_> {
    fn as_ref(&self) -> ColumnRef<'_> {
        *self
    }
}
1001
/// Equality on the trait object delegates to the underlying name, so owned
/// and borrowed keys compare equal when their strings match.
impl<'a> PartialEq for dyn AsColumnRef + 'a {
    fn eq(&self, other: &Self) -> bool {
        self.as_ref() == other.as_ref()
    }
}
1007
// Marker impl: name equality is a total equivalence relation.
impl<'a> Eq for dyn AsColumnRef + 'a {}
1009
/// Hashes the underlying `&str`. This must stay consistent with `Column`'s
/// derived `Hash` (which hashes its `Arc<String>`) for the `Borrow`-based
/// map lookup to work; it relies on `String` and `str` hashing identically.
impl<'a> Hash for dyn AsColumnRef + 'a {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.as_ref().0.hash(state);
    }
}
1015
/// Lets `HashMap<Column, _>::get_mut(&dyn AsColumnRef)` probe the map with a
/// borrowed `ColumnRef` key, avoiding a `Column` allocation per lookup.
impl<'a> Borrow<dyn AsColumnRef + 'a> for Column {
    fn borrow(&self) -> &(dyn AsColumnRef + 'a) {
        self
    }
}
1021
1022fn record_batches_to_labels_name(
1024 batches: RecordBatches,
1025 labels: &mut HashSet<String>,
1026) -> Result<()> {
1027 let mut column_indices = Vec::new();
1028 let mut field_column_indices = Vec::new();
1029 for (i, column) in batches.schema().column_schemas().iter().enumerate() {
1030 if let ConcreteDataType::Float64(_) = column.data_type {
1031 field_column_indices.push(i);
1032 }
1033 column_indices.push(i);
1034 }
1035
1036 if field_column_indices.is_empty() {
1037 return Err(Error::Internal {
1038 err_msg: "no value column found".to_string(),
1039 });
1040 }
1041
1042 for batch in batches.iter() {
1043 let names = column_indices
1044 .iter()
1045 .map(|c| batches.schema().column_name_by_index(*c).to_string())
1046 .collect::<Vec<_>>();
1047
1048 let field_columns = field_column_indices
1049 .iter()
1050 .map(|i| {
1051 let column = batch.column(*i);
1052 column.as_primitive::<Float64Type>()
1053 })
1054 .collect::<Vec<_>>();
1055
1056 for row_index in 0..batch.num_rows() {
1057 if field_columns.iter().all(|c| c.is_null(row_index)) {
1059 continue;
1060 }
1061
1062 names.iter().for_each(|name| {
1064 let _ = labels.insert(name.clone());
1065 });
1066 return Ok(());
1067 }
1068 }
1069 Ok(())
1070}
1071
1072pub(crate) fn retrieve_metric_name_and_result_type(
1073 promql: &str,
1074) -> Result<(Option<String>, ValueType)> {
1075 let promql_expr = promql_parser::parser::parse(promql)
1076 .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
1077 let metric_name = promql_expr_to_metric_name(&promql_expr);
1078 let result_type = promql_expr.value_type();
1079
1080 Ok((metric_name, result_type))
1081}
1082
1083pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
1086 if let Some(db) = db {
1087 parse_catalog_and_schema_from_db_string(db)
1088 } else {
1089 (
1090 ctx.current_catalog().to_string(),
1091 ctx.current_schema().clone(),
1092 )
1093 }
1094}
1095
1096pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
1098 if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
1099 ctx.set_current_catalog(catalog);
1100 ctx.set_current_schema(schema);
1101 }
1102}
1103
1104fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
1105 let mut metric_names = HashSet::new();
1106 collect_metric_names(expr, &mut metric_names);
1107
1108 if metric_names.len() == 1 {
1110 metric_names.into_iter().next()
1111 } else {
1112 None
1113 }
1114}
1115
/// Recursively gathers metric names referenced by `expr` into `metric_names`.
///
/// Clears the set — meaning "no single determinable metric" — whenever a
/// construct can change or drop the `__name__` label: unary expressions,
/// non-set-operator binary expressions, or aggregations whose grouping
/// excludes `__name__`.
fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
            match modifier {
                Some(LabelModifier::Include(labels)) => {
                    // `by (...)` that omits `__name__` drops the metric name.
                    if !labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                Some(LabelModifier::Exclude(labels)) => {
                    // `without (...)` that lists `__name__` drops it too.
                    if labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                _ => {}
            }
            collect_metric_names(expr, metric_names)
        }
        PromqlExpr::Unary(UnaryExpr { .. }) => metric_names.clear(),
        PromqlExpr::Binary(BinaryExpr { lhs, op, .. }) => {
            // Set operators (and/or/unless) keep the left-hand side's metric
            // name; any other binary operator drops `__name__`.
            if matches!(
                op.id(),
                token::T_LAND | token::T_LOR | token::T_LUNLESS
            ) {
                collect_metric_names(lhs, metric_names)
            } else {
                metric_names.clear()
            }
        }
        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            // Either an explicit name (`metric{...}`) or the first
            // `__name__` matcher's value counts as the metric name.
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            // Function calls: union the names found in every argument.
            args.args
                .iter()
                .for_each(|e| collect_metric_names(e, metric_names));
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}
1175
1176fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
1177where
1178 F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
1179{
1180 match expr {
1181 PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
1182 PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
1183 PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1184 find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
1185 }
1186 PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
1187 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
1188 PromqlExpr::NumberLiteral(_) => None,
1189 PromqlExpr::StringLiteral(_) => None,
1190 PromqlExpr::Extension(_) => None,
1191 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
1192 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1193 let VectorSelector { name, matchers, .. } = vs;
1194
1195 f(name, matchers)
1196 }
1197 PromqlExpr::Call(Call { args, .. }) => args
1198 .args
1199 .iter()
1200 .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
1201 }
1202}
1203
1204fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
1206 find_metric_name_and_matchers(expr, |name, matchers| {
1207 if name.is_some() {
1209 return None;
1210 }
1211
1212 Some(matchers.find_matchers(METRIC_NAME))
1214 })
1215 .map(|matchers| {
1216 matchers
1217 .into_iter()
1218 .filter(|m| !matches!(m.op, MatchOp::Equal))
1219 .collect::<Vec<_>>()
1220 })
1221}
1222
1223fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
1226 match expr {
1227 PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
1228 update_metric_name_matcher(expr, metric_name)
1229 }
1230 PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1231 PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1232 update_metric_name_matcher(lhs, metric_name);
1233 update_metric_name_matcher(rhs, metric_name);
1234 }
1235 PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1236 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
1237 update_metric_name_matcher(expr, metric_name)
1238 }
1239 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
1240 if name.is_some() {
1241 return;
1242 }
1243
1244 for m in &mut matchers.matchers {
1245 if m.name == METRIC_NAME {
1246 m.op = MatchOp::Equal;
1247 m.value = metric_name.to_string();
1248 }
1249 }
1250 }
1251 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1252 let VectorSelector { name, matchers, .. } = vs;
1253 if name.is_some() {
1254 return;
1255 }
1256
1257 for m in &mut matchers.matchers {
1258 if m.name == METRIC_NAME {
1259 m.op = MatchOp::Equal;
1260 m.value = metric_name.to_string();
1261 }
1262 }
1263 }
1264 PromqlExpr::Call(Call { args, .. }) => {
1265 args.args.iter_mut().for_each(|e| {
1266 update_metric_name_matcher(e, metric_name);
1267 });
1268 }
1269 PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
1270 }
1271}
1272
/// Query-string parameters accepted by the Prometheus label-values endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    // Range start; `label_values_query` defaults this to 24 hours ago.
    start: Option<String>,
    // Range end; defaults to the current time.
    end: Option<String>,
    // Lookback delta; not read by the visible handler — kept for parity with
    // the other query structs. TODO(review): confirm intended use.
    lookback: Option<String>,
    // Repeated `match[]` series selectors, flattened into this struct.
    #[serde(flatten)]
    matches: Matches,
    // Optional database override (catalog/schema encoded as one string).
    db: Option<String>,
    // Maximum number of values to return; absent or zero means no limit.
    limit: Option<usize>,
}
1283
/// Handler for the Prometheus `label/<name>/values` endpoint.
///
/// Special label names are answered from catalog metadata instead of
/// executing PromQL:
/// * `__name__` → logical table (metric) names,
/// * the field-name label → field column names of the matched tables,
/// * database-selection labels → schema names.
///
/// For any other label, every `match[]` selector is run through
/// `query_label_values` over the requested time range; the distinct values
/// are merged, sorted, and truncated to `limit`.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "label_values_query")
)]
pub async fn label_values_query(
    State(handler): State<PrometheusHandlerRef>,
    Path(label_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(params): Query<LabelValueQuery>,
) -> PrometheusJsonResponse {
    // Apply the ?db= override before freezing the context into an Arc.
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
        .start_timer();

    if label_name == METRIC_NAME_LABEL {
        // `__name__` values are the metric (logical table) names.
        let catalog_manager = handler.catalog_manager();

        let mut table_names = try_call_return_response!(
            retrieve_table_names(&query_ctx, catalog_manager, params.matches.0).await
        );

        truncate_results(&mut table_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
    } else if label_name == FIELD_NAME_LABEL {
        // Field-name label: answer with the matched tables' field columns.
        // Schema errors are downgraded to an empty result here.
        let field_columns = handle_schema_err!(
            retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
        )
        .unwrap_or_default();
        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
        field_columns.sort_unstable();
        truncate_results(&mut field_columns, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
    } else if is_database_selection_label(&label_name) {
        // Database-selection labels are answered with schema names.
        let catalog_manager = handler.catalog_manager();

        let mut schema_names = try_call_return_response!(
            retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await
        );
        truncate_results(&mut schema_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(schema_names));
    }

    let queries = params.matches.0;
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::InvalidArguments,
            "match[] parameter is required",
        );
    }

    // Default range: the last 24 hours.
    let start = params.start.unwrap_or_else(yesterday_rfc3339);
    let end = params.end.unwrap_or_else(current_time_rfc3339);
    let mut label_values = HashSet::new();

    let start = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&start)
            .context(ParseTimestampSnafu { timestamp: &start })
    );
    let end = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&end)
            .context(ParseTimestampSnafu { timestamp: &end })
    );

    for query in queries {
        // Each match[] entry must be a plain vector selector carrying a
        // metric name (explicit or via an `__name__` equality matcher).
        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected vector selector",
            );
        };
        let Some(name) = take_metric_name(&mut vector_selector) else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected metric name",
            );
        };
        let VectorSelector { matchers, .. } = vector_selector;
        let matchers = matchers.matchers;
        let result = handler
            .query_label_values(name, label_name.clone(), matchers, start, end, &query_ctx)
            .await;
        // Values are accumulated across all match[] selectors (set union).
        if let Some(result) = handle_schema_err!(result) {
            label_values.extend(result.into_iter());
        }
    }

    let mut label_values: Vec<_> = label_values.into_iter().collect();
    label_values.sort_unstable();
    truncate_results(&mut label_values, params.limit);

    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
}
1383
1384fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
1385 if let Some(limit) = limit
1386 && limit > 0
1387 && label_values.len() >= limit
1388 {
1389 label_values.truncate(limit);
1390 }
1391}
1392
1393fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1396 if let Some(name) = selector.name.take() {
1397 return Some(name);
1398 }
1399
1400 let (pos, matcher) = selector
1401 .matchers
1402 .matchers
1403 .iter()
1404 .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1405 let name = matcher.value.clone();
1406 selector.matchers.matchers.remove(pos);
1408
1409 Some(name)
1410}
1411
1412async fn retrieve_table_names(
1413 query_ctx: &QueryContext,
1414 catalog_manager: CatalogManagerRef,
1415 matches: Vec<String>,
1416) -> Result<Vec<String>> {
1417 let catalog = query_ctx.current_catalog();
1418 let schema = query_ctx.current_schema();
1419
1420 let mut tables_stream = catalog_manager.tables(catalog, &schema, Some(query_ctx));
1421 let mut table_names = Vec::new();
1422
1423 let name_matcher = matches
1425 .first()
1426 .and_then(|matcher| promql_parser::parser::parse(matcher).ok())
1427 .and_then(|expr| {
1428 if let PromqlExpr::VectorSelector(vector_selector) = expr {
1429 let matchers = vector_selector.matchers.matchers;
1430 for matcher in matchers {
1431 if matcher.name == METRIC_NAME_LABEL {
1432 return Some(matcher);
1433 }
1434 }
1435
1436 None
1437 } else {
1438 None
1439 }
1440 });
1441
1442 while let Some(table) = tables_stream.next().await {
1443 let table = table.context(CatalogSnafu)?;
1444 if !table
1445 .table_info()
1446 .meta
1447 .options
1448 .extra_options
1449 .contains_key(LOGICAL_TABLE_METADATA_KEY)
1450 {
1451 continue;
1453 }
1454
1455 let table_name = &table.table_info().name;
1456
1457 if let Some(matcher) = &name_matcher {
1458 match &matcher.op {
1459 MatchOp::Equal => {
1460 if table_name == &matcher.value {
1461 table_names.push(table_name.clone());
1462 }
1463 }
1464 MatchOp::Re(reg) => {
1465 if reg.is_match(table_name) {
1466 table_names.push(table_name.clone());
1467 }
1468 }
1469 _ => {
1470 table_names.push(table_name.clone());
1473 }
1474 }
1475 } else {
1476 table_names.push(table_name.clone());
1477 }
1478 }
1479
1480 table_names.sort_unstable();
1481 Ok(table_names)
1482}
1483
1484async fn retrieve_field_names(
1485 query_ctx: &QueryContext,
1486 manager: CatalogManagerRef,
1487 matches: Vec<String>,
1488) -> Result<HashSet<String>> {
1489 let mut field_columns = HashSet::new();
1490 let catalog = query_ctx.current_catalog();
1491 let schema = query_ctx.current_schema();
1492
1493 if matches.is_empty() {
1494 while let Some(table) = manager
1496 .tables(catalog, &schema, Some(query_ctx))
1497 .next()
1498 .await
1499 {
1500 let table = table.context(CatalogSnafu)?;
1501 for column in table.field_columns() {
1502 field_columns.insert(column.name);
1503 }
1504 }
1505 return Ok(field_columns);
1506 }
1507
1508 for table_name in matches {
1509 let table = manager
1510 .table(catalog, &schema, &table_name, Some(query_ctx))
1511 .await
1512 .context(CatalogSnafu)?
1513 .with_context(|| TableNotFoundSnafu {
1514 catalog: catalog.to_string(),
1515 schema: schema.clone(),
1516 table: table_name.clone(),
1517 })?;
1518
1519 for column in table.field_columns() {
1520 field_columns.insert(column.name);
1521 }
1522 }
1523 Ok(field_columns)
1524}
1525
1526async fn retrieve_schema_names(
1527 query_ctx: &QueryContext,
1528 catalog_manager: CatalogManagerRef,
1529 matches: Vec<String>,
1530) -> Result<Vec<String>> {
1531 let mut schemas = Vec::new();
1532 let catalog = query_ctx.current_catalog();
1533
1534 let candidate_schemas = catalog_manager
1535 .schema_names(catalog, Some(query_ctx))
1536 .await
1537 .context(CatalogSnafu)?;
1538
1539 for schema in candidate_schemas {
1540 let mut found = true;
1541 for match_item in &matches {
1542 if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
1543 let exists = catalog_manager
1544 .table_exists(catalog, &schema, &table_name, Some(query_ctx))
1545 .await
1546 .context(CatalogSnafu)?;
1547 if !exists {
1548 found = false;
1549 break;
1550 }
1551 }
1552 }
1553
1554 if found {
1555 schemas.push(schema);
1556 }
1557 }
1558
1559 schemas.sort_unstable();
1560
1561 Ok(schemas)
1562}
1563
1564fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1569 let promql_expr = promql_parser::parser::parse(query).ok()?;
1570 promql_expr_to_metric_name(&promql_expr)
1571}
1572
/// Parameters for the Prometheus series endpoint; `series_query` accepts
/// them from both the query string and the form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    // Range start; defaults to 24 hours ago.
    start: Option<String>,
    // Range end; defaults to the current time.
    end: Option<String>,
    // Lookback delta; defaults to the parser's standard lookback string.
    lookback: Option<String>,
    // Repeated `match[]` series selectors, flattened into this struct.
    #[serde(flatten)]
    matches: Matches,
    // Optional database override (catalog/schema encoded as one string).
    db: Option<String>,
}
1582
/// Handler for the Prometheus series endpoint.
///
/// Parameters may arrive via the query string or the form body; the query
/// string wins field-by-field. Every `match[]` expression is executed as a
/// PromQL query and the matched series' label sets are merged into the
/// response, together with per-query response metrics.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "series_query")
)]
pub async fn series_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<SeriesQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SeriesQuery>,
) -> PrometheusJsonResponse {
    // Prefer query-string match[] entries; fall back to the form body.
    let mut queries: Vec<String> = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::Unsupported,
            "match[] parameter is required",
        );
    }
    // Range defaults: the last 24 hours, with the standard lookback delta.
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    // Apply the ?db= override before freezing the context into an Arc.
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
        .start_timer();

    let mut series = Vec::new();
    let mut merge_map = HashMap::new();
    for query in queries {
        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            // Step is pinned to the default lookback interval.
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
            alias: None,
        };
        let result = handler.do_query(&prom_query, query_ctx.clone()).await;

        // NOTE(review): handle_schema_err! appears to downgrade schema
        // errors so one bad selector doesn't fail the whole request —
        // confirm against the macro definition.
        handle_schema_err!(
            retrieve_series_from_query_result(
                result,
                &mut series,
                &query_ctx,
                &table_name,
                &handler.catalog_manager(),
                &mut merge_map,
            )
            .await
        );
    }
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
    resp.resp_metrics = merge_map;
    resp
}
1663
/// Parameters for the PromQL parse endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    // PromQL text to parse into an AST.
    query: Option<String>,
    // Accepted for API symmetry; not read by `parse_query`.
    db: Option<String>,
}
1669
1670#[axum_macros::debug_handler]
1671#[tracing::instrument(
1672 skip_all,
1673 fields(protocol = "prometheus", request_type = "parse_query")
1674)]
1675pub async fn parse_query(
1676 State(_handler): State<PrometheusHandlerRef>,
1677 Query(params): Query<ParseQuery>,
1678 Extension(_query_ctx): Extension<QueryContext>,
1679 Form(form_params): Form<ParseQuery>,
1680) -> PrometheusJsonResponse {
1681 if let Some(query) = params.query.or(form_params.query) {
1682 let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1683 PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1684 } else {
1685 PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1686 }
1687}
1688
#[cfg(test)]
mod tests {
    use promql_parser::parser::value::ValueType;

    use super::*;

    /// One table-driven case for `retrieve_metric_name_and_result_type`.
    struct TestCase {
        // Human-readable case label used in assertion messages.
        name: &'static str,
        // Input query text.
        promql: &'static str,
        // Metric name expected back, if a single one is determinable.
        expected_metric: Option<&'static str>,
        // Expected PromQL value type of the expression.
        expected_type: ValueType,
        // Whether parsing itself is expected to fail.
        should_error: bool,
    }

    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        let test_cases = &[
            // Plain selectors: the metric name is directly recoverable.
            TestCase {
                name: "simple metric",
                promql: "cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with selector",
                promql: r#"cpu_usage{instance="localhost"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with range selector",
                promql: "cpu_usage[5m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "metric with __name__ matcher",
                promql: r#"{__name__="cpu_usage"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Unary negation discards the metric identity.
            TestCase {
                name: "metric with unary operator",
                promql: "-cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Aggregations: the name survives unless the grouping drops __name__.
            TestCase {
                name: "metric with aggregation",
                promql: "sum(cpu_usage)",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "complex aggregation",
                promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "complex aggregation",
                promql: r#"sum by (__name__) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "complex aggregation",
                promql: r#"sum without (instance) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Arithmetic binary operators always drop the metric name.
            TestCase {
                name: "same metric addition",
                promql: "cpu_usage + cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with scalar addition",
                promql: r#"sum(rate(cpu_usage{job="node"}[5m])) + 100"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics addition",
                promql: "cpu_usage + memory_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics subtraction",
                promql: "network_in - network_out",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Set operators keep the left-hand side's metric.
            TestCase {
                name: "unless with different metrics",
                promql: "cpu_usage unless memory_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with same metric",
                promql: "cpu_usage unless cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Subqueries inherit the inner expression's determinability.
            TestCase {
                name: "basic subquery",
                promql: "cpu_usage[5m:1m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "subquery with multiple metrics",
                promql: "(cpu_usage + memory_usage)[5m:1m]",
                expected_metric: None,
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            // Literals carry no metric name.
            TestCase {
                name: "scalar value",
                promql: "42",
                expected_metric: None,
                expected_type: ValueType::Scalar,
                should_error: false,
            },
            TestCase {
                name: "string literal",
                promql: r#""hello world""#,
                expected_metric: None,
                expected_type: ValueType::String,
                should_error: false,
            },
            // Parse failures (expected_metric/expected_type are unused here).
            TestCase {
                name: "invalid syntax",
                promql: "cpu_usage{invalid=",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "empty query",
                promql: "",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "malformed brackets",
                promql: "cpu_usage[5m",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
        ];

        for test_case in test_cases {
            let result = retrieve_metric_name_and_result_type(test_case.promql);

            if test_case.should_error {
                assert!(
                    result.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    test_case.name,
                    result
                );
            } else {
                let (metric_name, value_type) = result.unwrap_or_else(|e| {
                    panic!(
                        "Test '{}' should have succeeded but failed with error: {}",
                        test_case.name, e
                    )
                });

                let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
                assert_eq!(
                    metric_name, expected_metric_name,
                    "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, expected_metric_name, metric_name
                );

                assert_eq!(
                    value_type, test_case.expected_type,
                    "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, test_case.expected_type, value_type
                );
            }
        }
    }
}