1use std::borrow::Borrow;
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::hash::{Hash, Hasher};
20use std::sync::Arc;
21
22use arrow::array::{Array, AsArray};
23use arrow::datatypes::{
24 Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
25 Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
26 UInt8Type, UInt16Type, UInt32Type, UInt64Type,
27};
28use arrow_schema::{DataType, IntervalUnit};
29use axum::extract::{Path, Query, State};
30use axum::{Extension, Form};
31use catalog::CatalogManagerRef;
32use common_catalog::parse_catalog_and_schema_from_db_string;
33use common_decimal::Decimal128;
34use common_error::ext::ErrorExt;
35use common_error::status_code::StatusCode;
36use common_query::{Output, OutputData};
37use common_recordbatch::{RecordBatch, RecordBatches};
38use common_telemetry::{debug, tracing};
39use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
40use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
41use common_version::OwnedBuildInfo;
42use datafusion_common::ScalarValue;
43use datatypes::prelude::ConcreteDataType;
44use datatypes::schema::{ColumnSchema, SchemaRef};
45use datatypes::types::jsonb_to_string;
46use futures::StreamExt;
47use futures::future::join_all;
48use itertools::Itertools;
49use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
50use promql_parser::parser::token::{self};
51use promql_parser::parser::value::ValueType;
52use promql_parser::parser::{
53 AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, LabelModifier, MatrixSelector, ParenExpr,
54 SubqueryExpr, UnaryExpr, VectorSelector,
55};
56use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser};
57use query::promql::planner::normalize_matcher;
58use serde::de::{self, MapAccess, Visitor};
59use serde::{Deserialize, Serialize};
60use serde_json::Value;
61use session::context::{QueryContext, QueryContextRef};
62use snafu::{Location, OptionExt, ResultExt};
63use store_api::metric_engine_consts::{
64 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
65};
66
67pub use super::result::prometheus_resp::PrometheusJsonResponse;
68use crate::error::{
69 CatalogSnafu, CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error,
70 InvalidQuerySnafu, NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu,
71 UnexpectedResultSnafu,
72};
73use crate::http::header::collect_plan_metrics;
74use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
75use crate::prometheus_handler::PrometheusHandlerRef;
76
/// One time series in an instant-query (`vector`) result.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    /// Label set identifying the series.
    pub metric: BTreeMap<String, String>,
    /// The single `(timestamp, value)` sample; omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}
