Skip to main content

frontend/instance/
promql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::SystemTime;
16
17use catalog::information_schema::TABLES;
18use client::OutputData;
19use common_catalog::consts::INFORMATION_SCHEMA_NAME;
20use common_catalog::format_full_table_name;
21use common_recordbatch::util;
22use common_telemetry::tracing;
23use promql_parser::label::{MatchOp, Matcher, Matchers};
24use query::promql;
25use query::promql::planner::PromPlanner;
26use servers::prom_store::is_database_selection_label;
27use servers::prometheus;
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30
31use crate::error::{
32    CatalogSnafu, CollectRecordbatchSnafu, ExecLogicalPlanSnafu,
33    PrometheusLabelValuesQueryPlanSnafu, PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu,
34    Result, TableNotFoundSnafu, TableSnafu,
35};
36use crate::instance::Instance;
37
38impl Instance {
39    /// Handles metric names query request, returns the names.
40    #[tracing::instrument(skip_all)]
41    pub(crate) async fn handle_query_metric_names(
42        &self,
43        matchers: Vec<Matcher>,
44        ctx: &QueryContextRef,
45    ) -> Result<Vec<String>> {
46        let _timer = crate::metrics::PROMQL_QUERY_METRICS_ELAPSED
47            .with_label_values(&[ctx.get_db_string().as_str()])
48            .start_timer();
49
50        let table = self
51            .catalog_manager
52            .table(
53                ctx.current_catalog(),
54                INFORMATION_SCHEMA_NAME,
55                TABLES,
56                Some(ctx),
57            )
58            .await
59            .context(CatalogSnafu)?
60            .with_context(|| TableNotFoundSnafu {
61                table_name: "greptime.information_schema.tables",
62            })?;
63
64        let dataframe = self
65            .query_engine
66            .read_table(table)
67            .with_context(|_| ReadTableSnafu {
68                table_name: "greptime.information_schema.tables",
69            })?;
70
71        let logical_plan = prometheus::metric_name_matchers_to_plan(dataframe, matchers, ctx)
72            .context(PrometheusMetricNamesQueryPlanSnafu)?;
73
74        let results = self
75            .query_engine
76            .execute(logical_plan, ctx.clone())
77            .await
78            .context(ExecLogicalPlanSnafu)?;
79
80        let batches = match results.data {
81            OutputData::Stream(stream) => util::collect(stream)
82                .await
83                .context(CollectRecordbatchSnafu)?,
84            OutputData::RecordBatches(rbs) => rbs.take(),
85            _ => unreachable!("should not happen"),
86        };
87
88        let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
89
90        for batch in batches {
91            // Only one column the results, ensured by `prometheus::metric_name_matchers_to_plan`.
92            batch
93                .iter_column_as_string(0)
94                .flatten()
95                .for_each(|x| results.push(x))
96        }
97
98        Ok(results)
99    }
100
101    /// Handles label values query request, returns the values.
102    #[tracing::instrument(skip_all)]
103    pub(crate) async fn handle_query_label_values(
104        &self,
105        metric: String,
106        label_name: String,
107        matchers: Vec<Matcher>,
108        start: SystemTime,
109        end: SystemTime,
110        ctx: &QueryContextRef,
111    ) -> Result<Vec<String>> {
112        let table_schema = matchers
113            .iter()
114            .find_map(|m| {
115                if is_database_selection_label(&m.name) && m.op == MatchOp::Equal {
116                    Some(m.value.clone())
117                } else {
118                    None
119                }
120            })
121            .unwrap_or_else(|| ctx.current_schema());
122
123        let full_table_name = format_full_table_name(ctx.current_catalog(), &table_schema, &metric);
124        let table = self
125            .catalog_manager
126            .table(ctx.current_catalog(), &table_schema, &metric, Some(ctx))
127            .await
128            .context(CatalogSnafu)?
129            .with_context(|| TableNotFoundSnafu {
130                table_name: full_table_name.clone(),
131            })?;
132
133        // Check label column existence before building the query plan so a missing label can be
134        // reported as `TableColumnNotFound` and handled like Prometheus expects.
135        if table.schema().column_schema_by_name(&label_name).is_none() {
136            return table::error::ColumnNotExistsSnafu {
137                column_name: label_name,
138                table_name: full_table_name,
139            }
140            .fail()
141            .context(TableSnafu);
142        }
143
144        let dataframe = self
145            .query_engine
146            .read_table(table.clone())
147            .with_context(|_| ReadTableSnafu {
148                table_name: full_table_name,
149            })?;
150
151        let scan_plan = dataframe.into_unoptimized_plan();
152        let filter_conditions =
153            PromPlanner::matchers_to_expr(Matchers::new(matchers), scan_plan.schema())
154                .context(PrometheusLabelValuesQueryPlanSnafu)?;
155        let logical_plan = promql::label_values::rewrite_label_values_query(
156            table,
157            scan_plan,
158            filter_conditions,
159            label_name,
160            start,
161            end,
162        )
163        .context(PrometheusLabelValuesQueryPlanSnafu)?;
164
165        let results = self
166            .query_engine
167            .execute(logical_plan, ctx.clone())
168            .await
169            .context(ExecLogicalPlanSnafu)?;
170
171        let batches = match results.data {
172            OutputData::Stream(stream) => util::collect(stream)
173                .await
174                .context(CollectRecordbatchSnafu)?,
175            OutputData::RecordBatches(rbs) => rbs.take(),
176            _ => unreachable!("should not happen"),
177        };
178
179        let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
180        for batch in batches {
181            // Only one column in results, ensured by `prometheus::label_values_matchers_to_plan`.
182            batch
183                .iter_column_as_string(0)
184                .flatten()
185                .for_each(|x| results.push(x))
186        }
187
188        Ok(results)
189    }
190}