1use std::borrow::Borrow;
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::hash::{Hash, Hasher};
20use std::sync::Arc;
21
22use arrow::array::{Array, AsArray};
23use arrow::datatypes::{
24 Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
25 Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
26 UInt8Type, UInt16Type, UInt32Type, UInt64Type,
27};
28use arrow_schema::{DataType, IntervalUnit};
29use axum::extract::{Path, Query, State};
30use axum::{Extension, Form};
31use catalog::CatalogManagerRef;
32use common_catalog::parse_catalog_and_schema_from_db_string;
33use common_decimal::Decimal128;
34use common_error::ext::ErrorExt;
35use common_error::status_code::StatusCode;
36use common_query::{Output, OutputData};
37use common_recordbatch::{RecordBatch, RecordBatches};
38use common_telemetry::{debug, tracing};
39use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
40use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
41use common_version::OwnedBuildInfo;
42use datafusion_common::ScalarValue;
43use datatypes::prelude::ConcreteDataType;
44use datatypes::schema::{ColumnSchema, SchemaRef};
45use datatypes::types::jsonb_to_string;
46use futures::StreamExt;
47use futures::future::join_all;
48use itertools::Itertools;
49use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
50use promql_parser::parser::token::{self};
51use promql_parser::parser::value::ValueType;
52use promql_parser::parser::{
53 AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, LabelModifier, MatrixSelector, ParenExpr,
54 SubqueryExpr, UnaryExpr, VectorSelector,
55};
56use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser};
57use serde::de::{self, MapAccess, Visitor};
58use serde::{Deserialize, Serialize};
59use serde_json::Value;
60use session::context::{QueryContext, QueryContextRef};
61use snafu::{Location, OptionExt, ResultExt};
62use store_api::metric_engine_consts::{
63 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
64};
65
66pub use super::result::prometheus_resp::PrometheusJsonResponse;
67use crate::error::{
68 CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error, InvalidQuerySnafu,
69 NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu, UnexpectedResultSnafu,
70};
71use crate::http::header::collect_plan_metrics;
72use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL, is_database_selection_label};
73use crate::prometheus_handler::PrometheusHandlerRef;
74
/// One series of an instant-query ("vector") result: a label set plus an
/// optional single `(timestamp, value)` sample.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    // Label name -> label value; BTreeMap keeps the serialized order stable.
    pub metric: BTreeMap<String, String>,
    // The sample pair; the field is omitted from JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}