84
/// One time series in a range-query (`matrix`) result.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    /// Label set identifying the series.
    pub metric: BTreeMap<String, String>,
    /// All `(timestamp, value)` samples of the series.
    pub values: Vec<(f64, String)>,
}
91
/// The `result` payload of a Prometheus query response; the variant is
/// discriminated by the sibling `resultType` field, so it serializes
/// untagged.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    Matrix(Vec<PromSeriesMatrix>),
    Vector(Vec<PromSeriesVector>),
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}
101
102impl Default for PromQueryResult {
103 fn default() -> Self {
104 PromQueryResult::Matrix(Default::default())
105 }
106}
107
/// The `data` object of a Prometheus query response.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    /// Stringified `ValueType` of `result` (e.g. "matrix", "vector").
    #[serde(rename = "resultType")]
    pub result_type: String,
    pub result: PromQueryResult,
}
114
/// An owned column (label) name; `Arc<String>` keeps clones cheap when the
/// same name keys many per-row maps.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Column(Arc<String>);
119
120impl From<&str> for Column {
121 fn from(s: &str) -> Self {
122 Self(Arc::new(s.to_string()))
123 }
124}
125
/// Body of every Prometheus API response; the variant is chosen by the
/// endpoint and serialized untagged.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    PromData(PromData),
    Labels(Vec<String>),
    Series(Vec<HashMap<Column, String>>),
    LabelValues(Vec<String>),
    FormatQuery(String),
    BuildInfo(OwnedBuildInfo),
    // A parsed PromQL AST cannot be deserialized back, only produced.
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    #[default]
    None,
}
140
impl PrometheusResponse {
    /// Merges `other` into `self` by concatenating the series lists when
    /// both sides carry the same result shape (matrix/matrix or
    /// vector/vector); any other combination leaves `self` unchanged and
    /// silently drops `other`.
    fn append(&mut self, other: PrometheusResponse) {
        match (self, other) {
            (
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Matrix(lhs),
                    ..
                }),
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Matrix(rhs),
                    ..
                }),
            ) => {
                lhs.extend(rhs);
            }

            (
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Vector(lhs),
                    ..
                }),
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Vector(rhs),
                    ..
                }),
            ) => {
                lhs.extend(rhs);
            }
            _ => {
                // Mismatched or non-mergeable variants: keep `self` as-is.
            }
        }
    }

    /// True when this is the empty `None` placeholder response.
    pub fn is_none(&self) -> bool {
        matches!(self, PrometheusResponse::None)
    }
}
182
/// Query-string / form parameters of the `format_query` endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    query: Option<String>,
}
187
/// `format_query` handler: pretty-prints a PromQL expression without
/// executing it; a parse failure becomes an error response.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "format_query")
)]
pub async fn format_query(
    State(_handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(_query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    // Query-string parameters take precedence over the form body.
    let query = params.query.or(form_params.query).unwrap_or_default();
    match promql_parser::parser::parse(&query) {
        Ok(expr) => {
            let pretty = expr.prettify();
            PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
        }
        Err(reason) => {
            let err = InvalidQuerySnafu { reason }.build();
            PrometheusJsonResponse::error(err.status_code(), err.output_msg())
        }
    }
}
211
/// Parameters of the build-info endpoint (none are accepted).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}
214
/// Build-info handler: reports the server's build metadata.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "build_info_query")
)]
pub async fn build_info_query() -> PrometheusJsonResponse {
    let build_info = common_version::build_info().clone();
    PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
}
224
/// Query-string / form parameters of the instant-query endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    query: Option<String>,
    // Lookback delta override; defaults to DEFAULT_LOOKBACK_STRING.
    lookback: Option<String>,
    // Evaluation timestamp; defaults to "now" when absent.
    time: Option<String>,
    // Accepted for Prometheus API compatibility; not read in this module.
    timeout: Option<String>,
    db: Option<String>,
}
233
/// Evaluates `$handle` (a `Result`); on `Err` returns early from the
/// surrounding handler with an `InvalidArguments` JSON error response.
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                let msg = err.to_string();
                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
            }
        }
    };
}
247
/// Instant-query handler: evaluates a PromQL expression at a single point
/// in time and returns a Prometheus JSON response.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "instant_query")
)]
pub async fn instant_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    // Query-string parameters take precedence over the form body; the
    // evaluation time defaults to "now".
    let time = params
        .time
        .or(form_params.time)
        .unwrap_or_else(current_time_rfc3339);
    // An instant query is modelled as a degenerate range query (start == end).
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: time.clone(),
        end: time,
        step: "1s".to_string(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // NOTE(review): only the query-string `db` is honored here, unlike the
    // other parameters — confirm whether `form_params.db` should be a fallback.
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
        .start_timer();

    // When the query selects metrics through non-equality `__name__`
    // matchers (e.g. a regex), expand it into one concrete query per
    // matching metric and merge the responses.
    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            // Nothing matched: return an empty result of the proper type.
            let result_type = promql_expr.value_type();

            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        // Evaluate the rewritten query for every concrete metric concurrently.
        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_instant_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // `metric_names` is non-empty here, so `reduce` yields `Some`.
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_instant_query(&handler, &prom_query, query_ctx).await
    }
}
339
340async fn do_instant_query(
342 handler: &PrometheusHandlerRef,
343 prom_query: &PromQuery,
344 query_ctx: QueryContextRef,
345) -> PrometheusJsonResponse {
346 let result = handler.do_query(prom_query, query_ctx).await;
347 let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
348 Ok((metric_name, result_type)) => (metric_name, result_type),
349 Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
350 };
351 PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
352}
353
/// Query-string / form parameters of the range-query endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    query: Option<String>,
    start: Option<String>,
    end: Option<String>,
    step: Option<String>,
    // Lookback delta override; defaults to DEFAULT_LOOKBACK_STRING.
    lookback: Option<String>,
    // Accepted for Prometheus API compatibility; not read in this module.
    timeout: Option<String>,
    db: Option<String>,
}
364
/// Range-query handler: evaluates a PromQL expression over a time range
/// and returns a matrix-typed Prometheus JSON response.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "range_query")
)]
pub async fn range_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<RangeQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<RangeQuery>,
) -> PrometheusJsonResponse {
    // Query-string parameters take precedence over the form body.
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: params.start.or(form_params.start).unwrap_or_default(),
        end: params.end.or(form_params.end).unwrap_or_default(),
        step: params.step.or(form_params.step).unwrap_or_default(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // NOTE(review): only the query-string `db` is honored here, unlike the
    // other parameters — confirm whether `form_params.db` should be a fallback.
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
        .start_timer();

    // When the query selects metrics through non-equality `__name__`
    // matchers (e.g. a regex), expand it into one concrete query per
    // matching metric and merge the responses.
    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            // Nothing matched: a range query always yields a matrix.
            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: ValueType::Matrix.to_string(),
                ..Default::default()
            }));
        }

        // Evaluate the rewritten query for every concrete metric concurrently.
        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_range_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // `metric_names` is non-empty here, so `reduce` yields `Some`.
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_range_query(&handler, &prom_query, query_ctx).await
    }
}
449
450async fn do_range_query(
452 handler: &PrometheusHandlerRef,
453 prom_query: &PromQuery,
454 query_ctx: QueryContextRef,
455) -> PrometheusJsonResponse {
456 let result = handler.do_query(prom_query, query_ctx).await;
457 let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
458 Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
459 Ok((metric_name, _)) => metric_name,
460 };
461 PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
462}
463
/// Values of the repeated `match[]` parameter, collected by a custom
/// `Deserialize` impl.
#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);
466
/// Query-string / form parameters of the labels endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    // All `match[]` selectors, gathered from the flattened key/value map.
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}
476
/// Custom deserialization that collects every `match[]` entry from a
/// flattened parameter map into `Matches`, ignoring all other keys.
impl<'de> Deserialize<'de> for Matches {
    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        struct MatchesVisitor;

