use std::borrow::Borrow;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow::array::AsArray;
use arrow::datatypes::{
    Date32Type, Date64Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
    DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
    Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
    Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
    TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
};
use arrow_schema::{DataType, IntervalUnit, TimeUnit};
use axum::extract::{Path, Query, State};
use axum::{Extension, Form};
use catalog::CatalogManagerRef;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_decimal::Decimal128;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_telemetry::{debug, tracing};
use common_time::time::Time;
use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
use common_time::{
    Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
};
use common_version::OwnedBuildInfo;
use datafusion_common::ScalarValue;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::schema::{ColumnSchema, SchemaRef};
use datatypes::types::jsonb_to_string;
use datatypes::vectors::Float64Vector;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
use promql_parser::parser::token::{self};
use promql_parser::parser::value::ValueType;
use promql_parser::parser::{
    AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, LabelModifier, MatrixSelector, ParenExpr,
    SubqueryExpr, UnaryExpr, VectorSelector,
};
use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser};
use query::promql::planner::normalize_matcher;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use session::context::{QueryContext, QueryContextRef};
use snafu::{Location, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
};

pub use super::result::prometheus_resp::PrometheusJsonResponse;
use crate::error::{
    CatalogSnafu, CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error,
    InvalidQuerySnafu, NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu,
    UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
use crate::prometheus_handler::PrometheusHandlerRef;

#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    pub metric: BTreeMap<String, String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}

#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    pub metric: BTreeMap<String, String>,
    pub values: Vec<(f64, String)>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    Matrix(Vec<PromSeriesMatrix>),
    Vector(Vec<PromSeriesVector>),
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}

impl Default for PromQueryResult {
    fn default() -> Self {
        PromQueryResult::Matrix(Default::default())
    }
}

#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    #[serde(rename = "resultType")]
    pub result_type: String,
    pub result: PromQueryResult,
}

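/// A column (label) name, interned behind an `Arc` so the same key can be
/// cloned cheaply into the many per-series maps built below. Borrowed lookups
/// without allocation are possible via the `ColumnRef`/`AsColumnRef`
/// machinery defined further down in this module.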
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Column(Arc<String>);

impl From<&str> for Column {
    fn from(s: &str) -> Self {
        Self(Arc::new(s.to_string()))
    }
}

#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    PromData(PromData),
    Labels(Vec<String>),
    Series(Vec<HashMap<Column, String>>),
    LabelValues(Vec<String>),
    FormatQuery(String),
    BuildInfo(OwnedBuildInfo),
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    #[default]
    None,
}

impl PrometheusResponse {
    fn append(&mut self, other: PrometheusResponse) {
        match (self, other) {
            (
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Matrix(lhs),
                    ..
                }),
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Matrix(rhs),
                    ..
                }),
            ) => {
                lhs.extend(rhs);
            }

            (
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Vector(lhs),
                    ..
                }),
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Vector(rhs),
                    ..
                }),
            ) => {
                lhs.extend(rhs);
            }
            _ => {
                // Mismatched result variants can't be merged; keep `self`
                // unchanged and drop `other`.
            }
        }
    }

    pub fn is_none(&self) -> bool {
        matches!(self, PrometheusResponse::None)
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    query: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "format_query")
)]
pub async fn format_query(
    State(_handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(_query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    let query = params.query.or(form_params.query).unwrap_or_default();
    match promql_parser::parser::parse(&query) {
        Ok(expr) => {
            let pretty = expr.prettify();
            PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
        }
        Err(reason) => {
            let err = InvalidQuerySnafu { reason }.build();
            PrometheusJsonResponse::error(err.status_code(), err.output_msg())
        }
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "build_info_query")
)]
pub async fn build_info_query() -> PrometheusJsonResponse {
    let build_info = common_version::build_info().clone();
    PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    query: Option<String>,
    lookback: Option<String>,
    time: Option<String>,
    timeout: Option<String>,
    db: Option<String>,
}

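/// Unwraps a `Result` or early-returns an `InvalidArguments` JSON error
/// response from the surrounding handler. Typical use, as in the handlers
/// below:
/// `let expr = try_call_return_response!(promql_parser::parser::parse(&query));`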
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                let msg = err.to_string();
                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
            }
        }
    };
}

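/// Handler for the Prometheus instant-query endpoint. Parameters may arrive
/// via the query string or an urlencoded form body, with the query string
/// taking precedence. An illustrative request (the exact mount path depends
/// on the router configuration):
///
/// ```text
/// GET /api/v1/query?query=up&time=2024-01-01T00:00:00Z&db=public
/// ```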
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "instant_query")
)]
pub async fn instant_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    let time = params
        .time
        .or(form_params.time)
        .unwrap_or_else(current_time_rfc3339);
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: time.clone(),
        end: time,
        step: "1s".to_string(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            let result_type = promql_expr.value_type();

            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_instant_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_instant_query(&handler, &prom_query, query_ctx).await
    }
}

async fn do_instant_query(
    handler: &PrometheusHandlerRef,
    prom_query: &PromQuery,
    query_ctx: QueryContextRef,
) -> PrometheusJsonResponse {
    let result = handler.do_query(prom_query, query_ctx).await;
    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
        Ok((metric_name, result_type)) => (metric_name, result_type),
        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
    };
    PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    query: Option<String>,
    start: Option<String>,
    end: Option<String>,
    step: Option<String>,
    lookback: Option<String>,
    timeout: Option<String>,
    db: Option<String>,
}

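/// Handler for the Prometheus range-query endpoint. Mirrors `instant_query`
/// but takes explicit `start`/`end`/`step` parameters and always reports a
/// matrix result type. An illustrative request (mount path may differ):
///
/// ```text
/// GET /api/v1/query_range?query=up&start=1700000000&end=1700003600&step=15s
/// ```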
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "range_query")
)]
pub async fn range_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<RangeQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<RangeQuery>,
) -> PrometheusJsonResponse {
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: params.start.or(form_params.start).unwrap_or_default(),
        end: params.end.or(form_params.end).unwrap_or_default(),
        step: params.step.or(form_params.step).unwrap_or_default(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: ValueType::Matrix.to_string(),
                ..Default::default()
            }));
        }

        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_range_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_range_query(&handler, &prom_query, query_ctx).await
    }
}

async fn do_range_query(
    handler: &PrometheusHandlerRef,
    prom_query: &PromQuery,
    query_ctx: QueryContextRef,
) -> PrometheusJsonResponse {
    let result = handler.do_query(prom_query, query_ctx).await;
    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
        Ok((metric_name, _)) => metric_name,
    };
    PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
}

#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}

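/// Custom deserializer that collects every repeated `match[]` parameter into
/// a `Vec<String>`; e.g. `match[]=up&match[]=process_cpu_seconds_total`
/// yields two entries.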
impl<'de> Deserialize<'de> for Matches {
    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        struct MatchesVisitor;

        impl<'d> Visitor<'d> for MatchesVisitor {
            type Value = Vec<String>;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a map of `match[]` parameters")
            }

            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
            where
                M: MapAccess<'d>,
            {
                let mut matches = Vec::new();
                while let Some((key, value)) = access.next_entry::<String, String>()? {
                    if key == "match[]" {
                        matches.push(value);
                    }
                }
                Ok(matches)
            }
        }
        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
    }
}

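/// Evaluates a `Result`, mapping `TableNotFound` and `TableColumnNotFound`
/// to `None` (the metric simply does not exist in this schema) and
/// early-returning an error response from the surrounding handler for any
/// other error.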
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err) => {
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    None
                } else {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
                }
            }
        }
    };
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "labels_query")
)]
pub async fn labels_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<LabelsQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<LabelsQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    let mut queries = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
        .start_timer();

    let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
    {
        Ok(labels) => labels,
        Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
    };
    let _ = labels.insert(METRIC_NAME.to_string());

    if queries.is_empty() {
        let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
        labels_vec.sort_unstable();
        return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
    }

    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    let mut fetched_labels = HashSet::new();
    let _ = fetched_labels.insert(METRIC_NAME.to_string());

    let mut merge_map = HashMap::new();
    for query in queries {
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
            alias: None,
        };

        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
        handle_schema_err!(
            retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
                .await
        );
    }

    fetched_labels.retain(|l| labels.contains(l));
    let _ = labels.insert(METRIC_NAME.to_string());

    let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
    sorted_labels.sort();
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
    resp.resp_metrics = merge_map;
    resp
}

async fn get_all_column_names(
    catalog: &str,
    schema: &str,
    manager: &CatalogManagerRef,
) -> std::result::Result<HashSet<String>, catalog::error::Error> {
    let table_names = manager.table_names(catalog, schema, None).await?;

    let mut labels = HashSet::new();
    for table_name in table_names {
        let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
            continue;
        };
        for column in table.primary_key_columns() {
            if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
                && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
            {
                labels.insert(column.name);
            }
        }
    }

    Ok(labels)
}

async fn retrieve_series_from_query_result(
    result: Result<Output>,
    series: &mut Vec<HashMap<Column, String>>,
    query_ctx: &QueryContext,
    table_name: &str,
    manager: &CatalogManagerRef,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;

    let table = manager
        .table(
            query_ctx.current_catalog(),
            &query_ctx.current_schema(),
            table_name,
            Some(query_ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            catalog: query_ctx.current_catalog(),
            schema: query_ctx.current_schema(),
            table: table_name,
        })?;
    let tag_columns = table
        .primary_key_columns()
        .map(|c| c.name)
        .collect::<HashSet<_>>();

    match result.data {
        OutputData::RecordBatches(batches) => {
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
            reason: "expected data result, but got affected rows".to_string(),
            location: Location::default(),
        }),
    }?;

    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}

async fn retrieve_labels_name_from_query_result(
    result: Result<Output>,
    labels: &mut HashSet<String>,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;
    match result.data {
        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_labels_name(batches, labels)
        }
        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
            reason: "expected data result, but got affected rows".to_string(),
        }
        .fail(),
    }?;
    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}

fn record_batches_to_series(
    batches: RecordBatches,
    series: &mut Vec<HashMap<Column, String>>,
    table_name: &str,
    tag_columns: &HashSet<String>,
) -> Result<()> {
    for batch in batches.iter() {
        let projection = batch
            .schema
            .column_schemas()
            .iter()
            .enumerate()
            .filter_map(|(idx, col)| {
                if tag_columns.contains(&col.name) {
                    Some(idx)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        let batch = batch
            .try_project(&projection)
            .context(CollectRecordbatchSnafu)?;

        let mut writer = RowWriter::new(&batch.schema, table_name);
        writer.write(batch, series)?;
    }
    Ok(())
}

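/// Accumulates one series (a label-name to label-value map) per row.
///
/// `template` holds an entry for every projected column plus a synthetic
/// `__name__` entry pre-filled with the table name; `current` is lazily
/// cloned from the template for the row being written and drained by
/// `finish()`, which drops any column that stayed `None`.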
struct RowWriter {
    template: HashMap<Column, Option<String>>,
    current: Option<HashMap<Column, Option<String>>>,
}

impl RowWriter {
    fn new(schema: &SchemaRef, table: &str) -> Self {
        let mut template = schema
            .column_schemas()
            .iter()
            .map(|x| (x.name.as_str().into(), None))
            .collect::<HashMap<Column, Option<String>>>();
        template.insert("__name__".into(), Some(table.to_string()));
        Self {
            template,
            current: None,
        }
    }

    fn insert(&mut self, column: ColumnRef, value: impl ToString) {
        let current = self.current.get_or_insert_with(|| self.template.clone());
        match current.get_mut(&column as &dyn AsColumnRef) {
            Some(x) => {
                let _ = x.insert(value.to_string());
            }
            None => {
                let _ = current.insert(column.0.into(), Some(value.to_string()));
            }
        }
    }

    fn insert_bytes(&mut self, column_schema: &ColumnSchema, bytes: &[u8]) -> Result<()> {
        let column_name = column_schema.name.as_str().into();

        if column_schema.data_type.is_json() {
            let s = jsonb_to_string(bytes).context(ConvertScalarValueSnafu)?;
            self.insert(column_name, s);
        } else {
            let hex = bytes
                .iter()
                .map(|b| format!("{b:02x}"))
                .collect::<Vec<String>>()
                .join("");
            self.insert(column_name, hex);
        }
        Ok(())
    }

    fn finish(&mut self) -> HashMap<Column, String> {
        let Some(current) = self.current.take() else {
            return HashMap::new();
        };
        current
            .into_iter()
            .filter_map(|(k, v)| v.map(|v| (k, v)))
            .collect()
    }

    fn write(
        &mut self,
        record_batch: RecordBatch,
        series: &mut Vec<HashMap<Column, String>>,
    ) -> Result<()> {
        let schema = record_batch.schema.clone();
        let record_batch = record_batch.into_df_record_batch();
        for i in 0..record_batch.num_rows() {
            for (j, array) in record_batch.columns().iter().enumerate() {
                let column = schema.column_name_by_index(j).into();

                if array.is_null(i) {
                    self.insert(column, "Null");
                    continue;
                }

                match array.data_type() {
                    DataType::Null => {
                        self.insert(column, "Null");
                    }
                    DataType::Boolean => {
                        let array = array.as_boolean();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt8 => {
                        let array = array.as_primitive::<UInt8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt16 => {
                        let array = array.as_primitive::<UInt16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt32 => {
                        let array = array.as_primitive::<UInt32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt64 => {
                        let array = array.as_primitive::<UInt64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int8 => {
                        let array = array.as_primitive::<Int8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int16 => {
                        let array = array.as_primitive::<Int16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int32 => {
                        let array = array.as_primitive::<Int32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int64 => {
                        let array = array.as_primitive::<Int64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float32 => {
                        let array = array.as_primitive::<Float32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float64 => {
                        let array = array.as_primitive::<Float64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Utf8 => {
                        let array = array.as_string::<i32>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::LargeUtf8 => {
                        let array = array.as_string::<i64>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Utf8View => {
                        let array = array.as_string_view();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Binary => {
                        let array = array.as_binary::<i32>();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::LargeBinary => {
                        let array = array.as_binary::<i64>();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::BinaryView => {
                        let array = array.as_binary_view();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::Date32 => {
                        let array = array.as_primitive::<Date32Type>();
                        let v = Date::new(array.value(i));
                        self.insert(column, v);
                    }
                    DataType::Date64 => {
                        let array = array.as_primitive::<Date64Type>();
                        // Date64 stores milliseconds since the Unix epoch;
                        // convert to days (86,400,000 ms per day) for `Date`.
                        let v = Date::new((array.value(i) / 86_400_000) as i32);
                        self.insert(column, v);
                    }
                    DataType::Timestamp(time_unit, _) => {
                        let v = match time_unit {
                            TimeUnit::Second => {
                                let array = array.as_primitive::<TimestampSecondType>();
                                array.value(i)
                            }
                            TimeUnit::Millisecond => {
                                let array = array.as_primitive::<TimestampMillisecondType>();
                                array.value(i)
                            }
                            TimeUnit::Microsecond => {
                                let array = array.as_primitive::<TimestampMicrosecondType>();
                                array.value(i)
                            }
                            TimeUnit::Nanosecond => {
                                let array = array.as_primitive::<TimestampNanosecondType>();
                                array.value(i)
                            }
                        };
                        let v = Timestamp::new(v, time_unit.into());
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Time32(time_unit) | DataType::Time64(time_unit) => {
                        let v = match time_unit {
                            TimeUnit::Second => {
                                let array = array.as_primitive::<Time32SecondType>();
                                Time::new_second(array.value(i) as i64)
                            }
                            TimeUnit::Millisecond => {
                                let array = array.as_primitive::<Time32MillisecondType>();
                                Time::new_millisecond(array.value(i) as i64)
                            }
                            TimeUnit::Microsecond => {
                                let array = array.as_primitive::<Time64MicrosecondType>();
                                Time::new_microsecond(array.value(i))
                            }
                            TimeUnit::Nanosecond => {
                                let array = array.as_primitive::<Time64NanosecondType>();
                                Time::new_nanosecond(array.value(i))
                            }
                        };
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Interval(interval_unit) => match interval_unit {
                        IntervalUnit::YearMonth => {
                            let array = array.as_primitive::<IntervalYearMonthType>();
                            let v: IntervalYearMonth = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::DayTime => {
                            let array = array.as_primitive::<IntervalDayTimeType>();
                            let v: IntervalDayTime = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::MonthDayNano => {
                            let array = array.as_primitive::<IntervalMonthDayNanoType>();
                            let v: IntervalMonthDayNano = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                    },
                    DataType::Duration(time_unit) => {
                        let v = match time_unit {
                            TimeUnit::Second => {
                                let array = array.as_primitive::<DurationSecondType>();
                                array.value(i)
                            }
                            TimeUnit::Millisecond => {
                                let array = array.as_primitive::<DurationMillisecondType>();
                                array.value(i)
                            }
                            TimeUnit::Microsecond => {
                                let array = array.as_primitive::<DurationMicrosecondType>();
                                array.value(i)
                            }
                            TimeUnit::Nanosecond => {
                                let array = array.as_primitive::<DurationNanosecondType>();
                                array.value(i)
                            }
                        };
                        let d = Duration::new(v, time_unit.into());
                        self.insert(column, d);
                    }
                    DataType::List(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Struct(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Decimal128(precision, scale) => {
                        let array = array.as_primitive::<Decimal128Type>();
                        let v = Decimal128::new(array.value(i), *precision, *scale);
                        self.insert(column, v);
                    }
                    _ => {
                        return NotSupportedSnafu {
                            feat: format!("convert {} to http value", array.data_type()),
                        }
                        .fail();
                    }
                }
            }

            series.push(self.finish())
        }
        Ok(())
    }
}

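// The `ColumnRef`/`AsColumnRef` items below implement the `Borrow<dyn Trait>`
// trick: a `HashMap<Column, _>` can be probed with a plain `&str` (wrapped in
// `ColumnRef`), so the per-cell `RowWriter::insert` path above performs
// lookups without allocating a `String` or cloning an `Arc`.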
#[derive(Debug, Clone, Copy, PartialEq)]
struct ColumnRef<'a>(&'a str);

impl<'a> From<&'a str> for ColumnRef<'a> {
    fn from(s: &'a str) -> Self {
        Self(s)
    }
}

trait AsColumnRef {
    fn as_ref(&self) -> ColumnRef<'_>;
}

impl AsColumnRef for Column {
    fn as_ref(&self) -> ColumnRef<'_> {
        self.0.as_str().into()
    }
}

impl AsColumnRef for ColumnRef<'_> {
    fn as_ref(&self) -> ColumnRef<'_> {
        *self
    }
}

impl<'a> PartialEq for dyn AsColumnRef + 'a {
    fn eq(&self, other: &Self) -> bool {
        self.as_ref() == other.as_ref()
    }
}

impl<'a> Eq for dyn AsColumnRef + 'a {}

impl<'a> Hash for dyn AsColumnRef + 'a {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.as_ref().0.hash(state);
    }
}

impl<'a> Borrow<dyn AsColumnRef + 'a> for Column {
    fn borrow(&self) -> &(dyn AsColumnRef + 'a) {
        self
    }
}

fn record_batches_to_labels_name(
    batches: RecordBatches,
    labels: &mut HashSet<String>,
) -> Result<()> {
    let mut column_indices = Vec::new();
    let mut field_column_indices = Vec::new();
    for (i, column) in batches.schema().column_schemas().iter().enumerate() {
        if let ConcreteDataType::Float64(_) = column.data_type {
            field_column_indices.push(i);
        }
        column_indices.push(i);
    }

    if field_column_indices.is_empty() {
        return Err(Error::Internal {
            err_msg: "no value column found".to_string(),
        });
    }

    for batch in batches.iter() {
        let names = column_indices
            .iter()
            .map(|c| batches.schema().column_name_by_index(*c).to_string())
            .collect::<Vec<_>>();

        let field_columns = field_column_indices
            .iter()
            .map(|i| {
                batch
                    .column(*i)
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
            })
            .collect::<Vec<_>>();

        for row_index in 0..batch.num_rows() {
            if field_columns
                .iter()
                .all(|c| c.get_data(row_index).is_none())
            {
                continue;
            }

            names.iter().for_each(|name| {
                let _ = labels.insert(name.clone());
            });
            return Ok(());
        }
    }
    Ok(())
}

pub(crate) fn retrieve_metric_name_and_result_type(
    promql: &str,
) -> Result<(Option<String>, ValueType)> {
    let promql_expr = promql_parser::parser::parse(promql)
        .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
    let metric_name = promql_expr_to_metric_name(&promql_expr);
    let result_type = promql_expr.value_type();

    Ok((metric_name, result_type))
}

pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
    if let Some(db) = db {
        parse_catalog_and_schema_from_db_string(db)
    } else {
        (
            ctx.current_catalog().to_string(),
            ctx.current_schema().clone(),
        )
    }
}

pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
    if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
        ctx.set_current_catalog(catalog);
        ctx.set_current_schema(schema);
    }
}

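/// Returns the metric name referenced by a PromQL expression, but only when
/// the expression resolves to exactly one metric; otherwise `None`.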
fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
    let mut metric_names = HashSet::new();
    collect_metric_names(expr, &mut metric_names);

    if metric_names.len() == 1 {
        metric_names.into_iter().next()
    } else {
        None
    }
}

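/// Walks the expression tree collecting referenced metric names. Constructs
/// whose output can't be attributed to a single metric (unary negation,
/// arithmetic binary operators, aggregations that drop `__name__`) clear the
/// set instead; for the set operators `and`, `or` and `unless`, only the
/// left-hand side is followed.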
fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
            match modifier {
                Some(LabelModifier::Include(labels)) => {
                    if !labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                Some(LabelModifier::Exclude(labels)) => {
                    if labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                _ => {}
            }
            collect_metric_names(expr, metric_names)
        }
        PromqlExpr::Unary(UnaryExpr { .. }) => metric_names.clear(),
        PromqlExpr::Binary(BinaryExpr { lhs, op, .. }) => {
            if matches!(op.id(), token::T_LAND | token::T_LOR | token::T_LUNLESS) {
                collect_metric_names(lhs, metric_names)
            } else {
                metric_names.clear()
            }
        }
        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            args.args
                .iter()
                .for_each(|e| collect_metric_names(e, metric_names));
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}

fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
where
    F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
{
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
            find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
        }
        PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::NumberLiteral(_) => None,
        PromqlExpr::StringLiteral(_) => None,
        PromqlExpr::Extension(_) => None,
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;

            f(name, matchers)
        }
        PromqlExpr::Call(Call { args, .. }) => args
            .args
            .iter()
            .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
    }
}

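/// Collects the non-equality `__name__` matchers from the first selector that
/// has no literal metric name. For example, `{__name__=~"cpu_.*"}` yields the
/// regex matcher, while `cpu_usage` (a literal name) yields `None` and
/// `{__name__="cpu_usage"}` yields an empty list.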
fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
    find_metric_name_and_matchers(expr, |name, matchers| {
        if name.is_some() {
            return None;
        }

        Some(matchers.find_matchers(METRIC_NAME))
    })
    .map(|matchers| {
        matchers
            .into_iter()
            .filter(|m| !matches!(m.op, MatchOp::Equal))
            .map(normalize_matcher)
            .collect::<Vec<_>>()
    })
}

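/// Rewrites every `__name__` matcher in selectors without a literal metric
/// name into an exact equality on `metric_name`. For example, expanding
/// `rate({__name__=~"cpu_.*"}[5m])` for the concrete metric `cpu_usage`
/// produces `rate({__name__="cpu_usage"}[5m])`.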
fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
            update_metric_name_matcher(expr, metric_name)
        }
        PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
            update_metric_name_matcher(lhs, metric_name);
            update_metric_name_matcher(rhs, metric_name);
        }
        PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
            update_metric_name_matcher(expr, metric_name)
        }
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            if name.is_some() {
                return;
            }

            for m in &mut matchers.matchers {
                if m.name == METRIC_NAME {
                    m.op = MatchOp::Equal;
                    m.value = metric_name.to_string();
                }
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if name.is_some() {
                return;
            }

            for m in &mut matchers.matchers {
                if m.name == METRIC_NAME {
                    m.op = MatchOp::Equal;
                    m.value = metric_name.to_string();
                }
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            args.args.iter_mut().for_each(|e| {
                update_metric_name_matcher(e, metric_name);
            });
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
    limit: Option<usize>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "label_values_query")
)]
pub async fn label_values_query(
    State(handler): State<PrometheusHandlerRef>,
    Path(label_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(params): Query<LabelValueQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
        .start_timer();

    if label_name == METRIC_NAME_LABEL {
        let catalog_manager = handler.catalog_manager();

        let mut table_names = try_call_return_response!(
            retrieve_table_names(&query_ctx, catalog_manager, params.matches.0).await
        );

        truncate_results(&mut table_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
    } else if label_name == FIELD_NAME_LABEL {
        let field_columns = handle_schema_err!(
            retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
        )
        .unwrap_or_default();
        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
        field_columns.sort_unstable();
        truncate_results(&mut field_columns, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
    } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
        let catalog_manager = handler.catalog_manager();

        let mut schema_names = try_call_return_response!(
            retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await
        );
        truncate_results(&mut schema_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(schema_names));
    }

    let queries = params.matches.0;
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::InvalidArguments,
            "match[] parameter is required",
        );
    }

    let start = params.start.unwrap_or_else(yesterday_rfc3339);
    let end = params.end.unwrap_or_else(current_time_rfc3339);
    let mut label_values = HashSet::new();

    let start = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&start)
            .context(ParseTimestampSnafu { timestamp: &start })
    );
    let end = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&end)
            .context(ParseTimestampSnafu { timestamp: &end })
    );

    for query in queries {
        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected vector selector",
            );
        };
        let Some(name) = take_metric_name(&mut vector_selector) else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected metric name",
            );
        };
        let VectorSelector { matchers, .. } = vector_selector;
        let matchers = matchers.matchers;
        let result = handler
            .query_label_values(name, label_name.clone(), matchers, start, end, &query_ctx)
            .await;
        if let Some(result) = handle_schema_err!(result) {
            label_values.extend(result.into_iter());
        }
    }

    let mut label_values: Vec<_> = label_values.into_iter().collect();
    label_values.sort_unstable();
    truncate_results(&mut label_values, params.limit);

    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
}

fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
    if let Some(limit) = limit
        && limit > 0
        && label_values.len() >= limit
    {
        label_values.truncate(limit);
    }
}

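/// Removes and returns the metric name of a vector selector: either the
/// literal name (as in `cpu_usage{...}`) or the value of an `__name__="..."`
/// equality matcher, which is dropped from the matcher list so it is not
/// applied again downstream.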
fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
    if let Some(name) = selector.name.take() {
        return Some(name);
    }

    let (pos, matcher) = selector
        .matchers
        .matchers
        .iter()
        .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
    let name = matcher.value.clone();
    selector.matchers.matchers.remove(pos);

    Some(name)
}

async fn retrieve_table_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    let mut tables_stream = catalog_manager.tables(catalog, &schema, Some(query_ctx));
    let mut table_names = Vec::new();

    let name_matcher = matches
        .first()
        .and_then(|matcher| promql_parser::parser::parse(matcher).ok())
        .and_then(|expr| {
            if let PromqlExpr::VectorSelector(vector_selector) = expr {
                let matchers = vector_selector.matchers.matchers;
                for matcher in matchers {
                    if matcher.name == METRIC_NAME_LABEL {
                        return Some(matcher);
                    }
                }

                None
            } else {
                None
            }
        });

    while let Some(table) = tables_stream.next().await {
        let table = table.context(CatalogSnafu)?;
        if !table
            .table_info()
            .meta
            .options
            .extra_options
            .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            continue;
        }

        let table_name = &table.table_info().name;

        if let Some(matcher) = &name_matcher {
            match &matcher.op {
                MatchOp::Equal => {
                    if table_name == &matcher.value {
                        table_names.push(table_name.clone());
                    }
                }
                MatchOp::Re(reg) => {
                    if reg.is_match(table_name) {
                        table_names.push(table_name.clone());
                    }
                }
                _ => {
                    // Other matcher ops on `__name__` are not applied here;
                    // keep the table name.
                    table_names.push(table_name.clone());
                }
            }
        } else {
            table_names.push(table_name.clone());
        }
    }

    table_names.sort_unstable();
    Ok(table_names)
}

async fn retrieve_field_names(
    query_ctx: &QueryContext,
    manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<HashSet<String>> {
    let mut field_columns = HashSet::new();
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    if matches.is_empty() {
        while let Some(table) = manager
            .tables(catalog, &schema, Some(query_ctx))
            .next()
            .await
        {
            let table = table.context(CatalogSnafu)?;
            for column in table.field_columns() {
                field_columns.insert(column.name);
            }
        }
        return Ok(field_columns);
    }

    for table_name in matches {
        let table = manager
            .table(catalog, &schema, &table_name, Some(query_ctx))
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                catalog: catalog.to_string(),
                schema: schema.clone(),
                table: table_name.clone(),
            })?;

        for column in table.field_columns() {
            field_columns.insert(column.name);
        }
    }
    Ok(field_columns)
}

async fn retrieve_schema_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let mut schemas = Vec::new();
    let catalog = query_ctx.current_catalog();

    let candidate_schemas = catalog_manager
        .schema_names(catalog, Some(query_ctx))
        .await
        .context(CatalogSnafu)?;

    for schema in candidate_schemas {
        let mut found = true;
        for match_item in &matches {
            if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
                let exists = catalog_manager
                    .table_exists(catalog, &schema, &table_name, Some(query_ctx))
                    .await
                    .context(CatalogSnafu)?;
                if !exists {
                    found = false;
                    break;
                }
            }
        }

        if found {
            schemas.push(schema);
        }
    }

    schemas.sort_unstable();

    Ok(schemas)
}

fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
    let promql_expr = promql_parser::parser::parse(query).ok()?;
    promql_expr_to_metric_name(&promql_expr)
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "series_query")
)]
pub async fn series_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<SeriesQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SeriesQuery>,
) -> PrometheusJsonResponse {
    let mut queries: Vec<String> = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::Unsupported,
            "match[] parameter is required",
        );
    }
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
        .start_timer();

    let mut series = Vec::new();
    let mut merge_map = HashMap::new();
    for query in queries {
        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
            alias: None,
        };
        let result = handler.do_query(&prom_query, query_ctx.clone()).await;

        handle_schema_err!(
            retrieve_series_from_query_result(
                result,
                &mut series,
                &query_ctx,
                &table_name,
                &handler.catalog_manager(),
                &mut merge_map,
            )
            .await
        );
    }
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
    resp.resp_metrics = merge_map;
    resp
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    query: Option<String>,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "parse_query")
)]
pub async fn parse_query(
    State(_handler): State<PrometheusHandlerRef>,
    Query(params): Query<ParseQuery>,
    Extension(_query_ctx): Extension<QueryContext>,
    Form(form_params): Form<ParseQuery>,
) -> PrometheusJsonResponse {
    if let Some(query) = params.query.or(form_params.query) {
        let ast = try_call_return_response!(promql_parser::parser::parse(&query));
        PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
    } else {
        PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
    }
}

#[cfg(test)]
mod tests {
    use promql_parser::parser::value::ValueType;

    use super::*;

    struct TestCase {
        name: &'static str,
        promql: &'static str,
        expected_metric: Option<&'static str>,
        expected_type: ValueType,
        should_error: bool,
    }

    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        let test_cases = &[
            TestCase {
                name: "simple metric",
                promql: "cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with selector",
                promql: r#"cpu_usage{instance="localhost"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with range selector",
                promql: "cpu_usage[5m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "metric with __name__ matcher",
                promql: r#"{__name__="cpu_usage"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with unary operator",
                promql: "-cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with aggregation",
                promql: "sum(cpu_usage)",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "aggregation dropping __name__ (by instance)",
                promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "aggregation keeping __name__",
                promql: r#"sum by (__name__) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "aggregation without instance",
                promql: r#"sum without (instance) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "same metric addition",
                promql: "cpu_usage + cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with scalar addition",
                promql: r#"sum(rate(cpu_usage{job="node"}[5m])) + 100"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics addition",
                promql: "cpu_usage + memory_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics subtraction",
                promql: "network_in - network_out",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with different metrics",
                promql: "cpu_usage unless memory_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with same metric",
                promql: "cpu_usage unless cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "basic subquery",
                promql: "cpu_usage[5m:1m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "subquery with multiple metrics",
                promql: "(cpu_usage + memory_usage)[5m:1m]",
                expected_metric: None,
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "scalar value",
                promql: "42",
                expected_metric: None,
                expected_type: ValueType::Scalar,
                should_error: false,
            },
            TestCase {
                name: "string literal",
                promql: r#""hello world""#,
                expected_metric: None,
                expected_type: ValueType::String,
                should_error: false,
            },
            TestCase {
                name: "invalid syntax",
                promql: "cpu_usage{invalid=",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "empty query",
                promql: "",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "malformed brackets",
                promql: "cpu_usage[5m",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
        ];

        for test_case in test_cases {
            let result = retrieve_metric_name_and_result_type(test_case.promql);

            if test_case.should_error {
                assert!(
                    result.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    test_case.name,
                    result
                );
            } else {
                let (metric_name, value_type) = result.unwrap_or_else(|e| {
                    panic!(
                        "Test '{}' should have succeeded but failed with error: {}",
                        test_case.name, e
                    )
                });

                let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
                assert_eq!(
                    metric_name, expected_metric_name,
                    "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, expected_metric_name, metric_name
                );

                assert_eq!(
                    value_type, test_case.expected_type,
                    "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, test_case.expected_type, value_type
                );
            }
        }
    }
}