82
/// One series of a range-query ("matrix") result: a label set plus all of
/// its `(timestamp, value)` samples.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    // Label name -> label value; BTreeMap keeps the serialized order stable.
    pub metric: BTreeMap<String, String>,
    pub values: Vec<(f64, String)>,
}
89
/// The `result` payload of a Prometheus API response; `untagged` so each
/// variant serializes as its bare content, as the Prometheus wire format
/// expects.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    Matrix(Vec<PromSeriesMatrix>),
    Vector(Vec<PromSeriesVector>),
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}
99
100impl Default for PromQueryResult {
101 fn default() -> Self {
102 PromQueryResult::Matrix(Default::default())
103 }
104}
105
/// The `data` object of a Prometheus API response: the result type tag
/// ("matrix", "vector", "scalar", "string") plus the actual result payload.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    #[serde(rename = "resultType")]
    pub result_type: String,
    pub result: PromQueryResult,
}
112
/// Owned column-name key for series label maps. The `Arc<String>` makes
/// clones cheap when the same name is repeated across many rows.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Column(Arc<String>);
117
118impl From<&str> for Column {
119 fn from(s: &str) -> Self {
120 Self(Arc::new(s.to_string()))
121 }
122}
123
/// Every response body shape this module's Prometheus endpoints can emit;
/// `untagged` so each variant serializes as its bare content.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    PromData(PromData),
    Labels(Vec<String>),
    Series(Vec<HashMap<Column, String>>),
    LabelValues(Vec<String>),
    FormatQuery(String),
    BuildInfo(OwnedBuildInfo),
    // The parser AST is serialize-only; it cannot be read back.
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    #[default]
    None,
}
138
139impl PrometheusResponse {
140 fn append(&mut self, other: PrometheusResponse) {
144 match (self, other) {
145 (
146 PrometheusResponse::PromData(PromData {
147 result: PromQueryResult::Matrix(lhs),
148 ..
149 }),
150 PrometheusResponse::PromData(PromData {
151 result: PromQueryResult::Matrix(rhs),
152 ..
153 }),
154 ) => {
155 lhs.extend(rhs);
156 }
157
158 (
159 PrometheusResponse::PromData(PromData {
160 result: PromQueryResult::Vector(lhs),
161 ..
162 }),
163 PrometheusResponse::PromData(PromData {
164 result: PromQueryResult::Vector(rhs),
165 ..
166 }),
167 ) => {
168 lhs.extend(rhs);
169 }
170 _ => {
171 }
173 }
174 }
175
176 pub fn is_none(&self) -> bool {
177 matches!(self, PrometheusResponse::None)
178 }
179}
180
/// Query-string/form parameters of the `format_query` endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    query: Option<String>,
}
185
186#[axum_macros::debug_handler]
187#[tracing::instrument(
188 skip_all,
189 fields(protocol = "prometheus", request_type = "format_query")
190)]
191pub async fn format_query(
192 State(_handler): State<PrometheusHandlerRef>,
193 Query(params): Query<InstantQuery>,
194 Extension(_query_ctx): Extension<QueryContext>,
195 Form(form_params): Form<InstantQuery>,
196) -> PrometheusJsonResponse {
197 let query = params.query.or(form_params.query).unwrap_or_default();
198 match promql_parser::parser::parse(&query) {
199 Ok(expr) => {
200 let pretty = expr.prettify();
201 PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
202 }
203 Err(reason) => {
204 let err = InvalidQuerySnafu { reason }.build();
205 PrometheusJsonResponse::error(err.status_code(), err.output_msg())
206 }
207 }
208}
209
/// Parameters of the `build_info` endpoint (none are accepted).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}
212
213#[axum_macros::debug_handler]
214#[tracing::instrument(
215 skip_all,
216 fields(protocol = "prometheus", request_type = "build_info_query")
217)]
218pub async fn build_info_query() -> PrometheusJsonResponse {
219 let build_info = common_version::build_info().clone();
220 PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
221}
222
/// Query-string/form parameters of the Prometheus instant-query endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    query: Option<String>,
    lookback: Option<String>,
    time: Option<String>,
    timeout: Option<String>,
    db: Option<String>,
}
231
/// Unwraps a `Result` inside a handler returning `PrometheusJsonResponse`:
/// yields the `Ok` value, or early-returns an `InvalidArguments` error
/// response built from the error's message.
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                let msg = err.to_string();
                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
            }
        }
    };
}
245
/// Handler for the Prometheus instant-query endpoint (`/api/v1/query`).
///
/// When the query's `__name__` matcher is a not-equal/regex form, the query
/// is fanned out: matching metric names are resolved first, the query is
/// rewritten once per metric, all sub-queries run concurrently, and their
/// results are merged into a single response.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "instant_query")
)]
pub async fn instant_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    // Extract time from query string, or use current server time if not specified.
    let time = params
        .time
        .or(form_params.time)
        .unwrap_or_else(current_time_rfc3339);
    // An instant query is modeled as a degenerate range query: start == end,
    // with a 1s step.
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: time.clone(),
        end: time,
        step: "1s".to_string(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // An explicit `db` parameter overrides the session's catalog/schema.
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        // Resolve the concrete metric names the matchers select.
        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            // No matching metric: return an empty result of the expected type.
            let result_type = promql_expr.value_type();

            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        // Run one rewritten query per metric name, concurrently.
        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                // Pin the query to this single metric name.
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_instant_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Merge per-metric responses into one; `unwrap` is safe because
        // `metric_names` was checked non-empty above.
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_instant_query(&handler, &prom_query, query_ctx).await
    }
}
337
338async fn do_instant_query(
340 handler: &PrometheusHandlerRef,
341 prom_query: &PromQuery,
342 query_ctx: QueryContextRef,
343) -> PrometheusJsonResponse {
344 let result = handler.do_query(prom_query, query_ctx).await;
345 let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
346 Ok((metric_name, result_type)) => (metric_name, result_type),
347 Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
348 };
349 PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
350}
351
/// Query-string/form parameters of the Prometheus range-query endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    query: Option<String>,
    start: Option<String>,
    end: Option<String>,
    step: Option<String>,
    lookback: Option<String>,
    timeout: Option<String>,
    db: Option<String>,
}
362
/// Handler for the Prometheus range-query endpoint (`/api/v1/query_range`).
///
/// Mirrors `instant_query`: when the `__name__` matcher is not a plain
/// equality, the query is rewritten and fanned out per matching metric
/// name, and the per-metric matrices are merged into a single response.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "range_query")
)]
pub async fn range_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<RangeQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<RangeQuery>,
) -> PrometheusJsonResponse {
    // Query-string parameters win over form parameters.
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: params.start.or(form_params.start).unwrap_or_default(),
        end: params.end.or(form_params.end).unwrap_or_default(),
        step: params.step.or(form_params.step).unwrap_or_default(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // An explicit `db` parameter overrides the session's catalog/schema.
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        // Resolve the concrete metric names the matchers select.
        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            // No matching metric: a range query always yields a matrix.
            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: ValueType::Matrix.to_string(),
                ..Default::default()
            }));
        }

        // Run one rewritten query per metric name, concurrently.
        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                // Pin the query to this single metric name.
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_range_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Merge per-metric responses into one; `unwrap` is safe because
        // `metric_names` was checked non-empty above.
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_range_query(&handler, &prom_query, query_ctx).await
    }
}
447
448async fn do_range_query(
450 handler: &PrometheusHandlerRef,
451 prom_query: &PromQuery,
452 query_ctx: QueryContextRef,
453) -> PrometheusJsonResponse {
454 let result = handler.do_query(prom_query, query_ctx).await;
455 let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
456 Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
457 Ok((metric_name, _)) => metric_name,
458 };
459 PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
460}
461
/// The repeated `match[]` parameters of a labels/series query; custom
/// `Deserialize` below collects every `match[]` entry from the request map.
#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);
464
/// Query-string/form parameters of the Prometheus labels endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    // `flatten` lets the custom `Matches` deserializer see the whole map
    // and pick out the repeated `match[]` keys.
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}
474
impl<'de> Deserialize<'de> for Matches {
    /// Collects every `match[]` entry of the request's key/value map into a
    /// `Vec<String>`, ignoring all other keys.
    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        struct MatchesVisitor;