        impl<'d> Visitor<'d> for MatchesVisitor {
            type Value = Vec<String>;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a string")
            }

            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
            where
                M: MapAccess<'d>,
            {
                let mut matches = Vec::new();
                while let Some((key, value)) = access.next_entry::<String, String>()? {
                    // Only the repeated `match[]` parameter is relevant.
                    if key == "match[]" {
                        matches.push(value);
                    }
                }
                Ok(matches)
            }
        }
        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
    }
}
508
/// Evaluates `$result`; yields `Some(v)` on success, `None` when the error
/// is a missing table/column (treated as "no data" rather than a failure
/// by the metadata endpoints), and returns early from the surrounding
/// handler with an error response for anything else.
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err) => {
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    // Table or column not found: skip this selector.
                    None
                } else {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
                }
            }
        }
    };
}
532
533#[axum_macros::debug_handler]
534#[tracing::instrument(
535 skip_all,
536 fields(protocol = "prometheus", request_type = "labels_query")
537)]
538pub async fn labels_query(
539 State(handler): State<PrometheusHandlerRef>,
540 Query(params): Query<LabelsQuery>,
541 Extension(mut query_ctx): Extension<QueryContext>,
542 Form(form_params): Form<LabelsQuery>,
543) -> PrometheusJsonResponse {
544 let (catalog, schema) = get_catalog_schema(¶ms.db, &query_ctx);
545 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
546 let query_ctx = Arc::new(query_ctx);
547
548 let mut queries = params.matches.0;
549 if queries.is_empty() {
550 queries = form_params.matches.0;
551 }
552
553 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
554 .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
555 .start_timer();
556
557 let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
559 {
560 Ok(labels) => labels,
561 Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
562 };
563 let _ = labels.insert(METRIC_NAME.to_string());
565
566 if queries.is_empty() {
568 let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
569 labels_vec.sort_unstable();
570 return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
571 }
572
573 let start = params
575 .start
576 .or(form_params.start)
577 .unwrap_or_else(yesterday_rfc3339);
578 let end = params
579 .end
580 .or(form_params.end)
581 .unwrap_or_else(current_time_rfc3339);
582 let lookback = params
583 .lookback
584 .or(form_params.lookback)
585 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
586
587 let mut fetched_labels = HashSet::new();
588 let _ = fetched_labels.insert(METRIC_NAME.to_string());
589
590 let mut merge_map = HashMap::new();
591 for query in queries {
592 let prom_query = PromQuery {
593 query,
594 start: start.clone(),
595 end: end.clone(),
596 step: DEFAULT_LOOKBACK_STRING.to_string(),
597 lookback: lookback.clone(),
598 alias: None,
599 };
600
601 let result = handler.do_query(&prom_query, query_ctx.clone()).await;
602 handle_schema_err!(
603 retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
604 .await
605 );
606 }
607
608 fetched_labels.retain(|l| labels.contains(l));
610 let _ = labels.insert(METRIC_NAME.to_string());
611
612 let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
613 sorted_labels.sort();
614 let merge_map = merge_map
615 .into_iter()
616 .map(|(k, v)| (k, Value::from(v)))
617 .collect();
618 let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
619 resp.resp_metrics = merge_map;
620 resp
621}
622
623async fn get_all_column_names(
625 catalog: &str,
626 schema: &str,
627 manager: &CatalogManagerRef,
628) -> std::result::Result<HashSet<String>, catalog::error::Error> {
629 let table_names = manager.table_names(catalog, schema, None).await?;
630
631 let mut labels = HashSet::new();
632 for table_name in table_names {
633 let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
634 continue;
635 };
636 for column in table.primary_key_columns() {
637 if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
638 && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
639 {
640 labels.insert(column.name);
641 }
642 }
643 }
644
645 Ok(labels)
646}
647
/// Converts a query result into Prometheus series entries (label maps)
/// appended to `series`.
///
/// The table's primary-key columns determine which output columns are
/// treated as tags; plan metrics, when present, are merged into `metrics`.
async fn retrieve_series_from_query_result(
    result: Result<Output>,
    series: &mut Vec<HashMap<Column, String>>,
    query_ctx: &QueryContext,
    table_name: &str,
    manager: &CatalogManagerRef,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;

    // Look up the table so its primary-key (tag) columns are known.
    let table = manager
        .table(
            query_ctx.current_catalog(),
            &query_ctx.current_schema(),
            table_name,
            Some(query_ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            catalog: query_ctx.current_catalog(),
            schema: query_ctx.current_schema(),
            table: table_name,
        })?;
    let tag_columns = table
        .primary_key_columns()
        .map(|c| c.name)
        .collect::<HashSet<_>>();

    match result.data {
        OutputData::RecordBatches(batches) => {
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::Stream(stream) => {
            // Drain the stream fully before conversion.
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        // A DML-style result makes no sense for a series query.
        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
            reason: "expected data result, but got affected rows".to_string(),
            location: Location::default(),
        }),
    }?;

    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}
699
/// Extracts the label (column) names present in a query result into
/// `labels`; plan metrics, when present, are merged into `metrics`.
async fn retrieve_labels_name_from_query_result(
    result: Result<Output>,
    labels: &mut HashSet<String>,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;
    match result.data {
        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
        OutputData::Stream(stream) => {
            // Drain the stream fully before conversion.
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_labels_name(batches, labels)
        }
        // A DML-style result makes no sense for a labels query.
        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
            reason: "expected data result, but got affected rows".to_string(),
        }
        .fail(),
    }?;
    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}
725
726fn record_batches_to_series(
727 batches: RecordBatches,
728 series: &mut Vec<HashMap<Column, String>>,
729 table_name: &str,
730 tag_columns: &HashSet<String>,
731) -> Result<()> {
732 for batch in batches.iter() {
733 let projection = batch
735 .schema
736 .column_schemas()
737 .iter()
738 .enumerate()
739 .filter_map(|(idx, col)| {
740 if tag_columns.contains(&col.name) {
741 Some(idx)
742 } else {
743 None
744 }
745 })
746 .collect::<Vec<_>>();
747 let batch = batch
748 .try_project(&projection)
749 .context(CollectRecordbatchSnafu)?;
750
751 let mut writer = RowWriter::new(&batch.schema, table_name);
752 writer.write(batch, series)?;
753 }
754 Ok(())
755}
756
/// Incrementally builds one label map (`Column` -> value) per row of a
/// record batch, seeding every row from a shared template.
struct RowWriter {
    // Per-row starting state: each column name mapped to `None`, plus
    // `__name__` pre-set to the table name.
    template: HashMap<Column, Option<String>>,
    // Row currently being written; lazily cloned from `template` on first
    // `insert` and consumed by `finish`.
    current: Option<HashMap<Column, Option<String>>>,
}
770
impl RowWriter {
    /// Builds a writer whose row template maps every column of `schema` to
    /// `None` and pre-fills `__name__` with the table name.
    fn new(schema: &SchemaRef, table: &str) -> Self {
        let mut template = schema
            .column_schemas()
            .iter()
            .map(|x| (x.name.as_str().into(), None))
            .collect::<HashMap<Column, Option<String>>>();
        template.insert("__name__".into(), Some(table.to_string()));
        Self {
            template,
            current: None,
        }
    }

    /// Records `value` for `column` in the current row, starting a fresh
    /// row from the template when none is in progress.
    fn insert(&mut self, column: ColumnRef, value: impl ToString) {
        let current = self.current.get_or_insert_with(|| self.template.clone());
        // Look up via the borrowed view first so no owned `Column` is
        // allocated for names already present in the template.
        match current.get_mut(&column as &dyn AsColumnRef) {
            Some(x) => {
                let _ = x.insert(value.to_string());
            }
            None => {
                let _ = current.insert(column.0.into(), Some(value.to_string()));
            }
        }
    }

