frontend/instance/
promql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::SystemTime;
16
17use catalog::information_schema::TABLES;
18use client::OutputData;
19use common_catalog::consts::INFORMATION_SCHEMA_NAME;
20use common_catalog::format_full_table_name;
21use common_recordbatch::util;
22use common_telemetry::tracing;
23use datatypes::prelude::Value;
24use promql_parser::label::{MatchOp, Matcher, Matchers};
25use query::promql;
26use query::promql::planner::PromPlanner;
27use servers::prom_store::{DATABASE_LABEL, SCHEMA_LABEL};
28use servers::prometheus;
29use session::context::QueryContextRef;
30use snafu::{OptionExt, ResultExt};
31
32use crate::error::{
33    CatalogSnafu, CollectRecordbatchSnafu, ExecLogicalPlanSnafu,
34    PrometheusLabelValuesQueryPlanSnafu, PrometheusMetricNamesQueryPlanSnafu, ReadTableSnafu,
35    Result, TableNotFoundSnafu,
36};
37use crate::instance::Instance;
38
39impl Instance {
40    /// Handles metric names query request, returns the names.
41    #[tracing::instrument(skip_all)]
42    pub(crate) async fn handle_query_metric_names(
43        &self,
44        matchers: Vec<Matcher>,
45        ctx: &QueryContextRef,
46    ) -> Result<Vec<String>> {
47        let _timer = crate::metrics::PROMQL_QUERY_METRICS_ELAPSED
48            .with_label_values(&[ctx.get_db_string().as_str()])
49            .start_timer();
50
51        let table = self
52            .catalog_manager
53            .table(
54                ctx.current_catalog(),
55                INFORMATION_SCHEMA_NAME,
56                TABLES,
57                Some(ctx),
58            )
59            .await
60            .context(CatalogSnafu)?
61            .with_context(|| TableNotFoundSnafu {
62                table_name: "greptime.information_schema.tables",
63            })?;
64
65        let dataframe = self
66            .query_engine
67            .read_table(table)
68            .with_context(|_| ReadTableSnafu {
69                table_name: "greptime.information_schema.tables",
70            })?;
71
72        let logical_plan = prometheus::metric_name_matchers_to_plan(dataframe, matchers, ctx)
73            .context(PrometheusMetricNamesQueryPlanSnafu)?;
74
75        let results = self
76            .query_engine
77            .execute(logical_plan, ctx.clone())
78            .await
79            .context(ExecLogicalPlanSnafu)?;
80
81        let batches = match results.data {
82            OutputData::Stream(stream) => util::collect(stream)
83                .await
84                .context(CollectRecordbatchSnafu)?,
85            OutputData::RecordBatches(rbs) => rbs.take(),
86            _ => unreachable!("should not happen"),
87        };
88
89        let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
90
91        for batch in batches {
92            // Only one column the results, ensured by `prometheus::metric_name_matchers_to_plan`.
93            let names = batch.column(0);
94
95            for i in 0..names.len() {
96                let Value::String(name) = names.get(i) else {
97                    unreachable!();
98                };
99
100                results.push(name.into_string());
101            }
102        }
103
104        Ok(results)
105    }
106
107    /// Handles label values query request, returns the values.
108    #[tracing::instrument(skip_all)]
109    pub(crate) async fn handle_query_label_values(
110        &self,
111        metric: String,
112        label_name: String,
113        matchers: Vec<Matcher>,
114        start: SystemTime,
115        end: SystemTime,
116        ctx: &QueryContextRef,
117    ) -> Result<Vec<String>> {
118        let table_schema = matchers
119            .iter()
120            .find_map(|m| {
121                if (m.name == SCHEMA_LABEL || m.name == DATABASE_LABEL) && m.op == MatchOp::Equal {
122                    Some(m.value.clone())
123                } else {
124                    None
125                }
126            })
127            .unwrap_or_else(|| ctx.current_schema());
128
129        let table = self
130            .catalog_manager
131            .table(ctx.current_catalog(), &table_schema, &metric, Some(ctx))
132            .await
133            .context(CatalogSnafu)?
134            .with_context(|| TableNotFoundSnafu {
135                table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
136            })?;
137
138        let dataframe = self
139            .query_engine
140            .read_table(table.clone())
141            .with_context(|_| ReadTableSnafu {
142                table_name: format_full_table_name(ctx.current_catalog(), &table_schema, &metric),
143            })?;
144
145        let scan_plan = dataframe.into_logical_plan();
146        let filter_conditions =
147            PromPlanner::matchers_to_expr(Matchers::new(matchers), scan_plan.schema())
148                .context(PrometheusLabelValuesQueryPlanSnafu)?;
149        let logical_plan = promql::label_values::rewrite_label_values_query(
150            table,
151            scan_plan,
152            filter_conditions,
153            label_name,
154            start,
155            end,
156        )
157        .context(PrometheusLabelValuesQueryPlanSnafu)?;
158
159        let results = self
160            .query_engine
161            .execute(logical_plan, ctx.clone())
162            .await
163            .context(ExecLogicalPlanSnafu)?;
164
165        let batches = match results.data {
166            OutputData::Stream(stream) => util::collect(stream)
167                .await
168                .context(CollectRecordbatchSnafu)?,
169            OutputData::RecordBatches(rbs) => rbs.take(),
170            _ => unreachable!("should not happen"),
171        };
172
173        let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
174        for batch in batches {
175            // Only one column in results, ensured by `prometheus::label_values_matchers_to_plan`.
176            let names = batch.column(0);
177
178            for i in 0..names.len() {
179                results.push(names.get(i).to_string());
180            }
181        }
182
183        Ok(results)
184    }
185}