        impl<'d> Visitor<'d> for MatchesVisitor {
            type Value = Vec<String>;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a string")
            }

            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
            where
                M: MapAccess<'d>,
            {
                let mut matches = Vec::new();
                while let Some((key, value)) = access.next_entry::<String, String>()? {
                    // Only the repeated `match[]` parameter is relevant.
                    if key == "match[]" {
                        matches.push(value);
                    }
                }
                Ok(matches)
            }
        }
        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
    }
}
506
/// Evaluates a `Result`, turning `Ok(v)` into `Some(v)`. "Schema" errors
/// (table or column not found) degrade to `None` so callers can skip
/// missing tables; any other error early-returns an HTTP error response.
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err) => {
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    None
                } else {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
                }
            }
        }
    };
}
530
531#[axum_macros::debug_handler]
532#[tracing::instrument(
533 skip_all,
534 fields(protocol = "prometheus", request_type = "labels_query")
535)]
536pub async fn labels_query(
537 State(handler): State<PrometheusHandlerRef>,
538 Query(params): Query<LabelsQuery>,
539 Extension(mut query_ctx): Extension<QueryContext>,
540 Form(form_params): Form<LabelsQuery>,
541) -> PrometheusJsonResponse {
542 let (catalog, schema) = get_catalog_schema(¶ms.db, &query_ctx);
543 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
544 let query_ctx = Arc::new(query_ctx);
545
546 let mut queries = params.matches.0;
547 if queries.is_empty() {
548 queries = form_params.matches.0;
549 }
550
551 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
552 .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
553 .start_timer();
554
555 let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
557 {
558 Ok(labels) => labels,
559 Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
560 };
561 let _ = labels.insert(METRIC_NAME.to_string());
563
564 if queries.is_empty() {
566 let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
567 labels_vec.sort_unstable();
568 return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
569 }
570
571 let start = params
573 .start
574 .or(form_params.start)
575 .unwrap_or_else(yesterday_rfc3339);
576 let end = params
577 .end
578 .or(form_params.end)
579 .unwrap_or_else(current_time_rfc3339);
580 let lookback = params
581 .lookback
582 .or(form_params.lookback)
583 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
584
585 let mut fetched_labels = HashSet::new();
586 let _ = fetched_labels.insert(METRIC_NAME.to_string());
587
588 let mut merge_map = HashMap::new();
589 for query in queries {
590 let prom_query = PromQuery {
591 query,
592 start: start.clone(),
593 end: end.clone(),
594 step: DEFAULT_LOOKBACK_STRING.to_string(),
595 lookback: lookback.clone(),
596 alias: None,
597 };
598
599 let result = handler.do_query(&prom_query, query_ctx.clone()).await;
600 handle_schema_err!(
601 retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
602 .await
603 );
604 }
605
606 fetched_labels.retain(|l| labels.contains(l));
608 let _ = labels.insert(METRIC_NAME.to_string());
609
610 let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
611 sorted_labels.sort();
612 let merge_map = merge_map
613 .into_iter()
614 .map(|(k, v)| (k, Value::from(v)))
615 .collect();
616 let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
617 resp.resp_metrics = merge_map;
618 resp
619}
620
621async fn get_all_column_names(
623 catalog: &str,
624 schema: &str,
625 manager: &CatalogManagerRef,
626) -> std::result::Result<HashSet<String>, catalog::error::Error> {
627 let table_names = manager.table_names(catalog, schema, None).await?;
628
629 let mut labels = HashSet::new();
630 for table_name in table_names {
631 let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
632 continue;
633 };
634 for column in table.primary_key_columns() {
635 if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
636 && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
637 {
638 labels.insert(column.name);
639 }
640 }
641 }
642
643 Ok(labels)
644}
645
/// Converts a query `Output` into Prometheus series (one label map per
/// row), keeping only the table's primary-key (tag) columns, and folds the
/// executed plan's runtime metrics into `metrics`.
async fn retrieve_series_from_query_result(
    result: Result<Output>,
    series: &mut Vec<HashMap<Column, String>>,
    query_ctx: &QueryContext,
    table_name: &str,
    manager: &CatalogManagerRef,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;

    // Resolve the table to learn which of its columns are tags.
    let table = manager
        .table(
            query_ctx.current_catalog(),
            &query_ctx.current_schema(),
            table_name,
            Some(query_ctx),
        )
        .await?
        .with_context(|| TableNotFoundSnafu {
            catalog: query_ctx.current_catalog(),
            schema: query_ctx.current_schema(),
            table: table_name,
        })?;
    let tag_columns = table
        .primary_key_columns()
        .map(|c| c.name)
        .collect::<HashSet<_>>();

    match result.data {
        OutputData::RecordBatches(batches) => {
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        // Streaming output: drain the stream into batches first.
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
            reason: "expected data result, but got affected rows".to_string(),
            location: Location::default(),
        }),
    }?;

    // Surface per-plan execution metrics to the caller.
    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}
696
/// Extracts the label (column) names present in a query `Output` into
/// `labels`, and folds the executed plan's runtime metrics into `metrics`.
async fn retrieve_labels_name_from_query_result(
    result: Result<Output>,
    labels: &mut HashSet<String>,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;
    match result.data {
        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
        // Streaming output: drain the stream into batches first.
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_labels_name(batches, labels)
        }
        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
            reason: "expected data result, but got affected rows".to_string(),
        }
        .fail(),
    }?;
    // Surface per-plan execution metrics to the caller.
    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}
722
723fn record_batches_to_series(
724 batches: RecordBatches,
725 series: &mut Vec<HashMap<Column, String>>,
726 table_name: &str,
727 tag_columns: &HashSet<String>,
728) -> Result<()> {
729 for batch in batches.iter() {
730 let projection = batch
732 .schema
733 .column_schemas()
734 .iter()
735 .enumerate()
736 .filter_map(|(idx, col)| {
737 if tag_columns.contains(&col.name) {
738 Some(idx)
739 } else {
740 None
741 }
742 })
743 .collect::<Vec<_>>();
744 let batch = batch
745 .try_project(&projection)
746 .context(CollectRecordbatchSnafu)?;
747
748 let mut writer = RowWriter::new(&batch.schema, table_name);
749 writer.write(batch, series)?;
750 }
751 Ok(())
752}
753
/// Builds one label map (`Column` -> rendered value) per record-batch row.
struct RowWriter {
    // One `None` slot per output column, with `__name__` preset to the
    // table name; cloned as the starting point of every row.
    template: HashMap<Column, Option<String>>,
    // The row currently being filled; lazily cloned from `template` on the
    // first insert and taken by `finish`.
    current: Option<HashMap<Column, Option<String>>>,
}
767
impl RowWriter {
    /// Creates a writer whose row template has one empty slot per column of
    /// `schema`, plus a `__name__` label preset to the table name.
    fn new(schema: &SchemaRef, table: &str) -> Self {
        let mut template = schema
            .column_schemas()
            .iter()
            .map(|x| (x.name.as_str().into(), None))
            .collect::<HashMap<Column, Option<String>>>();
        template.insert("__name__".into(), Some(table.to_string()));
        Self {
            template,
            current: None,
        }
    }