    /// Records a binary cell: JSON columns are rendered as JSON text,
    /// anything else as lowercase hex.
    fn insert_bytes(&mut self, column_schema: &ColumnSchema, bytes: &[u8]) -> Result<()> {
        let column_name = column_schema.name.as_str().into();

        if column_schema.data_type.is_json() {
            let s = jsonb_to_string(bytes).context(ConvertScalarValueSnafu)?;
            self.insert(column_name, s);
        } else {
            let hex = bytes
                .iter()
                .map(|b| format!("{b:02x}"))
                .collect::<Vec<String>>()
                .join("");
            self.insert(column_name, hex);
        }
        Ok(())
    }

    /// Completes the current row, keeping only the columns that received a
    /// value; returns an empty map when no row was started.
    fn finish(&mut self) -> HashMap<Column, String> {
        let Some(current) = self.current.take() else {
            return HashMap::new();
        };
        current
            .into_iter()
            .filter_map(|(k, v)| v.map(|v| (k, v)))
            .collect()
    }

    /// Converts every row of `record_batch` into a label map appended to
    /// `series`, stringifying each cell according to its Arrow type.
    ///
    /// Errors on JSON decoding problems, DataFusion scalar extraction
    /// failures, or an Arrow type with no string rendering here.
    fn write(
        &mut self,
        record_batch: RecordBatch,
        series: &mut Vec<HashMap<Column, String>>,
    ) -> Result<()> {
        let schema = record_batch.schema.clone();
        let record_batch = record_batch.into_df_record_batch();
        for i in 0..record_batch.num_rows() {
            for (j, array) in record_batch.columns().iter().enumerate() {
                let column = schema.column_name_by_index(j).into();

                // NULL cells are rendered as the literal string "Null".
                if array.is_null(i) {
                    self.insert(column, "Null");
                    continue;
                }

                match array.data_type() {
                    DataType::Null => {
                        self.insert(column, "Null");
                    }
                    DataType::Boolean => {
                        let array = array.as_boolean();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    // Primitive numerics rely on their `Display` output.
                    DataType::UInt8 => {
                        let array = array.as_primitive::<UInt8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt16 => {
                        let array = array.as_primitive::<UInt16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt32 => {
                        let array = array.as_primitive::<UInt32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt64 => {
                        let array = array.as_primitive::<UInt64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int8 => {
                        let array = array.as_primitive::<Int8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int16 => {
                        let array = array.as_primitive::<Int16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int32 => {
                        let array = array.as_primitive::<Int32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int64 => {
                        let array = array.as_primitive::<Int64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float32 => {
                        let array = array.as_primitive::<Float32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float64 => {
                        let array = array.as_primitive::<Float64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    // String variants are inserted verbatim.
                    DataType::Utf8 => {
                        let array = array.as_string::<i32>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::LargeUtf8 => {
                        let array = array.as_string::<i64>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Utf8View => {
                        let array = array.as_string_view();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    // Binary variants go through `insert_bytes` (JSON or hex).
                    DataType::Binary => {
                        let array = array.as_binary::<i32>();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::LargeBinary => {
                        let array = array.as_binary::<i64>();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::BinaryView => {
                        let array = array.as_binary_view();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::Date32 => {
                        let array = array.as_primitive::<Date32Type>();
                        let v = Date::new(array.value(i));
                        self.insert(column, v);
                    }
                    DataType::Date64 => {
                        let array = array.as_primitive::<Date64Type>();
                        // Date64 is milliseconds since epoch; convert to days.
                        let v = Date::new((array.value(i) / 86_400_000) as i32);
                        self.insert(column, v);
                    }
                    // Temporal types are rendered as ISO-8601 strings.
                    DataType::Timestamp(_, _) => {
                        let v = datatypes::arrow_array::timestamp_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Time32(_) | DataType::Time64(_) => {
                        let v = datatypes::arrow_array::time_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Interval(interval_unit) => match interval_unit {
                        IntervalUnit::YearMonth => {
                            let array = array.as_primitive::<IntervalYearMonthType>();
                            let v: IntervalYearMonth = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::DayTime => {
                            let array = array.as_primitive::<IntervalDayTimeType>();
                            let v: IntervalDayTime = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::MonthDayNano => {
                            let array = array.as_primitive::<IntervalMonthDayNanoType>();
                            let v: IntervalMonthDayNano = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                    },
                    DataType::Duration(_) => {
                        let d = datatypes::arrow_array::duration_array_value(array, i);
                        self.insert(column, d);
                    }
                    // Nested types fall back to DataFusion's scalar rendering.
                    DataType::List(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Struct(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Decimal128(precision, scale) => {
                        let array = array.as_primitive::<Decimal128Type>();
                        let v = Decimal128::new(array.value(i), *precision, *scale);
                        self.insert(column, v);
                    }
                    _ => {
                        return NotSupportedSnafu {
                            feat: format!("convert {} to http value", array.data_type()),
                        }
                        .fail();
                    }
                }
            }

            series.push(self.finish())
        }
        Ok(())
    }
}
1001
/// A borrowed view of a column name, usable for `HashMap<Column, _>`
/// lookups without allocating.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ColumnRef<'a>(&'a str);
1004
impl<'a> From<&'a str> for ColumnRef<'a> {
    /// Wraps a borrowed string as a column-name view.
    fn from(s: &'a str) -> Self {
        Self(s)
    }
}
1010
/// Common view trait letting owned `Column` keys be looked up by a
/// borrowed `ColumnRef`, avoiding a `String` allocation per lookup.
trait AsColumnRef {
    fn as_ref(&self) -> ColumnRef<'_>;
}
1014
impl AsColumnRef for Column {
    /// Borrows the owned column name as a view.
    fn as_ref(&self) -> ColumnRef<'_> {
        self.0.as_str().into()
    }
}
1020
impl AsColumnRef for ColumnRef<'_> {
    /// Already a borrowed view; just copy it.
    fn as_ref(&self) -> ColumnRef<'_> {
        *self
    }
}
1026
/// Equality on the underlying name, so `Column` and `ColumnRef` compare
/// consistently behind the trait object.
impl<'a> PartialEq for dyn AsColumnRef + 'a {
    fn eq(&self, other: &Self) -> bool {
        self.as_ref() == other.as_ref()
    }
}
1032
// Name equality is a total equivalence relation, so `Eq` holds.
impl<'a> Eq for dyn AsColumnRef + 'a {}
1034
/// Hashes only the name string, matching `Column`'s derived `Hash`
/// (which delegates through `Arc<String>` to the same `str` hash), as
/// required for map lookups through the trait object.
impl<'a> Hash for dyn AsColumnRef + 'a {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.as_ref().0.hash(state);
    }
}
1040
/// Lets a `HashMap<Column, _>` be queried with `&dyn AsColumnRef` (e.g. a
/// `ColumnRef`) without constructing an owned `Column`.
impl<'a> Borrow<dyn AsColumnRef + 'a> for Column {
    fn borrow(&self) -> &(dyn AsColumnRef + 'a) {
        self
    }
}
1046
1047fn record_batches_to_labels_name(
1049 batches: RecordBatches,
1050 labels: &mut HashSet<String>,
1051) -> Result<()> {
1052 let mut column_indices = Vec::new();
1053 let mut field_column_indices = Vec::new();
1054 for (i, column) in batches.schema().column_schemas().iter().enumerate() {
1055 if let ConcreteDataType::Float64(_) = column.data_type {
1056 field_column_indices.push(i);
1057 }
1058 column_indices.push(i);
1059 }
1060
1061 if field_column_indices.is_empty() {
1062 return Err(Error::Internal {
1063 err_msg: "no value column found".to_string(),
1064 });
1065 }
1066
1067 for batch in batches.iter() {
1068 let names = column_indices
1069 .iter()
1070 .map(|c| batches.schema().column_name_by_index(*c).to_string())
1071 .collect::<Vec<_>>();
1072
1073 let field_columns = field_column_indices
1074 .iter()
1075 .map(|i| {
1076 let column = batch.column(*i);
1077 column.as_primitive::<Float64Type>()
1078 })
1079 .collect::<Vec<_>>();
1080
1081 for row_index in 0..batch.num_rows() {
1082 if field_columns.iter().all(|c| c.is_null(row_index)) {
1084 continue;
1085 }
1086
1087 names.iter().for_each(|name| {
1089 let _ = labels.insert(name.clone());
1090 });
1091 return Ok(());
1092 }
1093 }
1094 Ok(())
1095}
1096
1097pub(crate) fn retrieve_metric_name_and_result_type(
1098 promql: &str,
1099) -> Result<(Option<String>, ValueType)> {
1100 let promql_expr = promql_parser::parser::parse(promql)
1101 .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
1102 let metric_name = promql_expr_to_metric_name(&promql_expr);
1103 let result_type = promql_expr.value_type();
1104
1105 Ok((metric_name, result_type))
1106}
1107
1108pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
1111 if let Some(db) = db {
1112 parse_catalog_and_schema_from_db_string(db)
1113 } else {
1114 (
1115 ctx.current_catalog().to_string(),
1116 ctx.current_schema().clone(),
1117 )
1118 }
1119}
1120
1121pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
1123 if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
1124 ctx.set_current_catalog(catalog);
1125 ctx.set_current_schema(schema);
1126 }
1127}
1128
1129fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
1130 let mut metric_names = HashSet::new();
1131 collect_metric_names(expr, &mut metric_names);
1132
1133 if metric_names.len() == 1 {
1135 metric_names.into_iter().next()
1136 } else {
1137 None
1138 }
1139}
1140
/// Recursively collects the metric names referenced by `expr` into
/// `metric_names`.
///
/// Clearing the set signals "the metric name cannot be determined"; this
/// happens for constructs that may drop or rewrite `__name__` (unary
/// expressions, non-set binary operators, aggregations grouping away
/// `__name__`).
fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
            match modifier {
                Some(LabelModifier::Include(labels)) => {
                    // `by (...)` not listing `__name__` drops the metric name.
                    if !labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                Some(LabelModifier::Exclude(labels)) => {
                    // `without (...)` listing `__name__` drops the metric name.
                    if labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                _ => {}
            }
            collect_metric_names(expr, metric_names)
        }
        PromqlExpr::Unary(UnaryExpr { .. }) => metric_names.clear(),
        PromqlExpr::Binary(BinaryExpr { lhs, op, .. }) => {
            // Set operators (and/or/unless) keep left-hand samples, hence
            // its metric names; other binary operators drop `__name__`.
            if matches!(
                op.id(),
                token::T_LAND | token::T_LOR | token::T_LUNLESS ) {
                collect_metric_names(lhs, metric_names)
            } else {
                metric_names.clear()
            }
        }
        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            // Either an explicit selector name or the first `__name__` matcher.
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            // Union the names from every function argument.
            args.args
                .iter()
                .for_each(|e| collect_metric_names(e, metric_names));
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}
1200
1201fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
1202where
1203 F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
1204{
1205 match expr {
1206 PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
1207 PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
1208 PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1209 find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
1210 }
1211 PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
1212 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
1213 PromqlExpr::NumberLiteral(_) => None,
1214 PromqlExpr::StringLiteral(_) => None,
1215 PromqlExpr::Extension(_) => None,
1216 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
1217 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1218 let VectorSelector { name, matchers, .. } = vs;
1219
1220 f(name, matchers)
1221 }
1222 PromqlExpr::Call(Call { args, .. }) => args
1223 .args
1224 .iter()
1225 .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
1226 }
1227}
1228
1229fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
1231 find_metric_name_and_matchers(expr, |name, matchers| {
1232 if name.is_some() {
1234 return None;
1235 }
1236
1237 Some(matchers.find_matchers(METRIC_NAME))
1239 })
1240 .map(|matchers| {
1241 matchers
1242 .into_iter()
1243 .filter(|m| !matches!(m.op, MatchOp::Equal))
1244 .map(normalize_matcher)
1245 .collect::<Vec<_>>()
1246 })
1247}
1248
1249fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
1252 match expr {
1253 PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
1254 update_metric_name_matcher(expr, metric_name)
1255 }
1256 PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1257 PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1258 update_metric_name_matcher(lhs, metric_name);
1259 update_metric_name_matcher(rhs, metric_name);
1260 }
1261 PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1262 PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
1263 update_metric_name_matcher(expr, metric_name)
1264 }
1265 PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
1266 if name.is_some() {
1267 return;
1268 }
1269
1270 for m in &mut matchers.matchers {
1271 if m.name == METRIC_NAME {
1272 m.op = MatchOp::Equal;
1273 m.value = metric_name.to_string();
1274 }
1275 }
1276 }
1277 PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1278 let VectorSelector { name, matchers, .. } = vs;
1279 if name.is_some() {
1280 return;
1281 }
1282
1283 for m in &mut matchers.matchers {
1284 if m.name == METRIC_NAME {
1285 m.op = MatchOp::Equal;
1286 m.value = metric_name.to_string();
1287 }
1288 }
1289 }
1290 PromqlExpr::Call(Call { args, .. }) => {
1291 args.args.iter_mut().for_each(|e| {
1292 update_metric_name_matcher(e, metric_name);
1293 });
1294 }
1295 PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
1296 }
1297}
1298
/// Parameters accepted by the Prometheus `label/<name>/values` endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    /// Start of the time range (parsed later by `parse_promql_timestamp`).
    start: Option<String>,
    /// End of the time range.
    end: Option<String>,
    /// Lookback delta; currently unused by `label_values_query` itself.
    lookback: Option<String>,
    /// `match[]` series selectors, flattened from repeated query parameters.
    #[serde(flatten)]
    matches: Matches,
    /// Optional database override in `catalog-schema` form.
    db: Option<String>,
    /// Maximum number of values to return; `None`/`0` means unlimited.
    limit: Option<usize>,
}
1309
1310#[axum_macros::debug_handler]
1311#[tracing::instrument(
1312 skip_all,
1313 fields(protocol = "prometheus", request_type = "label_values_query")
1314)]
1315pub async fn label_values_query(
1316 State(handler): State<PrometheusHandlerRef>,
1317 Path(label_name): Path<String>,
1318 Extension(mut query_ctx): Extension<QueryContext>,
1319 Query(params): Query<LabelValueQuery>,
1320) -> PrometheusJsonResponse {
1321 let (catalog, schema) = get_catalog_schema(¶ms.db, &query_ctx);
1322 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
1323 let query_ctx = Arc::new(query_ctx);
1324
1325 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
1326 .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
1327 .start_timer();
1328
1329 if label_name == METRIC_NAME_LABEL {
1330 let catalog_manager = handler.catalog_manager();
1331
1332 let mut table_names = try_call_return_response!(
1333 retrieve_table_names(&query_ctx, catalog_manager, params.matches.0).await
1334 );
1335
1336 truncate_results(&mut table_names, params.limit);
1337 return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
1338 } else if label_name == FIELD_NAME_LABEL {
1339 let field_columns = handle_schema_err!(
1340 retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
1341 )
1342 .unwrap_or_default();
1343 let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
1344 field_columns.sort_unstable();
1345 truncate_results(&mut field_columns, params.limit);
1346 return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
1347 } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
1348 let catalog_manager = handler.catalog_manager();
1349
1350 let mut schema_names = try_call_return_response!(
1351 retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await
1352 );
1353 truncate_results(&mut schema_names, params.limit);
1354 return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(schema_names));
1355 }
1356
1357 let queries = params.matches.0;
1358 if queries.is_empty() {
1359 return PrometheusJsonResponse::error(
1360 StatusCode::InvalidArguments,
1361 "match[] parameter is required",
1362 );
1363 }
1364
1365 let start = params.start.unwrap_or_else(yesterday_rfc3339);
1366 let end = params.end.unwrap_or_else(current_time_rfc3339);
1367 let mut label_values = HashSet::new();
1368
1369 let start = try_call_return_response!(
1370 QueryLanguageParser::parse_promql_timestamp(&start)
1371 .context(ParseTimestampSnafu { timestamp: &start })
1372 );
1373 let end = try_call_return_response!(
1374 QueryLanguageParser::parse_promql_timestamp(&end)
1375 .context(ParseTimestampSnafu { timestamp: &end })
1376 );
1377
1378 for query in queries {
1379 let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
1380 let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
1381 return PrometheusJsonResponse::error(
1382 StatusCode::InvalidArguments,
1383 "expected vector selector",
1384 );
1385 };
1386 let Some(name) = take_metric_name(&mut vector_selector) else {
1387 return PrometheusJsonResponse::error(
1388 StatusCode::InvalidArguments,
1389 "expected metric name",
1390 );
1391 };
1392 let VectorSelector { matchers, .. } = vector_selector;
1393 let matchers = matchers.matchers;
1395 let result = handler
1396 .query_label_values(name, label_name.clone(), matchers, start, end, &query_ctx)
1397 .await;
1398 if let Some(result) = handle_schema_err!(result) {
1399 label_values.extend(result.into_iter());
1400 }
1401 }
1402
1403 let mut label_values: Vec<_> = label_values.into_iter().collect();
1404 label_values.sort_unstable();
1405 truncate_results(&mut label_values, params.limit);
1406
1407 PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
1408}
1409
/// Caps `label_values` at `limit` entries.
///
/// A missing limit or an explicit limit of zero means "unlimited" and
/// leaves the vector untouched.
fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
    let Some(limit) = limit else { return };
    if limit == 0 {
        return;
    }
    if label_values.len() >= limit {
        label_values.truncate(limit);
    }
}
1418
1419fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1422 if let Some(name) = selector.name.take() {
1423 return Some(name);
1424 }
1425
1426 let (pos, matcher) = selector
1427 .matchers
1428 .matchers
1429 .iter()
1430 .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1431 let name = matcher.value.clone();
1432 selector.matchers.matchers.remove(pos);
1434
1435 Some(name)
1436}
1437
/// Lists the logical (metric-engine) table names in the current schema,
/// optionally filtered by the `__name__` matcher of the first `match[]`
/// expression, sorted ascending.
///
/// NOTE(review): only `matches.first()` is honored — additional match[]
/// entries are silently ignored; confirm this matches the intended API
/// contract.
async fn retrieve_table_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    let mut tables_stream = catalog_manager.tables(catalog, &schema, Some(query_ctx));
    let mut table_names = Vec::new();

    // Extract the `__name__` matcher from the first match[] expression, if
    // it parses to a plain vector selector; otherwise no filter is applied.
    let name_matcher = matches
        .first()
        .and_then(|matcher| promql_parser::parser::parse(matcher).ok())
        .and_then(|expr| {
            if let PromqlExpr::VectorSelector(vector_selector) = expr {
                let matchers = vector_selector.matchers.matchers;
                for matcher in matchers {
                    if matcher.name == METRIC_NAME_LABEL {
                        return Some(matcher);
                    }
                }

                None
            } else {
                None
            }
        });

    while let Some(table) = tables_stream.next().await {
        let table = table.context(CatalogSnafu)?;
        // Only logical tables backed by the metric engine are Prometheus
        // metrics; skip everything else.
        if !table
            .table_info()
            .meta
            .options
            .extra_options
            .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            continue;
        }

        let table_name = &table.table_info().name;

        if let Some(matcher) = &name_matcher {
            match &matcher.op {
                MatchOp::Equal => {
                    if table_name == &matcher.value {
                        table_names.push(table_name.clone());
                    }
                }
                MatchOp::Re(reg) => {
                    if reg.is_match(table_name) {
                        table_names.push(table_name.clone());
                    }
                }
                // NOTE(review): NotEqual / NotRe matchers fall through to
                // "include everything" here — verify this is intentional.
                _ => {
                    table_names.push(table_name.clone());
                }
            }
        } else {
            table_names.push(table_name.clone());
        }
    }

    table_names.sort_unstable();
    Ok(table_names)
}
1509
1510async fn retrieve_field_names(
1511 query_ctx: &QueryContext,
1512 manager: CatalogManagerRef,
1513 matches: Vec<String>,
1514) -> Result<HashSet<String>> {
1515 let mut field_columns = HashSet::new();
1516 let catalog = query_ctx.current_catalog();
1517 let schema = query_ctx.current_schema();
1518
1519 if matches.is_empty() {
1520 while let Some(table) = manager
1522 .tables(catalog, &schema, Some(query_ctx))
1523 .next()
1524 .await
1525 {
1526 let table = table.context(CatalogSnafu)?;
1527 for column in table.field_columns() {
1528 field_columns.insert(column.name);
1529 }
1530 }
1531 return Ok(field_columns);
1532 }
1533
1534 for table_name in matches {
1535 let table = manager
1536 .table(catalog, &schema, &table_name, Some(query_ctx))
1537 .await
1538 .context(CatalogSnafu)?
1539 .with_context(|| TableNotFoundSnafu {
1540 catalog: catalog.to_string(),
1541 schema: schema.clone(),
1542 table: table_name.clone(),
1543 })?;
1544
1545 for column in table.field_columns() {
1546 field_columns.insert(column.name);
1547 }
1548 }
1549 Ok(field_columns)
1550}
1551
/// Lists the schemas (databases) of the current catalog that contain every
/// metric named by the `match[]` expressions, sorted ascending.
///
/// A match[] entry that does not resolve to a single metric name is skipped
/// and does not constrain the result.
async fn retrieve_schema_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let mut schemas = Vec::new();
    let catalog = query_ctx.current_catalog();

    let candidate_schemas = catalog_manager
        .schema_names(catalog, Some(query_ctx))
        .await
        .context(CatalogSnafu)?;

    // For each candidate schema, require that every named metric exists as
    // a table. Existence checks run sequentially per schema; acceptable for
    // typical match[] counts.
    for schema in candidate_schemas {
        let mut found = true;
        for match_item in &matches {
            if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
                let exists = catalog_manager
                    .table_exists(catalog, &schema, &table_name, Some(query_ctx))
                    .await
                    .context(CatalogSnafu)?;
                if !exists {
                    found = false;
                    break;
                }
            }
        }

        if found {
            schemas.push(schema);
        }
    }

    schemas.sort_unstable();

    Ok(schemas)
}
1589
1590fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1595 let promql_expr = promql_parser::parser::parse(query).ok()?;
1596 promql_expr_to_metric_name(&promql_expr)
1597}
1598
/// Parameters accepted by the Prometheus `series` endpoint (query string
/// and/or form body).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    /// Start of the time range.
    start: Option<String>,
    /// End of the time range.
    end: Option<String>,
    /// Lookback delta forwarded into the PromQL evaluation.
    lookback: Option<String>,
    /// `match[]` series selectors, flattened from repeated parameters.
    #[serde(flatten)]
    matches: Matches,
    /// Optional database override in `catalog-schema` form.
    db: Option<String>,
}
1608
1609#[axum_macros::debug_handler]
1610#[tracing::instrument(
1611 skip_all,
1612 fields(protocol = "prometheus", request_type = "series_query")
1613)]
1614pub async fn series_query(
1615 State(handler): State<PrometheusHandlerRef>,
1616 Query(params): Query<SeriesQuery>,
1617 Extension(mut query_ctx): Extension<QueryContext>,
1618 Form(form_params): Form<SeriesQuery>,
1619) -> PrometheusJsonResponse {
1620 let mut queries: Vec<String> = params.matches.0;
1621 if queries.is_empty() {
1622 queries = form_params.matches.0;
1623 }
1624 if queries.is_empty() {
1625 return PrometheusJsonResponse::error(
1626 StatusCode::Unsupported,
1627 "match[] parameter is required",
1628 );
1629 }
1630 let start = params
1631 .start
1632 .or(form_params.start)
1633 .unwrap_or_else(yesterday_rfc3339);
1634 let end = params
1635 .end
1636 .or(form_params.end)
1637 .unwrap_or_else(current_time_rfc3339);
1638 let lookback = params
1639 .lookback
1640 .or(form_params.lookback)
1641 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
1642
1643 if let Some(db) = ¶ms.db {
1645 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
1646 try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
1647 }
1648 let query_ctx = Arc::new(query_ctx);
1649
1650 let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
1651 .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
1652 .start_timer();
1653
1654 let mut series = Vec::new();
1655 let mut merge_map = HashMap::new();
1656 for query in queries {
1657 let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
1658 let prom_query = PromQuery {
1659 query,
1660 start: start.clone(),
1661 end: end.clone(),
1662 step: DEFAULT_LOOKBACK_STRING.to_string(),
1664 lookback: lookback.clone(),
1665 alias: None,
1666 };
1667 let result = handler.do_query(&prom_query, query_ctx.clone()).await;
1668
1669 handle_schema_err!(
1670 retrieve_series_from_query_result(
1671 result,
1672 &mut series,
1673 &query_ctx,
1674 &table_name,
1675 &handler.catalog_manager(),
1676 &mut merge_map,
1677 )
1678 .await
1679 );
1680 }
1681 let merge_map = merge_map
1682 .into_iter()
1683 .map(|(k, v)| (k, Value::from(v)))
1684 .collect();
1685 let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
1686 resp.resp_metrics = merge_map;
1687 resp
1688}
1689
/// Parameters accepted by the `parse_query` endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    /// The PromQL expression to parse.
    query: Option<String>,
    /// Optional database override; currently unused by the handler.
    db: Option<String>,
}
1695
1696#[axum_macros::debug_handler]
1697#[tracing::instrument(
1698 skip_all,
1699 fields(protocol = "prometheus", request_type = "parse_query")
1700)]
1701pub async fn parse_query(
1702 State(_handler): State<PrometheusHandlerRef>,
1703 Query(params): Query<ParseQuery>,
1704 Extension(_query_ctx): Extension<QueryContext>,
1705 Form(form_params): Form<ParseQuery>,
1706) -> PrometheusJsonResponse {
1707 if let Some(query) = params.query.or(form_params.query) {
1708 let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1709 PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1710 } else {
1711 PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1712 }
1713}
1714
#[cfg(test)]
mod tests {
    use promql_parser::parser::value::ValueType;

