servers/http/result/prometheus_resp.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Provides response types compliant with the Prometheus HTTP API server.
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};

use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
use indexmap::IndexMap;
use promql_parser::label::METRIC_NAME;
use promql_parser::parser::value::ValueType;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};

use crate::error::{
    status_code_to_http_status, CollectRecordbatchSnafu, Result, UnexpectedResultSnafu,
};
use crate::http::header::{collect_plan_metrics, GREPTIME_DB_HEADER_METRICS};
use crate::http::prometheus::{
    PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusResponse,
};

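/// JSON response envelope following the Prometheus HTTP API format.
/// `status`, `data`, `error`, `errorType` and `warnings` are serialized into the body,
/// while `status_code` and `resp_metrics` are internal fields used only to build the
/// final HTTP response (status override and metrics header).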
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PrometheusJsonResponse {
    pub status: String,
    #[serde(skip_serializing_if = "PrometheusResponse::is_none")]
    #[serde(default)]
    pub data: PrometheusResponse,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "errorType")]
    pub error_type: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub warnings: Option<Vec<String>>,

    #[serde(skip)]
    pub status_code: Option<StatusCode>,
    // Placeholder for metrics that are emitted as a response header value.
    #[serde(skip)]
    #[serde(default)]
    pub resp_metrics: HashMap<String, Value>,
}

impl IntoResponse for PrometheusJsonResponse {
    fn into_response(self) -> Response {
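        // Serialize the collected plan metrics (if any) so they can be attached
        // to the response as a header value.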
        let metrics = if self.resp_metrics.is_empty() {
            None
        } else {
            serde_json::to_string(&self.resp_metrics).ok()
        };

        let http_code = self.status_code.map(|c| status_code_to_http_status(&c));

        let mut resp = Json(self).into_response();

        if let Some(http_code) = http_code {
            *resp.status_mut() = http_code;
        }

        if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
            resp.headers_mut().insert(&GREPTIME_DB_HEADER_METRICS, m);
        }

        resp
    }
}

impl PrometheusJsonResponse {
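    /// Build an `"error"` response carrying the given status code and reason.
    ///
    /// A minimal usage sketch (illustrative only):
    ///
    /// ```ignore
    /// let resp = PrometheusJsonResponse::error(StatusCode::Unexpected, "unexpected result");
    /// assert_eq!(resp.status, "error");
    /// assert_eq!(resp.status_code, Some(StatusCode::Unexpected));
    /// ```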
    pub fn error<S1>(error_type: StatusCode, reason: S1) -> Self
    where
        S1: Into<String>,
    {
        PrometheusJsonResponse {
            status: "error".to_string(),
            data: PrometheusResponse::None,
            error: Some(reason.into()),
            error_type: Some(error_type.to_string()),
            warnings: None,
            resp_metrics: Default::default(),
            status_code: Some(error_type),
        }
    }

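    /// Build a `"success"` response wrapping the given payload.
    ///
    /// A minimal usage sketch (illustrative only):
    ///
    /// ```ignore
    /// let resp = PrometheusJsonResponse::success(PrometheusResponse::None);
    /// assert_eq!(resp.status, "success");
    /// assert!(resp.error.is_none());
    /// ```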
    pub fn success(data: PrometheusResponse) -> Self {
        PrometheusJsonResponse {
            status: "success".to_string(),
            data,
            error: None,
            error_type: None,
            warnings: None,
            resp_metrics: Default::default(),
            status_code: None,
        }
    }

    /// Convert a query `Result<Output>` into a Prometheus JSON response.
    pub async fn from_query_result(
        result: Result<Output>,
        metric_name: String,
        result_type: ValueType,
    ) -> Self {
        let response: Result<Self> = try {
            let result = result?;
            let mut resp =
                match result.data {
                    OutputData::RecordBatches(batches) => Self::success(
                        Self::record_batches_to_data(batches, metric_name, result_type)?,
                    ),
                    OutputData::Stream(stream) => {
                        let record_batches = RecordBatches::try_collect(stream)
                            .await
                            .context(CollectRecordbatchSnafu)?;
                        Self::success(Self::record_batches_to_data(
                            record_batches,
                            metric_name,
                            result_type,
                        )?)
                    }
                    OutputData::AffectedRows(_) => Self::error(
                        StatusCode::Unexpected,
                        "expected data result, but got affected rows",
                    ),
                };

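            // Collect execution metrics from the physical plan; they are later
            // emitted as a response header in `into_response`.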
            if let Some(physical_plan) = result.meta.plan {
                let mut result_map = HashMap::new();
                let mut tmp = vec![&mut result_map];
                collect_plan_metrics(&physical_plan, &mut tmp);

                let re = result_map
                    .into_iter()
                    .map(|(k, v)| (k, Value::from(v)))
                    .collect();
                resp.resp_metrics = re;
            }

            resp
        };

        let result_type_string = result_type.to_string();

        match response {
            Ok(resp) => resp,
            Err(err) => {
                // Prometheus won't report an error when querying a nonexistent label or metric.
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    Self::success(PrometheusResponse::PromData(PromData {
                        result_type: result_type_string,
                        ..Default::default()
                    }))
                } else {
                    Self::error(err.status_code(), err.output_msg())
                }
            }
        }
    }

    /// Convert [RecordBatches] to [PromData]
    fn record_batches_to_data(
        batches: RecordBatches,
        metric_name: String,
        result_type: ValueType,
    ) -> Result<PrometheusResponse> {
        // infer semantic type of each column from schema.
        // TODO(ruihang): wish there is a better way to do this.
        let mut timestamp_column_index = None;
        let mut tag_column_indices = Vec::new();
        let mut first_field_column_index = None;

        let mut num_label_columns = 0;

        for (i, column) in batches.schema().column_schemas().iter().enumerate() {
            match column.data_type {
                ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => {
                    if timestamp_column_index.is_none() {
                        timestamp_column_index = Some(i);
                    }
                }
                // Treat all value types as field
                ConcreteDataType::Float32(_)
                | ConcreteDataType::Float64(_)
                | ConcreteDataType::Int8(_)
                | ConcreteDataType::Int16(_)
                | ConcreteDataType::Int32(_)
                | ConcreteDataType::Int64(_)
                | ConcreteDataType::UInt8(_)
                | ConcreteDataType::UInt16(_)
                | ConcreteDataType::UInt32(_)
                | ConcreteDataType::UInt64(_) => {
                    if first_field_column_index.is_none() {
                        first_field_column_index = Some(i);
                    }
                }
                ConcreteDataType::String(_) => {
                    tag_column_indices.push(i);
                    num_label_columns += 1;
                }
                _ => {}
            }
        }

        let timestamp_column_index = timestamp_column_index.context(UnexpectedResultSnafu {
            reason: "no timestamp column found".to_string(),
        })?;
        let first_field_column_index = first_field_column_index.context(UnexpectedResultSnafu {
            reason: "no value column found".to_string(),
        })?;

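        // The metric name is attached as an extra label (METRIC_NAME) on every output series.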
        let metric_name = (METRIC_NAME, metric_name.as_str());
        // Preserves the order of output series.
        // Series order matters, e.g., after sort and sort_desc, the output order must be kept.
        let mut buffer = IndexMap::<Vec<(&str, &str)>, Vec<(f64, String)>>::new();

        let schema = batches.schema();
        for batch in batches.iter() {
            // prepare typed accessors for the tag, timestamp, and value columns
            let tag_columns = tag_column_indices
                .iter()
                .map(|i| {
                    batch
                        .column(*i)
                        .as_any()
                        .downcast_ref::<StringVector>()
                        .unwrap()
                })
                .collect::<Vec<_>>();
            let tag_names = tag_column_indices
                .iter()
                .map(|c| schema.column_name_by_index(*c))
                .collect::<Vec<_>>();
            let timestamp_column = batch
                .column(timestamp_column_index)
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap();
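            // Cast the value column to f64: Prometheus sample values are 64-bit floats.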
            let casted_field_column = batch
                .column(first_field_column_index)
                .cast(&ConcreteDataType::float64_datatype())
                .unwrap();
            let field_column = casted_field_column
                .as_any()
                .downcast_ref::<Float64Vector>()
                .unwrap();

            // assemble rows
            for row_index in 0..batch.num_rows() {
                // retrieve value
                if let Some(v) = field_column.get_data(row_index) {
                    // ignore all NaN values to reduce the amount of data to be sent.
                    if v.is_nan() {
                        continue;
                    }

                    // retrieve tags
                    // TODO(ruihang): push table name `__metric__`
                    let mut tags = Vec::with_capacity(num_label_columns + 1);
                    tags.push(metric_name);
                    for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
                        // TODO(ruihang): add test for NULL tag
                        if let Some(tag_value) = tag_column.get_data(row_index) {
                            tags.push((tag_name, tag_value));
                        }
                    }

                    // retrieve timestamp
                    let timestamp_millis: i64 =
                        timestamp_column.get_data(row_index).unwrap().into();
                    let timestamp = timestamp_millis as f64 / 1000.0;

                    buffer
                        .entry(tags)
                        .or_default()
                        .push((timestamp, Into::<f64>::into(v).to_string()));
                };
            }
        }

        // initialize result to return
        let mut result = match result_type {
            ValueType::Vector => PromQueryResult::Vector(vec![]),
            ValueType::Matrix => PromQueryResult::Matrix(vec![]),
            ValueType::Scalar => PromQueryResult::Scalar(None),
            ValueType::String => PromQueryResult::String(None),
        };

        // accumulate data into result
        buffer.into_iter().for_each(|(tags, mut values)| {
            let metric = tags
                .into_iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect::<BTreeMap<_, _>>();
            match result {
                PromQueryResult::Vector(ref mut v) => {
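                    // An instant vector carries a single sample per series; keep the last collected one.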
                    v.push(PromSeriesVector {
                        metric,
                        value: values.pop(),
                    });
                }
                PromQueryResult::Matrix(ref mut v) => {
                    // sort values by timestamp
                    if !values.is_sorted_by(|a, b| a.0 <= b.0) {
                        values.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
                    }

                    v.push(PromSeriesMatrix { metric, values });
                }
                PromQueryResult::Scalar(ref mut v) => {
                    *v = values.pop();
                }
                PromQueryResult::String(ref mut _v) => {
                    // TODO(ruihang): Not supported yet
                }
            }
        });

        // sort matrix by metric
        // see: https://prometheus.io/docs/prometheus/3.5/querying/api/#range-vectors
        if let PromQueryResult::Matrix(ref mut v) = result {
            v.sort_by(|a, b| a.metric.cmp(&b.metric));
        }

        let result_type_string = result_type.to_string();
        let data = PrometheusResponse::PromData(PromData {
            result_type: result_type_string,
            result,
        });

        Ok(data)
    }
}