    /// Sets `column` to `value` in the row being built, cloning the template
    /// lazily on the first insert of each row.
    fn insert(&mut self, column: ColumnRef, value: impl ToString) {
        let current = self.current.get_or_insert_with(|| self.template.clone());
        // Probing with `&dyn AsColumnRef` (via the `Borrow` impl for
        // `Column`) avoids allocating an owned key on the common hit path.
        match current.get_mut(&column as &dyn AsColumnRef) {
            Some(x) => {
                let _ = x.insert(value.to_string());
            }
            None => {
                // Miss: the column wasn't in the template; allocate a key.
                let _ = current.insert(column.0.into(), Some(value.to_string()));
            }
        }
    }

    /// Writes a binary cell: JSON columns are decoded from JSONB to text,
    /// any other binary payload is rendered as lowercase hex.
    fn insert_bytes(&mut self, column_schema: &ColumnSchema, bytes: &[u8]) -> Result<()> {
        let column_name = column_schema.name.as_str().into();

        if column_schema.data_type.is_json() {
            let s = jsonb_to_string(bytes).context(ConvertScalarValueSnafu)?;
            self.insert(column_name, s);
        } else {
            let hex = bytes
                .iter()
                .map(|b| format!("{b:02x}"))
                .collect::<Vec<String>>()
                .join("");
            self.insert(column_name, hex);
        }
        Ok(())
    }

    /// Finalizes the current row, dropping columns that were never set.
    fn finish(&mut self) -> HashMap<Column, String> {
        let Some(current) = self.current.take() else {
            return HashMap::new();
        };
        current
            .into_iter()
            .filter_map(|(k, v)| v.map(|v| (k, v)))
            .collect()
    }