    use super::*;

    /// One table-driven case for `retrieve_metric_name_and_result_type`.
    struct TestCase {
        // Human-readable case label used in assertion messages.
        name: &'static str,
        // The PromQL input under test.
        promql: &'static str,
        // Expected single metric name, or None when ambiguous/absent.
        expected_metric: Option<&'static str>,
        // Expected PromQL result type (ignored when should_error is true).
        expected_type: ValueType,
        // Whether parsing is expected to fail.
        should_error: bool,
    }

    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        let test_cases = &[
            // Plain selectors: name should be recovered directly.
            TestCase {
                name: "simple metric",
                promql: "cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with selector",
                promql: r#"cpu_usage{instance="localhost"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with range selector",
                promql: "cpu_usage[5m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "metric with __name__ matcher",
                promql: r#"{__name__="cpu_usage"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with unary operator",
                promql: "-cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Aggregations: the name survives only when __name__ is kept.
            TestCase {
                name: "metric with aggregation",
                promql: "sum(cpu_usage)",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "complex aggregation",
                promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "complex aggregation",
                promql: r#"sum by (__name__) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "complex aggregation",
                promql: r#"sum without (instance) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Arithmetic binaries drop the metric name.
            TestCase {
                name: "same metric addition",
                promql: "cpu_usage + cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with scalar addition",
                promql: r#"sum(rate(cpu_usage{job="node"}[5m])) + 100"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics addition",
                promql: "cpu_usage + memory_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics subtraction",
                promql: "network_in - network_out",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Set operators keep the left-hand side's name.
            TestCase {
                name: "unless with different metrics",
                promql: "cpu_usage unless memory_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with same metric",
                promql: "cpu_usage unless cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Subqueries inherit the inner expression's name.
            TestCase {
                name: "basic subquery",
                promql: "cpu_usage[5m:1m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "subquery with multiple metrics",
                promql: "(cpu_usage + memory_usage)[5m:1m]",
                expected_metric: None,
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            // Literals.
            TestCase {
                name: "scalar value",
                promql: "42",
                expected_metric: None,
                expected_type: ValueType::Scalar,
                should_error: false,
            },
            TestCase {
                name: "string literal",
                promql: r#""hello world""#,
                expected_metric: None,
                expected_type: ValueType::String,
                should_error: false,
            },
            // Invalid inputs must surface a parse error.
            TestCase {
                name: "invalid syntax",
                promql: "cpu_usage{invalid=",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "empty query",
                promql: "",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "malformed brackets",
                promql: "cpu_usage[5m",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
        ];

        for test_case in test_cases {
            let result = retrieve_metric_name_and_result_type(test_case.promql);

            if test_case.should_error {
                assert!(
                    result.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    test_case.name,
                    result
                );
            } else {
                let (metric_name, value_type) = result.unwrap_or_else(|e| {
                    panic!(
                        "Test '{}' should have succeeded but failed with error: {}",
                        test_case.name, e
                    )
                });

                let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
                assert_eq!(
                    metric_name, expected_metric_name,
                    "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, expected_metric_name, metric_name
                );

                assert_eq!(
                    value_type, test_case.expected_type,
                    "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, test_case.expected_type, value_type
                );
            }
        }
    }
}