// servers/http/result/prometheus_resp.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Response types implementing compliance with the Prometheus HTTP API.
16use std::cmp::Ordering;
17use std::collections::{BTreeMap, HashMap};
18
19use axum::http::HeaderValue;
20use axum::response::{IntoResponse, Response};
21use axum::Json;
22use common_error::ext::ErrorExt;
23use common_error::status_code::StatusCode;
24use common_query::{Output, OutputData};
25use common_recordbatch::RecordBatches;
26use datatypes::prelude::ConcreteDataType;
27use datatypes::scalars::ScalarVector;
28use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
29use indexmap::IndexMap;
30use promql_parser::label::METRIC_NAME;
31use promql_parser::parser::value::ValueType;
32use serde::{Deserialize, Serialize};
33use serde_json::Value;
34use snafu::{OptionExt, ResultExt};
35
36use crate::error::{
37    status_code_to_http_status, CollectRecordbatchSnafu, Result, UnexpectedResultSnafu,
38};
39use crate::http::header::{collect_plan_metrics, GREPTIME_DB_HEADER_METRICS};
40use crate::http::prometheus::{
41    PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusResponse,
42};
43
/// JSON envelope returned by the Prometheus-compatible HTTP API.
///
/// Mirrors the upstream Prometheus response shape: a `status` string plus
/// either `data` (on success) or `error`/`errorType` (on failure).
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PrometheusJsonResponse {
    /// `"success"` or `"error"` (see [`PrometheusJsonResponse::success`] and
    /// [`PrometheusJsonResponse::error`]).
    pub status: String,
    #[serde(skip_serializing_if = "PrometheusResponse::is_none")]
    #[serde(default)]
    pub data: PrometheusResponse,
    /// Human-readable error message; omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Error classification, serialized under the `errorType` key.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "errorType")]
    pub error_type: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub warnings: Option<Vec<String>>,

    /// Optional HTTP status override applied in `into_response`;
    /// never serialized into the JSON body.
    #[serde(skip)]
    pub status_code: Option<StatusCode>,
    // placeholder for header value: emitted via the
    // `GREPTIME_DB_HEADER_METRICS` response header, not the JSON body
    #[serde(skip)]
    #[serde(default)]
    pub resp_metrics: HashMap<String, Value>,
}
65
66impl IntoResponse for PrometheusJsonResponse {
67    fn into_response(self) -> Response {
68        let metrics = if self.resp_metrics.is_empty() {
69            None
70        } else {
71            serde_json::to_string(&self.resp_metrics).ok()
72        };
73
74        let http_code = self.status_code.map(|c| status_code_to_http_status(&c));
75
76        let mut resp = Json(self).into_response();
77
78        if let Some(http_code) = http_code {
79            *resp.status_mut() = http_code;
80        }
81
82        if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
83            resp.headers_mut().insert(&GREPTIME_DB_HEADER_METRICS, m);
84        }
85
86        resp
87    }
88}
89
90impl PrometheusJsonResponse {
91    pub fn error<S1>(error_type: StatusCode, reason: S1) -> Self
92    where
93        S1: Into<String>,
94    {
95        PrometheusJsonResponse {
96            status: "error".to_string(),
97            data: PrometheusResponse::None,
98            error: Some(reason.into()),
99            error_type: Some(error_type.to_string()),
100            warnings: None,
101            resp_metrics: Default::default(),
102            status_code: Some(error_type),
103        }
104    }
105
106    pub fn success(data: PrometheusResponse) -> Self {
107        PrometheusJsonResponse {
108            status: "success".to_string(),
109            data,
110            error: None,
111            error_type: None,
112            warnings: None,
113            resp_metrics: Default::default(),
114            status_code: None,
115        }
116    }
117
118    /// Convert from `Result<Output>`
119    pub async fn from_query_result(
120        result: Result<Output>,
121        metric_name: Option<String>,
122        result_type: ValueType,
123    ) -> Self {
124        let response: Result<Self> = try {
125            let result = result?;
126            let mut resp =
127                match result.data {
128                    OutputData::RecordBatches(batches) => Self::success(
129                        Self::record_batches_to_data(batches, metric_name, result_type)?,
130                    ),
131                    OutputData::Stream(stream) => {
132                        let record_batches = RecordBatches::try_collect(stream)
133                            .await
134                            .context(CollectRecordbatchSnafu)?;
135                        Self::success(Self::record_batches_to_data(
136                            record_batches,
137                            metric_name,
138                            result_type,
139                        )?)
140                    }
141                    OutputData::AffectedRows(_) => Self::error(
142                        StatusCode::Unexpected,
143                        "expected data result, but got affected rows",
144                    ),
145                };
146
147            if let Some(physical_plan) = result.meta.plan {
148                let mut result_map = HashMap::new();
149                let mut tmp = vec![&mut result_map];
150                collect_plan_metrics(&physical_plan, &mut tmp);
151
152                let re = result_map
153                    .into_iter()
154                    .map(|(k, v)| (k, Value::from(v)))
155                    .collect();
156                resp.resp_metrics = re;
157            }
158
159            resp
160        };
161
162        let result_type_string = result_type.to_string();
163
164        match response {
165            Ok(resp) => resp,
166            Err(err) => {
167                // Prometheus won't report error if querying nonexist label and metric
168                if err.status_code() == StatusCode::TableNotFound
169                    || err.status_code() == StatusCode::TableColumnNotFound
170                {
171                    Self::success(PrometheusResponse::PromData(PromData {
172                        result_type: result_type_string,
173                        ..Default::default()
174                    }))
175                } else {
176                    Self::error(err.status_code(), err.output_msg())
177                }
178            }
179        }
180    }
181
    /// Convert [RecordBatches] to [PromData].
    ///
    /// Column roles are inferred from the schema: the first millisecond
    /// timestamp column supplies sample timestamps, the first numeric column
    /// supplies sample values (cast to `f64`), and every string column is
    /// treated as a label. Rows are grouped by label set and shaped into the
    /// requested Prometheus `result_type` (vector/matrix/scalar/string).
    ///
    /// Returns an error if no timestamp or no numeric column is found.
    fn record_batches_to_data(
        batches: RecordBatches,
        metric_name: Option<String>,
        result_type: ValueType,
    ) -> Result<PrometheusResponse> {
        // Return empty result if no batches
        if batches.iter().next().is_none() {
            return Ok(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        // infer semantic type of each column from schema.
        // TODO(ruihang): wish there is a better way to do this.
        let mut timestamp_column_index = None;
        let mut tag_column_indices = Vec::new();
        let mut first_field_column_index = None;

        // Number of string (label) columns; used to presize each row's tag list.
        let mut num_label_columns = 0;

        for (i, column) in batches.schema().column_schemas().iter().enumerate() {
            match column.data_type {
                // Only the first millisecond-timestamp column is used.
                ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => {
                    if timestamp_column_index.is_none() {
                        timestamp_column_index = Some(i);
                    }
                }
                // Treat all value types as field
                ConcreteDataType::Float32(_)
                | ConcreteDataType::Float64(_)
                | ConcreteDataType::Int8(_)
                | ConcreteDataType::Int16(_)
                | ConcreteDataType::Int32(_)
                | ConcreteDataType::Int64(_)
                | ConcreteDataType::UInt8(_)
                | ConcreteDataType::UInt16(_)
                | ConcreteDataType::UInt32(_)
                | ConcreteDataType::UInt64(_) => {
                    if first_field_column_index.is_none() {
                        first_field_column_index = Some(i);
                    }
                }
                // Every string column becomes a label.
                ConcreteDataType::String(_) => {
                    tag_column_indices.push(i);
                    num_label_columns += 1;
                }
                // Other types (e.g. booleans, binary) are ignored.
                _ => {}
            }
        }

        let timestamp_column_index = timestamp_column_index.context(UnexpectedResultSnafu {
            reason: "no timestamp column found".to_string(),
        })?;
        let first_field_column_index = first_field_column_index.context(UnexpectedResultSnafu {
            reason: "no value column found".to_string(),
        })?;

        // Preserves the order of output tags.
        // Tag order matters, e.g., after sort and sort_desc, the output order must be kept.
        // Maps a series' label set -> its (timestamp, value) samples.
        let mut buffer = IndexMap::<Vec<(&str, &str)>, Vec<(f64, String)>>::new();

        let schema = batches.schema();
        for batch in batches.iter() {
            // prepare things: downcast the label columns to string vectors...
            let tag_columns = tag_column_indices
                .iter()
                .map(|i| {
                    batch
                        .column(*i)
                        .as_any()
                        .downcast_ref::<StringVector>()
                        .unwrap()
                })
                .collect::<Vec<_>>();
            // ...resolve their label names from the schema...
            let tag_names = tag_column_indices
                .iter()
                .map(|c| schema.column_name_by_index(*c))
                .collect::<Vec<_>>();
            // ...and downcast the timestamp column.
            let timestamp_column = batch
                .column(timestamp_column_index)
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap();
            // Cast the value column to f64 so any numeric type serializes uniformly.
            let casted_field_column = batch
                .column(first_field_column_index)
                .cast(&ConcreteDataType::float64_datatype())
                .unwrap();
            let field_column = casted_field_column
                .as_any()
                .downcast_ref::<Float64Vector>()
                .unwrap();

            // assemble rows
            for row_index in 0..batch.num_rows() {
                // retrieve value; rows with a NULL value are skipped entirely
                if let Some(v) = field_column.get_data(row_index) {
                    // ignore all NaN values to reduce the amount of data to be sent.
                    if v.is_nan() {
                        continue;
                    }

                    // retrieve tags; `__name__` comes first when a metric name is given
                    let mut tags = Vec::with_capacity(num_label_columns + 1);
                    if let Some(metric_name) = &metric_name {
                        tags.push((METRIC_NAME, metric_name.as_str()));
                    }
                    for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
                        // TODO(ruihang): add test for NULL tag
                        if let Some(tag_value) = tag_column.get_data(row_index) {
                            tags.push((tag_name, tag_value));
                        }
                    }

                    // retrieve timestamp, converted from milliseconds to
                    // fractional seconds as the Prometheus API expects
                    let timestamp_millis: i64 =
                        timestamp_column.get_data(row_index).unwrap().into();
                    let timestamp = timestamp_millis as f64 / 1000.0;

                    buffer
                        .entry(tags)
                        .or_default()
                        .push((timestamp, Into::<f64>::into(v).to_string()));
                };
            }
        }

        // initialize result to return
        let mut result = match result_type {
            ValueType::Vector => PromQueryResult::Vector(vec![]),
            ValueType::Matrix => PromQueryResult::Matrix(vec![]),
            ValueType::Scalar => PromQueryResult::Scalar(None),
            ValueType::String => PromQueryResult::String(None),
        };

        // accumulate data into result
        buffer.into_iter().for_each(|(tags, mut values)| {
            let metric = tags
                .into_iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect::<BTreeMap<_, _>>();
            match result {
                // Instant vector: keep only the last sample of each series.
                PromQueryResult::Vector(ref mut v) => {
                    v.push(PromSeriesVector {
                        metric,
                        value: values.pop(),
                    });
                }
                PromQueryResult::Matrix(ref mut v) => {
                    // sort values by timestamp
                    if !values.is_sorted_by(|a, b| a.0 <= b.0) {
                        values.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
                    }

                    v.push(PromSeriesMatrix { metric, values });
                }
                // Scalar: keep only the last sample.
                PromQueryResult::Scalar(ref mut v) => {
                    *v = values.pop();
                }
                PromQueryResult::String(ref mut _v) => {
                    // TODO(ruihang): Not supported yet
                }
            }
        });

        // sort matrix by metric
        // see: https://prometheus.io/docs/prometheus/3.5/querying/api/#range-vectors
        if let PromQueryResult::Matrix(ref mut v) = result {
            v.sort_by(|a, b| a.metric.cmp(&b.metric));
        }

        let result_type_string = result_type.to_string();
        let data = PrometheusResponse::PromData(PromData {
            result_type: result_type_string,
            result,
        });

        Ok(data)
    }
362}