    /// Renders every cell of `record_batch` to its string form, pushing one
    /// finished label map per row onto `series`. Returns `NotSupported` for
    /// Arrow types without a defined textual rendering here.
    fn write(
        &mut self,
        record_batch: RecordBatch,
        series: &mut Vec<HashMap<Column, String>>,
    ) -> Result<()> {
        let schema = record_batch.schema.clone();
        let record_batch = record_batch.into_df_record_batch();
        for i in 0..record_batch.num_rows() {
            for (j, array) in record_batch.columns().iter().enumerate() {
                let column = schema.column_name_by_index(j).into();

                // Null cells render as the literal string "Null".
                if array.is_null(i) {
                    self.insert(column, "Null");
                    continue;
                }

                // Dispatch on the Arrow data type to render the cell.
                match array.data_type() {
                    DataType::Null => {
                        self.insert(column, "Null");
                    }
                    DataType::Boolean => {
                        let array = array.as_boolean();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt8 => {
                        let array = array.as_primitive::<UInt8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt16 => {
                        let array = array.as_primitive::<UInt16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt32 => {
                        let array = array.as_primitive::<UInt32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt64 => {
                        let array = array.as_primitive::<UInt64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int8 => {
                        let array = array.as_primitive::<Int8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int16 => {
                        let array = array.as_primitive::<Int16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int32 => {
                        let array = array.as_primitive::<Int32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int64 => {
                        let array = array.as_primitive::<Int64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float32 => {
                        let array = array.as_primitive::<Float32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float64 => {
                        let array = array.as_primitive::<Float64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
                        let v = datatypes::arrow_array::string_array_value(array, i);
                        self.insert(column, v);
                    }
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
                        let v = datatypes::arrow_array::binary_array_value(array, i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::Date32 => {
                        let array = array.as_primitive::<Date32Type>();
                        let v = Date::new(array.value(i));
                        self.insert(column, v);
                    }
                    DataType::Date64 => {
                        let array = array.as_primitive::<Date64Type>();
                        // Date64 is in milliseconds; convert to days.
                        // NOTE(review): integer division truncates toward
                        // zero, so pre-epoch (negative) values may land on
                        // the next day — confirm whether that matters here.
                        let v = Date::new((array.value(i) / 86_400_000) as i32);
                        self.insert(column, v);
                    }
                    DataType::Timestamp(_, _) => {
                        let v = datatypes::arrow_array::timestamp_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Time32(_) | DataType::Time64(_) => {
                        let v = datatypes::arrow_array::time_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Interval(interval_unit) => match interval_unit {
                        IntervalUnit::YearMonth => {
                            let array = array.as_primitive::<IntervalYearMonthType>();
                            let v: IntervalYearMonth = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::DayTime => {
                            let array = array.as_primitive::<IntervalDayTimeType>();
                            let v: IntervalDayTime = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::MonthDayNano => {
                            let array = array.as_primitive::<IntervalMonthDayNanoType>();
                            let v: IntervalMonthDayNano = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                    },
                    DataType::Duration(_) => {
                        let d = datatypes::arrow_array::duration_array_value(array, i);
                        self.insert(column, d);
                    }
                    // Nested types fall back to DataFusion's scalar rendering.
                    DataType::List(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Struct(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Decimal128(precision, scale) => {
                        let array = array.as_primitive::<Decimal128Type>();
                        let v = Decimal128::new(array.value(i), *precision, *scale);
                        self.insert(column, v);
                    }
                    _ => {
                        return NotSupportedSnafu {
                            feat: format!("convert {} to http value", array.data_type()),
                        }
                        .fail();
                    }
                }
            }

            series.push(self.finish())
        }
        Ok(())
    }
}
974
/// Borrowed counterpart of `Column`, used to probe the row map without
/// allocating an owned key.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ColumnRef<'a>(&'a str);
977
978impl<'a> From<&'a str> for ColumnRef<'a> {
979 fn from(s: &'a str) -> Self {
980 Self(s)
981 }
982}
983
/// Unifies owned `Column` and borrowed `ColumnRef` keys behind one view so
/// the `HashMap` can be probed with either (see the `Borrow` impl below).
trait AsColumnRef {
    fn as_ref(&self) -> ColumnRef<'_>;
}
987
988impl AsColumnRef for Column {
989 fn as_ref(&self) -> ColumnRef<'_> {
990 self.0.as_str().into()
991 }
992}
993
994impl AsColumnRef for ColumnRef<'_> {
995 fn as_ref(&self) -> ColumnRef<'_> {
996 *self
997 }
998}
999
/// Compares column keys by their underlying string slice, making `Column`
/// and `ColumnRef` interchangeable for map lookups.
impl<'a> PartialEq for dyn AsColumnRef + 'a {
    fn eq(&self, other: &Self) -> bool {
        self.as_ref() == other.as_ref()
    }
}
1005
// String-slice equality is a total equivalence relation, so `Eq` holds.
impl<'a> Eq for dyn AsColumnRef + 'a {}
1007
/// Hashes the underlying `&str`; this must agree with `Column`'s derived
/// `Hash` (which hashes the same string contents) for the `Borrow`-based
/// `HashMap` lookup to be sound.
impl<'a> Hash for dyn AsColumnRef + 'a {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.as_ref().0.hash(state);
    }
}
1013
/// Lets a `HashMap<Column, _>` be probed with a `&dyn AsColumnRef` (e.g. a
/// borrowed `ColumnRef`) without constructing an owned `Column` key.
impl<'a> Borrow<dyn AsColumnRef + 'a> for Column {
    fn borrow(&self) -> &(dyn AsColumnRef + 'a) {
        self
    }
}
1019
1020fn record_batches_to_labels_name(
1022 batches: RecordBatches,
1023 labels: &mut HashSet<String>,
1024) -> Result<()> {
1025 let mut column_indices = Vec::new();
1026 let mut field_column_indices = Vec::new();
1027 for (i, column) in batches.schema().column_schemas().iter().enumerate() {
1028 if let ConcreteDataType::Float64(_) = column.data_type {
1029 field_column_indices.push(i);
1030 }
1031 column_indices.push(i);
1032 }
1033
1034 if field_column_indices.is_empty() {
1035 return Err(Error::Internal {
1036 err_msg: "no value column found".to_string(),
1037 });
1038 }
1039
1040 for batch in batches.iter() {
1041 let names = column_indices
1042 .iter()
1043 .map(|c| batches.schema().column_name_by_index(*c).to_string())
1044 .collect::<Vec<_>>();
1045
1046 let field_columns = field_column_indices
1047 .iter()
1048 .map(|i| {
1049 let column = batch.column(*i);
1050 column.as_primitive::<Float64Type>()
1051 })
1052 .collect::<Vec<_>>();
1053
1054 for row_index in 0..batch.num_rows() {
1055 if field_columns.iter().all(|c| c.is_null(row_index)) {
1057 continue;
1058 }
1059
1060 names.iter().for_each(|name| {
1062 let _ = labels.insert(name.clone());
1063 });
1064 return Ok(());
1065 }
1066 }
1067 Ok(())
1068}
1069
1070pub(crate) fn retrieve_metric_name_and_result_type(
1071 promql: &str,
1072) -> Result<(Option<String>, ValueType)> {
1073 let promql_expr = promql_parser::parser::parse(promql)
1074 .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
1075 let metric_name = promql_expr_to_metric_name(&promql_expr);
1076 let result_type = promql_expr.value_type();
1077
1078 Ok((metric_name, result_type))
1079}
1080
1081pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
1084 if let Some(db) = db {
1085 parse_catalog_and_schema_from_db_string(db)
1086 } else {
1087 (
1088 ctx.current_catalog().to_string(),
1089 ctx.current_schema().clone(),
1090 )
1091 }
1092}
1093
1094pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
1096 if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
1097 ctx.set_current_catalog(catalog);
1098 ctx.set_current_schema(schema);
1099 }
1100}
1101
1102fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
1103 let mut metric_names = HashSet::new();
1104 collect_metric_names(expr, &mut metric_names);
1105
1106 if metric_names.len() == 1 {
1108 metric_names.into_iter().next()
1109 } else {
1110 None
1111 }
1112}
1113
/// Recursively collects the metric names referenced by `expr` into
/// `metric_names`. Constructs that can drop or rewrite `__name__` (unary
/// operators, non-set binary operators, aggregations grouping it away)
/// clear the set, meaning "no metric name is attributable".
fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
            match modifier {
                // `by (...)` that does not keep __name__ drops it.
                Some(LabelModifier::Include(labels))
                    if !labels.labels.contains(&METRIC_NAME.to_string()) =>
                {
                    metric_names.clear();
                    return;
                }
                // `without (...)` that lists __name__ drops it.
                Some(LabelModifier::Exclude(labels))
                    if labels.labels.contains(&METRIC_NAME.to_string()) =>
                {
                    metric_names.clear();
                    return;
                }
                _ => {}
            }
            collect_metric_names(expr, metric_names)
        }
        PromqlExpr::Unary(UnaryExpr { .. }) => metric_names.clear(),
        PromqlExpr::Binary(BinaryExpr { lhs, op, .. }) => {
            // Set operators (and/or/unless) preserve the left operand's
            // names; all other binary operators drop __name__.
            if matches!(
                op.id(),
                token::T_LAND | token::T_LOR | token::T_LUNLESS ) {
                collect_metric_names(lhs, metric_names)
            } else {
                metric_names.clear()
            }
        }
        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            // Either a literal metric name or the first __name__ matcher.
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            // Function call: union the names of every argument.
            args.args
                .iter()
                .for_each(|e| collect_metric_names(e, metric_names));
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}
1173
1174fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
1175where
1176 F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
1177{
1178 match expr {
1179 PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
1180 PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
1181 PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1182 find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
1183 }
1184 PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
1185 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
1186 PromqlExpr::NumberLiteral(_) => None,
1187 PromqlExpr::StringLiteral(_) => None,
1188 PromqlExpr::Extension(_) => None,
1189 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
1190 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1191 let VectorSelector { name, matchers, .. } = vs;
1192
1193 f(name, matchers)
1194 }
1195 PromqlExpr::Call(Call { args, .. }) => args
1196 .args
1197 .iter()
1198 .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
1199 }
1200}
1201
1202fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
1204 find_metric_name_and_matchers(expr, |name, matchers| {
1205 if name.is_some() {
1207 return None;
1208 }
1209
1210 Some(matchers.find_matchers(METRIC_NAME))
1212 })
1213 .map(|matchers| {
1214 matchers
1215 .into_iter()
1216 .filter(|m| !matches!(m.op, MatchOp::Equal))
1217 .collect::<Vec<_>>()
1218 })
1219}
1220
1221fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
1224 match expr {
1225 PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
1226 update_metric_name_matcher(expr, metric_name)
1227 }
1228 PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1229 PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1230 update_metric_name_matcher(lhs, metric_name);
1231 update_metric_name_matcher(rhs, metric_name);
1232 }
1233 PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1234 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
1235 update_metric_name_matcher(expr, metric_name)
1236 }
1237 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
1238 if name.is_some() {
1239 return;
1240 }
1241
1242 for m in &mut matchers.matchers {
1243 if m.name == METRIC_NAME {
1244 m.op = MatchOp::Equal;
1245 m.value = metric_name.to_string();
1246 }
1247 }
1248 }
1249 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1250 let VectorSelector { name, matchers, .. } = vs;
1251 if name.is_some() {
1252 return;
1253 }
1254
1255 for m in &mut matchers.matchers {
1256 if m.name == METRIC_NAME {
1257 m.op = MatchOp::Equal;
1258 m.value = metric_name.to_string();
1259 }
1260 }
1261 }
1262 PromqlExpr::Call(Call { args, .. }) => {
1263 args.args.iter_mut().for_each(|e| {
1264 update_metric_name_matcher(e, metric_name);
1265 });
1266 }
1267 PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
1268 }
1269}
1270
/// Query-string parameters accepted by the Prometheus
/// `/api/v1/label/<label_name>/values` endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    /// Start of the time range; defaults to 24 hours ago when omitted.
    start: Option<String>,
    /// End of the time range; defaults to "now" when omitted.
    end: Option<String>,
    /// Lookback delta; currently unused by the handler itself.
    lookback: Option<String>,
    /// The `match[]` series selectors (flattened from repeated params).
    #[serde(flatten)]
    matches: Matches,
    /// Optional database override (`catalog-schema` string).
    db: Option<String>,
    /// Maximum number of values to return; absent or `0` means unlimited.
    limit: Option<usize>,
}
1281
/// Handler for the Prometheus `/api/v1/label/<label_name>/values` endpoint.
///
/// Special label names are answered from catalog metadata instead of data:
/// `__name__` lists metric (table) names, `__field__` lists field columns,
/// and database-selection labels list schema names. All other labels are
/// resolved by evaluating the `match[]` selectors against stored data.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "label_values_query")
)]
pub async fn label_values_query(
    State(handler): State<PrometheusHandlerRef>,
    Path(label_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(params): Query<LabelValueQuery>,
) -> PrometheusJsonResponse {
    // Apply an explicit `db` override before resolving anything.
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
        .start_timer();

    if label_name == METRIC_NAME_LABEL {
        // `__name__` values are the metric table names from the catalog.
        let catalog_manager = handler.catalog_manager();

        let mut table_names = try_call_return_response!(
            retrieve_table_names(&query_ctx, catalog_manager, params.matches.0).await
        );

        truncate_results(&mut table_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
    } else if label_name == FIELD_NAME_LABEL {
        // Schema errors are tolerated here: a missing table yields an
        // empty result rather than a failed request.
        let field_columns = handle_schema_err!(
            retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
        )
        .unwrap_or_default();
        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
        field_columns.sort_unstable();
        truncate_results(&mut field_columns, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
    } else if is_database_selection_label(&label_name) {
        let catalog_manager = handler.catalog_manager();

        let mut schema_names = try_call_return_response!(
            retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await
        );
        truncate_results(&mut schema_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(schema_names));
    }

    // Ordinary labels require at least one selector to scope the scan.
    let queries = params.matches.0;
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::InvalidArguments,
            "match[] parameter is required",
        );
    }

    // Default time range: the last 24 hours.
    let start = params.start.unwrap_or_else(yesterday_rfc3339);
    let end = params.end.unwrap_or_else(current_time_rfc3339);
    let mut label_values = HashSet::new();

    let start = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&start)
            .context(ParseTimestampSnafu { timestamp: &start })
    );
    let end = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&end)
            .context(ParseTimestampSnafu { timestamp: &end })
    );

    for query in queries {
        // Each `match[]` entry must be a plain vector selector naming a
        // metric; anything else is rejected up front.
        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected vector selector",
            );
        };
        let Some(name) = take_metric_name(&mut vector_selector) else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected metric name",
            );
        };
        let VectorSelector { matchers, .. } = vector_selector;
        let matchers = matchers.matchers;
        let result = handler
            .query_label_values(name, label_name.clone(), matchers, start, end, &query_ctx)
            .await;
        // Tolerate per-selector schema errors; other selectors still
        // contribute their values.
        if let Some(result) = handle_schema_err!(result) {
            label_values.extend(result.into_iter());
        }
    }

    let mut label_values: Vec<_> = label_values.into_iter().collect();
    label_values.sort_unstable();
    truncate_results(&mut label_values, params.limit);

    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
}
1381
/// Truncates `label_values` to at most `limit` entries.
///
/// A missing limit or a limit of zero disables truncation entirely
/// (`0` means "unlimited", not "empty").
fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
    match limit {
        Some(n) if n > 0 => label_values.truncate(n),
        _ => {}
    }
}
1390
1391fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1394 if let Some(name) = selector.name.take() {
1395 return Some(name);
1396 }
1397
1398 let (pos, matcher) = selector
1399 .matchers
1400 .matchers
1401 .iter()
1402 .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1403 let name = matcher.value.clone();
1404 selector.matchers.matchers.remove(pos);
1406
1407 Some(name)
1408}
1409
/// Lists metric-engine logical table names in the current catalog/schema,
/// optionally filtered by a `__name__` matcher taken from the first
/// `match[]` entry.
///
/// # Errors
/// Propagates catalog stream errors.
async fn retrieve_table_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    let mut tables_stream = catalog_manager.tables(catalog, &schema, Some(query_ctx));
    let mut table_names = Vec::new();

    // Only the first `match[]` entry is consulted, and only when it parses
    // into a plain vector selector carrying a `__name__` matcher.
    let name_matcher = matches
        .first()
        .and_then(|matcher| promql_parser::parser::parse(matcher).ok())
        .and_then(|expr| {
            if let PromqlExpr::VectorSelector(vector_selector) = expr {
                let matchers = vector_selector.matchers.matchers;
                for matcher in matchers {
                    if matcher.name == METRIC_NAME_LABEL {
                        return Some(matcher);
                    }
                }

                None
            } else {
                None
            }
        });

    while let Some(table) = tables_stream.next().await {
        let table = table?;
        // Skip tables that are not metric-engine logical tables; only
        // those represent Prometheus metrics.
        if !table
            .table_info()
            .meta
            .options
            .extra_options
            .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            continue;
        }

        let table_name = &table.table_info().name;

        if let Some(matcher) = &name_matcher {
            match &matcher.op {
                MatchOp::Equal => {
                    if table_name == &matcher.value {
                        table_names.push(table_name.clone());
                    }
                }
                MatchOp::Re(reg) => {
                    if reg.is_match(table_name) {
                        table_names.push(table_name.clone());
                    }
                }
                _ => {
                    // Other operators (negative matchers) are not applied;
                    // the table is kept unfiltered.
                    table_names.push(table_name.clone());
                }
            }
        } else {
            table_names.push(table_name.clone());
        }
    }

    table_names.sort_unstable();
    Ok(table_names)
}
1481
1482async fn retrieve_field_names(
1483 query_ctx: &QueryContext,
1484 manager: CatalogManagerRef,
1485 matches: Vec<String>,
1486) -> Result<HashSet<String>> {
1487 let mut field_columns = HashSet::new();
1488 let catalog = query_ctx.current_catalog();
1489 let schema = query_ctx.current_schema();
1490
1491 if matches.is_empty() {
1492 while let Some(table) = manager
1494 .tables(catalog, &schema, Some(query_ctx))
1495 .next()
1496 .await
1497 {
1498 let table = table?;
1499 for column in table.field_columns() {
1500 field_columns.insert(column.name);
1501 }
1502 }
1503 return Ok(field_columns);
1504 }
1505
1506 for table_name in matches {
1507 let table = manager
1508 .table(catalog, &schema, &table_name, Some(query_ctx))
1509 .await?
1510 .with_context(|| TableNotFoundSnafu {
1511 catalog: catalog.to_string(),
1512 schema: schema.clone(),
1513 table: table_name.clone(),
1514 })?;
1515
1516 for column in table.field_columns() {
1517 field_columns.insert(column.name);
1518 }
1519 }
1520 Ok(field_columns)
1521}
1522
1523async fn retrieve_schema_names(
1524 query_ctx: &QueryContext,
1525 catalog_manager: CatalogManagerRef,
1526 matches: Vec<String>,
1527) -> Result<Vec<String>> {
1528 let mut schemas = Vec::new();
1529 let catalog = query_ctx.current_catalog();
1530
1531 let candidate_schemas = catalog_manager
1532 .schema_names(catalog, Some(query_ctx))
1533 .await?;
1534
1535 for schema in candidate_schemas {
1536 let mut found = true;
1537 for match_item in &matches {
1538 if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
1539 let exists = catalog_manager
1540 .table_exists(catalog, &schema, &table_name, Some(query_ctx))
1541 .await?;
1542 if !exists {
1543 found = false;
1544 break;
1545 }
1546 }
1547 }
1548
1549 if found {
1550 schemas.push(schema);
1551 }
1552 }
1553
1554 schemas.sort_unstable();
1555
1556 Ok(schemas)
1557}
1558
1559fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1564 let promql_expr = promql_parser::parser::parse(query).ok()?;
1565 promql_expr_to_metric_name(&promql_expr)
1566}
1567
/// Query/form parameters accepted by the Prometheus `/api/v1/series`
/// endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    /// Start of the time range; defaults to 24 hours ago when omitted.
    start: Option<String>,
    /// End of the time range; defaults to "now" when omitted.
    end: Option<String>,
    /// Lookback delta passed to the PromQL engine; defaults to the
    /// engine-wide default when omitted.
    lookback: Option<String>,
    /// The `match[]` series selectors (flattened from repeated params).
    #[serde(flatten)]
    matches: Matches,
    /// Optional database override (`catalog-schema` string).
    db: Option<String>,
}
1577
/// Handler for the Prometheus `/api/v1/series` endpoint: evaluates each
/// `match[]` selector and returns the label sets of the matching series.
///
/// Parameters may arrive via the query string or the form body; query
/// string values take precedence when both are present.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "series_query")
)]
pub async fn series_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<SeriesQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SeriesQuery>,
) -> PrometheusJsonResponse {
    let mut queries: Vec<String> = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::Unsupported,
            "match[] parameter is required",
        );
    }
    // Default time range: the last 24 hours.
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
        .start_timer();

    let mut series = Vec::new();
    let mut merge_map = HashMap::new();
    for query in queries {
        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            // NOTE(review): step reuses the lookback default — presumably
            // any step works since only series label sets are returned;
            // confirm against the query engine.
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
            alias: None,
        };
        let result = handler.do_query(&prom_query, query_ctx.clone()).await;

        // Schema errors (e.g. a missing table) are tolerated so that one
        // bad selector does not fail the whole request.
        handle_schema_err!(
            retrieve_series_from_query_result(
                result,
                &mut series,
                &query_ctx,
                &table_name,
                &handler.catalog_manager(),
                &mut merge_map,
            )
            .await
        );
    }
    // Expose per-query execution metrics in the response payload.
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
    resp.resp_metrics = merge_map;
    resp
}
1658
/// Parameters accepted by the `parse_query` endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    /// The PromQL text to parse.
    query: Option<String>,
    /// Optional database override; not read by the handler itself.
    db: Option<String>,
}
1664
1665#[axum_macros::debug_handler]
1666#[tracing::instrument(
1667 skip_all,
1668 fields(protocol = "prometheus", request_type = "parse_query")
1669)]
1670pub async fn parse_query(
1671 State(_handler): State<PrometheusHandlerRef>,
1672 Query(params): Query<ParseQuery>,
1673 Extension(_query_ctx): Extension<QueryContext>,
1674 Form(form_params): Form<ParseQuery>,
1675) -> PrometheusJsonResponse {
1676 if let Some(query) = params.query.or(form_params.query) {
1677 let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1678 PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1679 } else {
1680 PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1681 }
1682}
1683
1684#[cfg(test)]
1685mod tests {
1686 use promql_parser::parser::value::ValueType;
1687
1688 use super::*;
1689
    /// One scenario for `retrieve_metric_name_and_result_type`.
    struct TestCase {
        /// Human-readable label used in assertion messages.
        name: &'static str,
        /// The PromQL text under test.
        promql: &'static str,
        /// Metric name the function is expected to report, if any.
        expected_metric: Option<&'static str>,
        /// Expected PromQL result type of the expression.
        expected_type: ValueType,
        /// Whether parsing is expected to fail outright.
        should_error: bool,
    }
1697
    /// Table-driven coverage of metric-name extraction and result typing
    /// across selectors, aggregations, binary/set operators, subqueries,
    /// literals, and parse errors.
    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        let test_cases = &[
            // Plain selectors: the metric name is directly recoverable.
            TestCase {
                name: "simple metric",
                promql: "cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with selector",
                promql: r#"cpu_usage{instance="localhost"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with range selector",
                promql: "cpu_usage[5m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "metric with __name__ matcher",
                promql: r#"{__name__="cpu_usage"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Unary expressions drop the metric name.
            TestCase {
                name: "metric with unary operator",
                promql: "-cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Aggregations: grouping decides whether `__name__` survives.
            TestCase {
                name: "metric with aggregation",
                promql: "sum(cpu_usage)",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "complex aggregation",
                promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "complex aggregation",
                promql: r#"sum by (__name__) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "complex aggregation",
                promql: r#"sum without (instance) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Arithmetic binary operators drop the metric name.
            TestCase {
                name: "same metric addition",
                promql: "cpu_usage + cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with scalar addition",
                promql: r#"sum(rate(cpu_usage{job="node"}[5m])) + 100"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics addition",
                promql: "cpu_usage + memory_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics subtraction",
                promql: "network_in - network_out",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Set operators keep the left-hand side's name.
            TestCase {
                name: "unless with different metrics",
                promql: "cpu_usage unless memory_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with same metric",
                promql: "cpu_usage unless cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Subqueries delegate to the inner expression.
            TestCase {
                name: "basic subquery",
                promql: "cpu_usage[5m:1m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "subquery with multiple metrics",
                promql: "(cpu_usage + memory_usage)[5m:1m]",
                expected_metric: None,
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            // Literals carry no metric name.
            TestCase {
                name: "scalar value",
                promql: "42",
                expected_metric: None,
                expected_type: ValueType::Scalar,
                should_error: false,
            },
            TestCase {
                name: "string literal",
                promql: r#""hello world""#,
                expected_metric: None,
                expected_type: ValueType::String,
                should_error: false,
            },
            // Malformed input must surface as a parse error.
            TestCase {
                name: "invalid syntax",
                promql: "cpu_usage{invalid=",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "empty query",
                promql: "",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "malformed brackets",
                promql: "cpu_usage[5m",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
        ];

        for test_case in test_cases {
            let result = retrieve_metric_name_and_result_type(test_case.promql);

            if test_case.should_error {
                assert!(
                    result.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    test_case.name,
                    result
                );
            } else {
                let (metric_name, value_type) = result.unwrap_or_else(|e| {
                    panic!(
                        "Test '{}' should have succeeded but failed with error: {}",
                        test_case.name, e
                    )
                });

                let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
                assert_eq!(
                    metric_name, expected_metric_name,
                    "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, expected_metric_name, metric_name
                );

                assert_eq!(
                    value_type, test_case.expected_type,
                    "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, test_case.expected_type, value_type
                );
            }
        }
    }
1898}