use std::borrow::Borrow;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow::array::{Array, AsArray};
use arrow::datatypes::{
    Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
    Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
    UInt8Type, UInt16Type, UInt32Type, UInt64Type,
};
use arrow_schema::{DataType, IntervalUnit};
use axum::extract::{Path, Query, State};
use axum::{Extension, Form};
use catalog::CatalogManagerRef;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_decimal::Decimal128;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_telemetry::{debug, tracing};
use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use common_version::OwnedBuildInfo;
use datafusion_common::ScalarValue;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaRef};
use datatypes::types::jsonb_to_string;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
use promql_parser::parser::token::{self};
use promql_parser::parser::value::ValueType;
use promql_parser::parser::{
    AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, LabelModifier, MatrixSelector, ParenExpr,
    SubqueryExpr, UnaryExpr, VectorSelector,
};
use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser};
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use session::context::{QueryContext, QueryContextRef};
use snafu::{Location, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
};

pub use super::result::prometheus_resp::PrometheusJsonResponse;
use crate::error::{
    CatalogSnafu, CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error,
    InvalidQuerySnafu, NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu,
    UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
use crate::prometheus_handler::PrometheusHandlerRef;

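/// One series in an instant-query (`vector`) result: its label set plus an
/// optional `(timestamp, value)` sample, in the Prometheus JSON wire format.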
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    pub metric: BTreeMap<String, String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}

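/// One series in a range-query (`matrix`) result: its label set plus all
/// `(timestamp, value)` samples that fall into the queried range.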
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    pub metric: BTreeMap<String, String>,
    pub values: Vec<(f64, String)>,
}

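/// Variants of the `result` payload in a Prometheus response; `untagged` so
/// each variant serializes as the bare matrix/vector/scalar/string value.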
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    Matrix(Vec<PromSeriesMatrix>),
    Vector(Vec<PromSeriesVector>),
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}

impl Default for PromQueryResult {
    fn default() -> Self {
        PromQueryResult::Matrix(Default::default())
    }
}

#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    #[serde(rename = "resultType")]
    pub result_type: String,
    pub result: PromQueryResult,
}

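/// A column (label) name. Backed by an `Arc<String>` so the same name can be
/// shared cheaply across the many per-row label maps built below.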
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Column(Arc<String>);

impl From<&str> for Column {
    fn from(s: &str) -> Self {
        Self(Arc::new(s.to_string()))
    }
}

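/// Body of a successful Prometheus API response; `untagged` so each endpoint
/// returns its payload in the shape the Prometheus HTTP API expects.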
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    PromData(PromData),
    Labels(Vec<String>),
    Series(Vec<HashMap<Column, String>>),
    LabelValues(Vec<String>),
    FormatQuery(String),
    BuildInfo(OwnedBuildInfo),
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    #[default]
    None,
}

impl PrometheusResponse {
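    /// Merges `other` into `self`. Used to combine per-metric responses when a
    /// query fans out to several metrics; only matrix-matrix and vector-vector
    /// pairs are merged, any other combination leaves `self` unchanged.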
    fn append(&mut self, other: PrometheusResponse) {
        match (self, other) {
            (
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Matrix(lhs),
                    ..
                }),
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Matrix(rhs),
                    ..
                }),
            ) => {
                lhs.extend(rhs);
            }

            (
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Vector(lhs),
                    ..
                }),
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Vector(rhs),
                    ..
                }),
            ) => {
                lhs.extend(rhs);
            }
            // Mismatched result types: keep the left-hand side unchanged.
            _ => {}
        }
    }

    pub fn is_none(&self) -> bool {
        matches!(self, PrometheusResponse::None)
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    query: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "format_query")
)]
pub async fn format_query(
    State(_handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(_query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    let query = params.query.or(form_params.query).unwrap_or_default();
    match promql_parser::parser::parse(&query) {
        Ok(expr) => {
            let pretty = expr.prettify();
            PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
        }
        Err(reason) => {
            let err = InvalidQuerySnafu { reason }.build();
            PrometheusJsonResponse::error(err.status_code(), err.output_msg())
        }
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "build_info_query")
)]
pub async fn build_info_query() -> PrometheusJsonResponse {
    let build_info = common_version::build_info().clone();
    PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    query: Option<String>,
    lookback: Option<String>,
    time: Option<String>,
    timeout: Option<String>,
    db: Option<String>,
}

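/// Evaluates `$handle` and unwraps the `Ok` value; on `Err`, early-returns an
/// `InvalidArguments` JSON response from the surrounding handler.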
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                let msg = err.to_string();
                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
            }
        }
    };
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "instant_query")
)]
pub async fn instant_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    let time = params
        .time
        .or(form_params.time)
        .unwrap_or_else(current_time_rfc3339);
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: time.clone(),
        end: time,
        step: "1s".to_string(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            let result_type = promql_expr.value_type();

            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_instant_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safe to unwrap: `metric_names` is non-empty, so there is at least one response.
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_instant_query(&handler, &prom_query, query_ctx).await
    }
}

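/// Runs a single instant query through the PromQL handler and converts the
/// result into a Prometheus JSON response.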
async fn do_instant_query(
    handler: &PrometheusHandlerRef,
    prom_query: &PromQuery,
    query_ctx: QueryContextRef,
) -> PrometheusJsonResponse {
    let result = handler.do_query(prom_query, query_ctx).await;
    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
        Ok((metric_name, result_type)) => (metric_name, result_type),
        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
    };
    PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    query: Option<String>,
    start: Option<String>,
    end: Option<String>,
    step: Option<String>,
    lookback: Option<String>,
    timeout: Option<String>,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "range_query")
)]
pub async fn range_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<RangeQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<RangeQuery>,
) -> PrometheusJsonResponse {
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: params.start.or(form_params.start).unwrap_or_default(),
        end: params.end.or(form_params.end).unwrap_or_default(),
        step: params.step.or(form_params.step).unwrap_or_default(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: ValueType::Matrix.to_string(),
                ..Default::default()
            }));
        }

        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_range_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safe to unwrap: `metric_names` is non-empty, so there is at least one response.
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_range_query(&handler, &prom_query, query_ctx).await
    }
}

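/// Runs a single range query through the PromQL handler; range queries always
/// produce a matrix result.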
async fn do_range_query(
    handler: &PrometheusHandlerRef,
    prom_query: &PromQuery,
    query_ctx: QueryContextRef,
) -> PrometheusJsonResponse {
    let result = handler.do_query(prom_query, query_ctx).await;
    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
        Ok((metric_name, _)) => metric_name,
    };
    PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
}

#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}

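/// Custom deserializer that collects every repeated `match[]` query-string
/// entry into `Matches`, since serde's derive cannot express this key syntax.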
impl<'de> Deserialize<'de> for Matches {
    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        struct MatchesVisitor;

        impl<'d> Visitor<'d> for MatchesVisitor {
            type Value = Vec<String>;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a map with `match[]` entries")
            }

            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
            where
                M: MapAccess<'d>,
            {
                let mut matches = Vec::new();
                while let Some((key, value)) = access.next_entry::<String, String>()? {
                    if key == "match[]" {
                        matches.push(value);
                    }
                }
                Ok(matches)
            }
        }
        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
    }
}

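/// Evaluates `$result`; `Ok` becomes `Some(v)`, "table not found" and "table
/// column not found" errors are swallowed as `None` (the metric may simply not
/// exist in this schema), and any other error early-returns an error response.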
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err) => {
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    None
                } else {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
                }
            }
        }
    };
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "labels_query")
)]
pub async fn labels_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<LabelsQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<LabelsQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    let mut queries = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
        .start_timer();

    let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
    {
        Ok(labels) => labels,
        Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
    };
    let _ = labels.insert(METRIC_NAME.to_string());

    if queries.is_empty() {
        let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
        labels_vec.sort_unstable();
        return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
    }

    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    let mut fetched_labels = HashSet::new();
    let _ = fetched_labels.insert(METRIC_NAME.to_string());

    let mut merge_map = HashMap::new();
    for query in queries {
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
            alias: None,
        };

        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
        handle_schema_err!(
            retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
                .await
        );
    }

    fetched_labels.retain(|l| labels.contains(l));
    let _ = labels.insert(METRIC_NAME.to_string());

    let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
    sorted_labels.sort();
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
    resp.resp_metrics = merge_map;
    resp
}

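/// Collects the primary-key (tag) column names of every table in the schema,
/// skipping the metric engine's internal table-id and tsid columns.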
async fn get_all_column_names(
    catalog: &str,
    schema: &str,
    manager: &CatalogManagerRef,
) -> std::result::Result<HashSet<String>, catalog::error::Error> {
    let table_names = manager.table_names(catalog, schema, None).await?;

    let mut labels = HashSet::new();
    for table_name in table_names {
        let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
            continue;
        };
        for column in table.primary_key_columns() {
            if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
                && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
            {
                labels.insert(column.name);
            }
        }
    }

    Ok(labels)
}

async fn retrieve_series_from_query_result(
    result: Result<Output>,
    series: &mut Vec<HashMap<Column, String>>,
    query_ctx: &QueryContext,
    table_name: &str,
    manager: &CatalogManagerRef,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;

    // Fetch the table schema to determine which columns are tags.
    let table = manager
        .table(
            query_ctx.current_catalog(),
            &query_ctx.current_schema(),
            table_name,
            Some(query_ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            catalog: query_ctx.current_catalog(),
            schema: query_ctx.current_schema(),
            table: table_name,
        })?;
    let tag_columns = table
        .primary_key_columns()
        .map(|c| c.name)
        .collect::<HashSet<_>>();

    match result.data {
        OutputData::RecordBatches(batches) => {
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
            reason: "expected data result, but got affected rows".to_string(),
            location: Location::default(),
        }),
    }?;

    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}

async fn retrieve_labels_name_from_query_result(
    result: Result<Output>,
    labels: &mut HashSet<String>,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;
    match result.data {
        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_labels_name(batches, labels)
        }
        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
            reason: "expected data result, but got affected rows".to_string(),
        }
        .fail(),
    }?;
    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}

fn record_batches_to_series(
    batches: RecordBatches,
    series: &mut Vec<HashMap<Column, String>>,
    table_name: &str,
    tag_columns: &HashSet<String>,
) -> Result<()> {
    for batch in batches.iter() {
        // Only keep the tag columns; other columns are not part of the label set.
        let projection = batch
            .schema
            .column_schemas()
            .iter()
            .enumerate()
            .filter_map(|(idx, col)| {
                if tag_columns.contains(&col.name) {
                    Some(idx)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        let batch = batch
            .try_project(&projection)
            .context(CollectRecordbatchSnafu)?;

        let mut writer = RowWriter::new(&batch.schema, table_name);
        writer.write(batch, series)?;
    }
    Ok(())
}

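/// Incrementally builds one label map (`Column` -> value) per row. `template`
/// holds one entry per projected column plus a fixed `__name__` entry for the
/// table name; `current` is the partially filled map for the row being written.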
struct RowWriter {
    template: HashMap<Column, Option<String>>,
    current: Option<HashMap<Column, Option<String>>>,
}

impl RowWriter {
    fn new(schema: &SchemaRef, table: &str) -> Self {
        let mut template = schema
            .column_schemas()
            .iter()
            .map(|x| (x.name.as_str().into(), None))
            .collect::<HashMap<Column, Option<String>>>();
        template.insert("__name__".into(), Some(table.to_string()));
        Self {
            template,
            current: None,
        }
    }

    fn insert(&mut self, column: ColumnRef, value: impl ToString) {
        let current = self.current.get_or_insert_with(|| self.template.clone());
        match current.get_mut(&column as &dyn AsColumnRef) {
            Some(x) => {
                let _ = x.insert(value.to_string());
            }
            None => {
                let _ = current.insert(column.0.into(), Some(value.to_string()));
            }
        }
    }

    fn insert_bytes(&mut self, column_schema: &ColumnSchema, bytes: &[u8]) -> Result<()> {
        let column_name = column_schema.name.as_str().into();

        if column_schema.data_type.is_json() {
            let s = jsonb_to_string(bytes).context(ConvertScalarValueSnafu)?;
            self.insert(column_name, s);
        } else {
            let hex = bytes
                .iter()
                .map(|b| format!("{b:02x}"))
                .collect::<Vec<String>>()
                .join("");
            self.insert(column_name, hex);
        }
        Ok(())
    }

    fn finish(&mut self) -> HashMap<Column, String> {
        let Some(current) = self.current.take() else {
            return HashMap::new();
        };
        current
            .into_iter()
            .filter_map(|(k, v)| v.map(|v| (k, v)))
            .collect()
    }

    fn write(
        &mut self,
        record_batch: RecordBatch,
        series: &mut Vec<HashMap<Column, String>>,
    ) -> Result<()> {
        let schema = record_batch.schema.clone();
        let record_batch = record_batch.into_df_record_batch();
        for i in 0..record_batch.num_rows() {
            for (j, array) in record_batch.columns().iter().enumerate() {
                let column = schema.column_name_by_index(j).into();

                if array.is_null(i) {
                    self.insert(column, "Null");
                    continue;
                }

                match array.data_type() {
                    DataType::Null => {
                        self.insert(column, "Null");
                    }
                    DataType::Boolean => {
                        let array = array.as_boolean();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt8 => {
                        let array = array.as_primitive::<UInt8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt16 => {
                        let array = array.as_primitive::<UInt16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt32 => {
                        let array = array.as_primitive::<UInt32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt64 => {
                        let array = array.as_primitive::<UInt64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int8 => {
                        let array = array.as_primitive::<Int8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int16 => {
                        let array = array.as_primitive::<Int16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int32 => {
                        let array = array.as_primitive::<Int32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int64 => {
                        let array = array.as_primitive::<Int64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float32 => {
                        let array = array.as_primitive::<Float32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float64 => {
                        let array = array.as_primitive::<Float64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Utf8 => {
                        let array = array.as_string::<i32>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::LargeUtf8 => {
                        let array = array.as_string::<i64>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Utf8View => {
                        let array = array.as_string_view();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Binary => {
                        let array = array.as_binary::<i32>();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::LargeBinary => {
                        let array = array.as_binary::<i64>();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::BinaryView => {
                        let array = array.as_binary_view();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::Date32 => {
                        let array = array.as_primitive::<Date32Type>();
                        let v = Date::new(array.value(i));
                        self.insert(column, v);
                    }
                    DataType::Date64 => {
                        let array = array.as_primitive::<Date64Type>();
                        // Date64 stores milliseconds since the epoch; convert to days.
                        let v = Date::new((array.value(i) / 86_400_000) as i32);
                        self.insert(column, v);
                    }
                    DataType::Timestamp(_, _) => {
                        let v = datatypes::arrow_array::timestamp_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Time32(_) | DataType::Time64(_) => {
                        let v = datatypes::arrow_array::time_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Interval(interval_unit) => match interval_unit {
                        IntervalUnit::YearMonth => {
                            let array = array.as_primitive::<IntervalYearMonthType>();
                            let v: IntervalYearMonth = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::DayTime => {
                            let array = array.as_primitive::<IntervalDayTimeType>();
                            let v: IntervalDayTime = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::MonthDayNano => {
                            let array = array.as_primitive::<IntervalMonthDayNanoType>();
                            let v: IntervalMonthDayNano = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                    },
                    DataType::Duration(_) => {
                        let d = datatypes::arrow_array::duration_array_value(array, i);
                        self.insert(column, d);
                    }
                    DataType::List(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Struct(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Decimal128(precision, scale) => {
                        let array = array.as_primitive::<Decimal128Type>();
                        let v = Decimal128::new(array.value(i), *precision, *scale);
                        self.insert(column, v);
                    }
                    _ => {
                        return NotSupportedSnafu {
                            feat: format!("convert {} to http value", array.data_type()),
                        }
                        .fail();
                    }
                }
            }

            series.push(self.finish())
        }
        Ok(())
    }
}

#[derive(Debug, Clone, Copy, PartialEq)]
struct ColumnRef<'a>(&'a str);

impl<'a> From<&'a str> for ColumnRef<'a> {
    fn from(s: &'a str) -> Self {
        Self(s)
    }
}

trait AsColumnRef {
    fn as_ref(&self) -> ColumnRef<'_>;
}

impl AsColumnRef for Column {
    fn as_ref(&self) -> ColumnRef<'_> {
        self.0.as_str().into()
    }
}

impl AsColumnRef for ColumnRef<'_> {
    fn as_ref(&self) -> ColumnRef<'_> {
        *self
    }
}

impl<'a> PartialEq for dyn AsColumnRef + 'a {
    fn eq(&self, other: &Self) -> bool {
        self.as_ref() == other.as_ref()
    }
}

impl<'a> Eq for dyn AsColumnRef + 'a {}

impl<'a> Hash for dyn AsColumnRef + 'a {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.as_ref().0.hash(state);
    }
}

impl<'a> Borrow<dyn AsColumnRef + 'a> for Column {
    fn borrow(&self) -> &(dyn AsColumnRef + 'a) {
        self
    }
}

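/// Extracts label names from a query result: finds the first row whose float
/// value columns are not all null, then records every column name of that
/// batch as a label.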
fn record_batches_to_labels_name(
    batches: RecordBatches,
    labels: &mut HashSet<String>,
) -> Result<()> {
    let mut column_indices = Vec::new();
    let mut field_column_indices = Vec::new();
    for (i, column) in batches.schema().column_schemas().iter().enumerate() {
        if let ConcreteDataType::Float64(_) = column.data_type {
            field_column_indices.push(i);
        }
        column_indices.push(i);
    }

    if field_column_indices.is_empty() {
        return Err(Error::Internal {
            err_msg: "no value column found".to_string(),
        });
    }

    for batch in batches.iter() {
        let names = column_indices
            .iter()
            .map(|c| batches.schema().column_name_by_index(*c).to_string())
            .collect::<Vec<_>>();

        let field_columns = field_column_indices
            .iter()
            .map(|i| {
                let column = batch.column(*i);
                column.as_primitive::<Float64Type>()
            })
            .collect::<Vec<_>>();

        for row_index in 0..batch.num_rows() {
            // Skip rows where every value column is null.
            if field_columns.iter().all(|c| c.is_null(row_index)) {
                continue;
            }

            // A non-null row exists: record all column names and stop scanning.
            names.iter().for_each(|name| {
                let _ = labels.insert(name.clone());
            });
            return Ok(());
        }
    }
    Ok(())
}

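/// Parses a PromQL string and returns the single metric name it references
/// (if any) together with the value type of the whole expression.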
pub(crate) fn retrieve_metric_name_and_result_type(
    promql: &str,
) -> Result<(Option<String>, ValueType)> {
    let promql_expr = promql_parser::parser::parse(promql)
        .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
    let metric_name = promql_expr_to_metric_name(&promql_expr);
    let result_type = promql_expr.value_type();

    Ok((metric_name, result_type))
}

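/// Resolves the catalog and schema to use: parses them from the `db` parameter
/// when present, otherwise falls back to the ones in the query context.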
pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
    if let Some(db) = db {
        parse_catalog_and_schema_from_db_string(db)
    } else {
        (
            ctx.current_catalog().to_string(),
            ctx.current_schema().clone(),
        )
    }
}

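/// Updates the query context to the given catalog and schema if either differs
/// from the current values.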
pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
    if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
        ctx.set_current_catalog(catalog);
        ctx.set_current_schema(schema);
    }
}

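/// Returns the metric name of the expression only when it references exactly
/// one metric; zero or multiple candidate names yield `None`.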
fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
    let mut metric_names = HashSet::new();
    collect_metric_names(expr, &mut metric_names);

    if metric_names.len() == 1 {
        metric_names.into_iter().next()
    } else {
        None
    }
}

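/// Walks the expression tree and gathers every metric name it can still
/// attribute to the result, clearing the set whenever an operator (for
/// example a unary or arithmetic binary expression) stops `__name__` from
/// propagating to the output.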
fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
            match modifier {
                Some(LabelModifier::Include(labels)) => {
                    if !labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                Some(LabelModifier::Exclude(labels)) => {
                    if labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                _ => {}
            }
            collect_metric_names(expr, metric_names)
        }
        PromqlExpr::Unary(UnaryExpr { .. }) => metric_names.clear(),
        PromqlExpr::Binary(BinaryExpr { lhs, op, .. }) => {
            // Set operators keep the left-hand side's metric name; other binary
            // operators drop `__name__` from the result.
            if matches!(
                op.id(),
                token::T_LAND | token::T_LOR | token::T_LUNLESS
            ) {
                collect_metric_names(lhs, metric_names)
            } else {
                metric_names.clear()
            }
        }
        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            args.args
                .iter()
                .for_each(|e| collect_metric_names(e, metric_names));
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}

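/// Recursively searches the expression for the first vector or matrix selector
/// for which `f` returns `Some`, and returns that value.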
fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
where
    F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
{
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
            find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
        }
        PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::NumberLiteral(_) => None,
        PromqlExpr::StringLiteral(_) => None,
        PromqlExpr::Extension(_) => None,
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;

            f(name, matchers)
        }
        PromqlExpr::Call(Call { args, .. }) => args
            .args
            .iter()
            .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
    }
}

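/// Returns the non-equality (`!=`, `=~`, `!~`) matchers on `__name__` from the
/// first selector that carries no literal metric name, or `None` if every
/// selector already names its metric.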
fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
    find_metric_name_and_matchers(expr, |name, matchers| {
        // A literal metric name means there is nothing to resolve.
        if name.is_some() {
            return None;
        }

        Some(matchers.find_matchers(METRIC_NAME))
    })
    .map(|matchers| {
        matchers
            .into_iter()
            .filter(|m| !matches!(m.op, MatchOp::Equal))
            .collect::<Vec<_>>()
    })
}

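/// Rewrites every `__name__` matcher in the expression into an equality match
/// against `metric_name`, so the fanned-out query targets one concrete metric.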
fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
            update_metric_name_matcher(expr, metric_name)
        }
        PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
            update_metric_name_matcher(lhs, metric_name);
            update_metric_name_matcher(rhs, metric_name);
        }
        PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
            update_metric_name_matcher(expr, metric_name)
        }
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            if name.is_some() {
                return;
            }

            for m in &mut matchers.matchers {
                if m.name == METRIC_NAME {
                    m.op = MatchOp::Equal;
                    m.value = metric_name.to_string();
                }
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if name.is_some() {
                return;
            }

            for m in &mut matchers.matchers {
                if m.name == METRIC_NAME {
                    m.op = MatchOp::Equal;
                    m.value = metric_name.to_string();
                }
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            args.args.iter_mut().for_each(|e| {
                update_metric_name_matcher(e, metric_name);
            });
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
    limit: Option<usize>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "label_values_query")
)]
pub async fn label_values_query(
    State(handler): State<PrometheusHandlerRef>,
    Path(label_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(params): Query<LabelValueQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
        .start_timer();

    if label_name == METRIC_NAME_LABEL {
        let catalog_manager = handler.catalog_manager();

        let mut table_names = try_call_return_response!(
            retrieve_table_names(&query_ctx, catalog_manager, params.matches.0).await
        );

        truncate_results(&mut table_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
    } else if label_name == FIELD_NAME_LABEL {
        let field_columns = handle_schema_err!(
            retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
        )
        .unwrap_or_default();
        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
        field_columns.sort_unstable();
        truncate_results(&mut field_columns, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
    } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
        let catalog_manager = handler.catalog_manager();

        let mut schema_names = try_call_return_response!(
            retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await
        );
        truncate_results(&mut schema_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(schema_names));
    }

    let queries = params.matches.0;
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::InvalidArguments,
            "match[] parameter is required",
        );
    }

    let start = params.start.unwrap_or_else(yesterday_rfc3339);
    let end = params.end.unwrap_or_else(current_time_rfc3339);
    let mut label_values = HashSet::new();

    let start = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&start)
            .context(ParseTimestampSnafu { timestamp: &start })
    );
    let end = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&end)
            .context(ParseTimestampSnafu { timestamp: &end })
    );

    for query in queries {
        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected vector selector",
            );
        };
        let Some(name) = take_metric_name(&mut vector_selector) else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected metric name",
            );
        };
        let VectorSelector { matchers, .. } = vector_selector;
        let matchers = matchers.matchers;
        let result = handler
            .query_label_values(name, label_name.clone(), matchers, start, end, &query_ctx)
            .await;
        if let Some(result) = handle_schema_err!(result) {
            label_values.extend(result.into_iter());
        }
    }

    let mut label_values: Vec<_> = label_values.into_iter().collect();
    label_values.sort_unstable();
    truncate_results(&mut label_values, params.limit);

    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
}

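/// Truncates `label_values` to at most `limit` entries when a positive limit
/// is given.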
fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
    if let Some(limit) = limit
        && limit > 0
        && label_values.len() >= limit
    {
        label_values.truncate(limit);
    }
}

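/// Takes the metric name out of a vector selector: either the literal selector
/// name or the value of an equality `__name__` matcher, which is removed from
/// the matcher list so it is not applied twice.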
fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
    if let Some(name) = selector.name.take() {
        return Some(name);
    }

    let (pos, matcher) = selector
        .matchers
        .matchers
        .iter()
        .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
    let name = matcher.value.clone();
    selector.matchers.matchers.remove(pos);

    Some(name)
}

async fn retrieve_table_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    let mut tables_stream = catalog_manager.tables(catalog, &schema, Some(query_ctx));
    let mut table_names = Vec::new();

    // Only the first `match[]` entry is used to filter table names.
    let name_matcher = matches
        .first()
        .and_then(|matcher| promql_parser::parser::parse(matcher).ok())
        .and_then(|expr| {
            if let PromqlExpr::VectorSelector(vector_selector) = expr {
                let matchers = vector_selector.matchers.matchers;
                for matcher in matchers {
                    if matcher.name == METRIC_NAME_LABEL {
                        return Some(matcher);
                    }
                }

                None
            } else {
                None
            }
        });

    while let Some(table) = tables_stream.next().await {
        let table = table.context(CatalogSnafu)?;
        // Only logical tables of the metric engine are exposed as metric names.
        if !table
            .table_info()
            .meta
            .options
            .extra_options
            .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            continue;
        }

        let table_name = &table.table_info().name;

        if let Some(matcher) = &name_matcher {
            match &matcher.op {
                MatchOp::Equal => {
                    if table_name == &matcher.value {
                        table_names.push(table_name.clone());
                    }
                }
                MatchOp::Re(reg) => {
                    if reg.is_match(table_name) {
                        table_names.push(table_name.clone());
                    }
                }
                _ => {
                    table_names.push(table_name.clone());
                }
            }
        } else {
            table_names.push(table_name.clone());
        }
    }

    table_names.sort_unstable();
    Ok(table_names)
}

async fn retrieve_field_names(
    query_ctx: &QueryContext,
    manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<HashSet<String>> {
    let mut field_columns = HashSet::new();
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    if matches.is_empty() {
        // No matches provided: collect field columns from every table.
        while let Some(table) = manager
            .tables(catalog, &schema, Some(query_ctx))
            .next()
            .await
        {
            let table = table.context(CatalogSnafu)?;
            for column in table.field_columns() {
                field_columns.insert(column.name);
            }
        }
        return Ok(field_columns);
    }

    for table_name in matches {
        let table = manager
            .table(catalog, &schema, &table_name, Some(query_ctx))
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                catalog: catalog.to_string(),
                schema: schema.clone(),
                table: table_name.clone(),
            })?;

        for column in table.field_columns() {
            field_columns.insert(column.name);
        }
    }
    Ok(field_columns)
}

async fn retrieve_schema_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let mut schemas = Vec::new();
    let catalog = query_ctx.current_catalog();

    let candidate_schemas = catalog_manager
        .schema_names(catalog, Some(query_ctx))
        .await
        .context(CatalogSnafu)?;

    for schema in candidate_schemas {
        let mut found = true;
        for match_item in &matches {
            if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
                let exists = catalog_manager
                    .table_exists(catalog, &schema, &table_name, Some(query_ctx))
                    .await
                    .context(CatalogSnafu)?;
                if !exists {
                    found = false;
                    break;
                }
            }
        }

        if found {
            schemas.push(schema);
        }
    }

    schemas.sort_unstable();

    Ok(schemas)
}

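/// Parses a PromQL `match[]` string and returns its metric name, if the
/// expression unambiguously references exactly one metric.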
fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
    let promql_expr = promql_parser::parser::parse(query).ok()?;
    promql_expr_to_metric_name(&promql_expr)
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "series_query")
)]
pub async fn series_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<SeriesQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SeriesQuery>,
) -> PrometheusJsonResponse {
    let mut queries: Vec<String> = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::Unsupported,
            "match[] parameter is required",
        );
    }
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
        .start_timer();

    let mut series = Vec::new();
    let mut merge_map = HashMap::new();
    for query in queries {
        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
            alias: None,
        };
        let result = handler.do_query(&prom_query, query_ctx.clone()).await;

        handle_schema_err!(
            retrieve_series_from_query_result(
                result,
                &mut series,
                &query_ctx,
                &table_name,
                &handler.catalog_manager(),
                &mut merge_map,
            )
            .await
        );
    }
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
    resp.resp_metrics = merge_map;
    resp
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    query: Option<String>,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "parse_query")
)]
pub async fn parse_query(
    State(_handler): State<PrometheusHandlerRef>,
    Query(params): Query<ParseQuery>,
    Extension(_query_ctx): Extension<QueryContext>,
    Form(form_params): Form<ParseQuery>,
) -> PrometheusJsonResponse {
    if let Some(query) = params.query.or(form_params.query) {
        let ast = try_call_return_response!(promql_parser::parser::parse(&query));
        PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
    } else {
        PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
    }
}

#[cfg(test)]
mod tests {
    use promql_parser::parser::value::ValueType;

    use super::*;

    struct TestCase {
        name: &'static str,
        promql: &'static str,
        expected_metric: Option<&'static str>,
        expected_type: ValueType,
        should_error: bool,
    }

    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        let test_cases = &[
            TestCase {
                name: "simple metric",
                promql: "cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with selector",
                promql: r#"cpu_usage{instance="localhost"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with range selector",
                promql: "cpu_usage[5m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "metric with __name__ matcher",
                promql: r#"{__name__="cpu_usage"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with unary operator",
                promql: "-cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with aggregation",
                promql: "sum(cpu_usage)",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "aggregation by instance",
                promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "aggregation by __name__",
                promql: r#"sum by (__name__) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "aggregation without instance",
                promql: r#"sum without (instance) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "same metric addition",
                promql: "cpu_usage + cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with scalar addition",
                promql: r#"sum(rate(cpu_usage{job="node"}[5m])) + 100"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics addition",
                promql: "cpu_usage + memory_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics subtraction",
                promql: "network_in - network_out",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with different metrics",
                promql: "cpu_usage unless memory_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with same metric",
                promql: "cpu_usage unless cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "basic subquery",
                promql: "cpu_usage[5m:1m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "subquery with multiple metrics",
                promql: "(cpu_usage + memory_usage)[5m:1m]",
                expected_metric: None,
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "scalar value",
                promql: "42",
                expected_metric: None,
                expected_type: ValueType::Scalar,
                should_error: false,
            },
            TestCase {
                name: "string literal",
                promql: r#""hello world""#,
                expected_metric: None,
                expected_type: ValueType::String,
                should_error: false,
            },
            TestCase {
                name: "invalid syntax",
                promql: "cpu_usage{invalid=",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "empty query",
                promql: "",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "malformed brackets",
                promql: "cpu_usage[5m",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
        ];

        for test_case in test_cases {
            let result = retrieve_metric_name_and_result_type(test_case.promql);

            if test_case.should_error {
                assert!(
                    result.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    test_case.name,
                    result
                );
            } else {
                let (metric_name, value_type) = result.unwrap_or_else(|e| {
                    panic!(
                        "Test '{}' should have succeeded but failed with error: {}",
                        test_case.name, e
                    )
                });

                let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
                assert_eq!(
                    metric_name, expected_metric_name,
                    "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, expected_metric_name, metric_name
                );

                assert_eq!(
                    value_type, test_case.expected_type,
                    "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, test_case.expected_type, value_type
                );
            }
        }
    }
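
    // A few extra sanity checks sketched against the helpers above; these are
    // editor-added examples, not part of the original test suite.
    #[test]
    fn test_take_metric_name_from_name_matcher() {
        let expr = promql_parser::parser::parse(r#"{__name__="cpu_usage"}"#).unwrap();
        let PromqlExpr::VectorSelector(mut vs) = expr else {
            panic!("expected vector selector");
        };
        assert_eq!(take_metric_name(&mut vs).as_deref(), Some("cpu_usage"));
        // The equality matcher is consumed so it is not applied twice.
        assert!(vs.matchers.find_matchers(METRIC_NAME).is_empty());
    }

    #[test]
    fn test_update_metric_name_matcher() {
        // A regex `__name__` matcher should be rewritten into an equality match.
        let mut expr = promql_parser::parser::parse(r#"{__name__=~"cpu.*"}"#).unwrap();
        update_metric_name_matcher(&mut expr, "cpu_usage");
        let PromqlExpr::VectorSelector(vs) = &expr else {
            panic!("expected vector selector");
        };
        let name_matchers = vs.matchers.find_matchers(METRIC_NAME);
        assert!(matches!(name_matchers[0].op, MatchOp::Equal));
        assert_eq!(name_matchers[0].value, "cpu_usage");
    }

    #[test]
    fn test_truncate_results() {
        let mut values = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        // No limit and a zero limit leave the values untouched.
        truncate_results(&mut values, None);
        assert_eq!(values.len(), 3);
        truncate_results(&mut values, Some(0));
        assert_eq!(values.len(), 3);
        // A positive limit truncates the tail.
        truncate_results(&mut values, Some(2));
        assert_eq!(values, vec!["a".to_string(), "b".to_string()]);
    }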
}