frontend/instance/
promql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::SystemTime;
16
17use catalog::information_schema::TABLES;
18use client::OutputData;
19use common_catalog::consts::INFORMATION_SCHEMA_NAME;
20use common_catalog::format_full_table_name;
21use common_recordbatch::util;
22use common_telemetry::tracing;
23use datatypes::prelude::Value;
24use promql_parser::label::{Matcher, Matchers};
25use query::promql;
26use query::promql::planner::PromPlanner;
27use servers::prometheus;
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30
31use crate::error::{
32    CatalogSnafu, CollectRecordbatchSnafu, ExecLogicalPlanSnafu,
33    PrometheusLabelValuesQueryPlanSnafu, PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu,
34    Result, TableNotFoundSnafu,
35};
36use crate::instance::Instance;
37
38impl Instance {
39    /// Handles metric names query request, returns the names.
40    #[tracing::instrument(skip_all)]
41    pub(crate) async fn handle_query_metric_names(
42        &self,
43        matchers: Vec<Matcher>,
44        ctx: &QueryContextRef,
45    ) -> Result<Vec<String>> {
46        let _timer = crate::metrics::PROMQL_QUERY_METRICS_ELAPSED
47            .with_label_values(&[ctx.get_db_string().as_str()])
48            .start_timer();
49
50        let table = self
51            .catalog_manager
52            .table(
53                ctx.current_catalog(),
54                INFORMATION_SCHEMA_NAME,
55                TABLES,
56                Some(ctx),
57            )
58            .await
59            .context(CatalogSnafu)?
60            .with_context(|| TableNotFoundSnafu {
61                table_name: "greptime.information_schema.tables",
62            })?;
63
64        let dataframe = self
65            .query_engine
66            .read_table(table)
67            .with_context(|_| ReadTableSnafu {
68                table_name: "greptime.information_schema.tables",
69            })?;
70
71        let logical_plan = prometheus::metric_name_matchers_to_plan(dataframe, matchers, ctx)
72            .context(PrometheusMetricNamesQueryPlanSnafu)?;
73
74        let results = self
75            .query_engine
76            .execute(logical_plan, ctx.clone())
77            .await
78            .context(ExecLogicalPlanSnafu)?;
79
80        let batches = match results.data {
81            OutputData::Stream(stream) => util::collect(stream)
82                .await
83                .context(CollectRecordbatchSnafu)?,
84            OutputData::RecordBatches(rbs) => rbs.take(),
85            _ => unreachable!("should not happen"),
86        };
87
88        let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
89
90        for batch in batches {
91            // Only one column the results, ensured by `prometheus::metric_name_matchers_to_plan`.
92            let names = batch.column(0);
93
94            for i in 0..names.len() {
95                let Value::String(name) = names.get(i) else {
96                    unreachable!();
97                };
98
99                results.push(name.into_string());
100            }
101        }
102
103        Ok(results)
104    }
105
106    /// Handles label values query request, returns the values.
107    #[tracing::instrument(skip_all)]
108    pub(crate) async fn handle_query_label_values(
109        &self,
110        metric: String,
111        label_name: String,
112        matchers: Vec<Matcher>,
113        start: SystemTime,
114        end: SystemTime,
115        ctx: &QueryContextRef,
116    ) -> Result<Vec<String>> {
117        let table_schema = ctx.current_schema();
118        let table = self
119            .catalog_manager
120            .table(ctx.current_catalog(), &table_schema, &metric, Some(ctx))
121            .await
122            .context(CatalogSnafu)?
123            .with_context(|| TableNotFoundSnafu {
124                table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
125            })?;
126
127        let dataframe = self
128            .query_engine
129            .read_table(table.clone())
130            .with_context(|_| ReadTableSnafu {
131                table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
132            })?;
133
134        let scan_plan = dataframe.into_logical_plan();
135        let filter_conditions =
136            PromPlanner::matchers_to_expr(Matchers::new(matchers), scan_plan.schema())
137                .context(PrometheusLabelValuesQueryPlanSnafu)?;
138        let logical_plan = promql::label_values::rewrite_label_values_query(
139            table,
140            scan_plan,
141            filter_conditions,
142            label_name,
143            start,
144            end,
145        )
146        .context(PrometheusLabelValuesQueryPlanSnafu)?;
147
148        let results = self
149            .query_engine
150            .execute(logical_plan, ctx.clone())
151            .await
152            .context(ExecLogicalPlanSnafu)?;
153
154        let batches = match results.data {
155            OutputData::Stream(stream) => util::collect(stream)
156                .await
157                .context(CollectRecordbatchSnafu)?,
158            OutputData::RecordBatches(rbs) => rbs.take(),
159            _ => unreachable!("should not happen"),
160        };
161
162        let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
163        for batch in batches {
164            // Only one column in results, ensured by `prometheus::label_values_matchers_to_plan`.
165            let names = batch.column(0);
166
167            for i in 0..names.len() {
168                results.push(names.get(i).to_string());
169            }
170        }
171
172        Ok(results)
173    }
174}