frontend/instance/
promql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::SystemTime;
16
17use catalog::information_schema::TABLES;
18use client::OutputData;
19use common_catalog::consts::INFORMATION_SCHEMA_NAME;
20use common_catalog::format_full_table_name;
21use common_recordbatch::util;
22use common_telemetry::tracing;
23use promql_parser::label::{MatchOp, Matcher, Matchers};
24use query::promql;
25use query::promql::planner::PromPlanner;
26use servers::prom_store::{DATABASE_LABEL, SCHEMA_LABEL};
27use servers::prometheus;
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30
31use crate::error::{
32    CatalogSnafu, CollectRecordbatchSnafu, ExecLogicalPlanSnafu,
33    PrometheusLabelValuesQueryPlanSnafu, PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu,
34    Result, TableNotFoundSnafu,
35};
36use crate::instance::Instance;
37
38impl Instance {
39    /// Handles metric names query request, returns the names.
40    #[tracing::instrument(skip_all)]
41    pub(crate) async fn handle_query_metric_names(
42        &self,
43        matchers: Vec<Matcher>,
44        ctx: &QueryContextRef,
45    ) -> Result<Vec<String>> {
46        let _timer = crate::metrics::PROMQL_QUERY_METRICS_ELAPSED
47            .with_label_values(&[ctx.get_db_string().as_str()])
48            .start_timer();
49
50        let table = self
51            .catalog_manager
52            .table(
53                ctx.current_catalog(),
54                INFORMATION_SCHEMA_NAME,
55                TABLES,
56                Some(ctx),
57            )
58            .await
59            .context(CatalogSnafu)?
60            .with_context(|| TableNotFoundSnafu {
61                table_name: "greptime.information_schema.tables",
62            })?;
63
64        let dataframe = self
65            .query_engine
66            .read_table(table)
67            .with_context(|_| ReadTableSnafu {
68                table_name: "greptime.information_schema.tables",
69            })?;
70
71        let logical_plan = prometheus::metric_name_matchers_to_plan(dataframe, matchers, ctx)
72            .context(PrometheusMetricNamesQueryPlanSnafu)?;
73
74        let results = self
75            .query_engine
76            .execute(logical_plan, ctx.clone())
77            .await
78            .context(ExecLogicalPlanSnafu)?;
79
80        let batches = match results.data {
81            OutputData::Stream(stream) => util::collect(stream)
82                .await
83                .context(CollectRecordbatchSnafu)?,
84            OutputData::RecordBatches(rbs) => rbs.take(),
85            _ => unreachable!("should not happen"),
86        };
87
88        let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
89
90        for batch in batches {
91            // Only one column the results, ensured by `prometheus::metric_name_matchers_to_plan`.
92            batch
93                .iter_column_as_string(0)
94                .flatten()
95                .for_each(|x| results.push(x))
96        }
97
98        Ok(results)
99    }
100
101    /// Handles label values query request, returns the values.
102    #[tracing::instrument(skip_all)]
103    pub(crate) async fn handle_query_label_values(
104        &self,
105        metric: String,
106        label_name: String,
107        matchers: Vec<Matcher>,
108        start: SystemTime,
109        end: SystemTime,
110        ctx: &QueryContextRef,
111    ) -> Result<Vec<String>> {
112        let table_schema = matchers
113            .iter()
114            .find_map(|m| {
115                if (m.name == SCHEMA_LABEL || m.name == DATABASE_LABEL) && m.op == MatchOp::Equal {
116                    Some(m.value.clone())
117                } else {
118                    None
119                }
120            })
121            .unwrap_or_else(|| ctx.current_schema());
122
123        let table = self
124            .catalog_manager
125            .table(ctx.current_catalog(), &table_schema, &metric, Some(ctx))
126            .await
127            .context(CatalogSnafu)?
128            .with_context(|| TableNotFoundSnafu {
129                table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
130            })?;
131
132        let dataframe = self
133            .query_engine
134            .read_table(table.clone())
135            .with_context(|_| ReadTableSnafu {
136                table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
137            })?;
138
139        let scan_plan = dataframe.into_unoptimized_plan();
140        let filter_conditions =
141            PromPlanner::matchers_to_expr(Matchers::new(matchers), scan_plan.schema())
142                .context(PrometheusLabelValuesQueryPlanSnafu)?;
143        let logical_plan = promql::label_values::rewrite_label_values_query(
144            table,
145            scan_plan,
146            filter_conditions,
147            label_name,
148            start,
149            end,
150        )
151        .context(PrometheusLabelValuesQueryPlanSnafu)?;
152
153        let results = self
154            .query_engine
155            .execute(logical_plan, ctx.clone())
156            .await
157            .context(ExecLogicalPlanSnafu)?;
158
159        let batches = match results.data {
160            OutputData::Stream(stream) => util::collect(stream)
161                .await
162                .context(CollectRecordbatchSnafu)?,
163            OutputData::RecordBatches(rbs) => rbs.take(),
164            _ => unreachable!("should not happen"),
165        };
166
167        let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
168        for batch in batches {
169            // Only one column in results, ensured by `prometheus::label_values_matchers_to_plan`.
170            batch
171                .iter_column_as_string(0)
172                .flatten()
173                .for_each(|x| results.push(x))
174        }
175
176        Ok(results)
177    }
178}