servers/http/result/
prometheus_resp.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Response types for the Prometheus-compatible HTTP API.
16use std::collections::HashMap;
17
18use axum::http::HeaderValue;
19use axum::response::{IntoResponse, Response};
20use axum::Json;
21use common_error::ext::ErrorExt;
22use common_error::status_code::StatusCode;
23use common_query::{Output, OutputData};
24use common_recordbatch::RecordBatches;
25use datatypes::prelude::ConcreteDataType;
26use datatypes::scalars::ScalarVector;
27use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
28use indexmap::IndexMap;
29use promql_parser::label::METRIC_NAME;
30use promql_parser::parser::value::ValueType;
31use serde::{Deserialize, Serialize};
32use serde_json::Value;
33use snafu::{OptionExt, ResultExt};
34
35use crate::error::{
36    status_code_to_http_status, CollectRecordbatchSnafu, Result, UnexpectedResultSnafu,
37};
38use crate::http::header::{collect_plan_metrics, GREPTIME_DB_HEADER_METRICS};
39use crate::http::prometheus::{
40    PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusResponse,
41};
42
43#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
44pub struct PrometheusJsonResponse {
45    pub status: String,
46    #[serde(skip_serializing_if = "PrometheusResponse::is_none")]
47    #[serde(default)]
48    pub data: PrometheusResponse,
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub error: Option<String>,
51    #[serde(skip_serializing_if = "Option::is_none")]
52    #[serde(rename = "errorType")]
53    pub error_type: Option<String>,
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub warnings: Option<Vec<String>>,
56
57    #[serde(skip)]
58    pub status_code: Option<StatusCode>,
59    // placeholder for header value
60    #[serde(skip)]
61    #[serde(default)]
62    pub resp_metrics: HashMap<String, Value>,
63}
64
65impl IntoResponse for PrometheusJsonResponse {
66    fn into_response(self) -> Response {
67        let metrics = if self.resp_metrics.is_empty() {
68            None
69        } else {
70            serde_json::to_string(&self.resp_metrics).ok()
71        };
72
73        let http_code = self.status_code.map(|c| status_code_to_http_status(&c));
74
75        let mut resp = Json(self).into_response();
76
77        if let Some(http_code) = http_code {
78            *resp.status_mut() = http_code;
79        }
80
81        if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
82            resp.headers_mut().insert(&GREPTIME_DB_HEADER_METRICS, m);
83        }
84
85        resp
86    }
87}
88
89impl PrometheusJsonResponse {
90    pub fn error<S1>(error_type: StatusCode, reason: S1) -> Self
91    where
92        S1: Into<String>,
93    {
94        PrometheusJsonResponse {
95            status: "error".to_string(),
96            data: PrometheusResponse::None,
97            error: Some(reason.into()),
98            error_type: Some(error_type.to_string()),
99            warnings: None,
100            resp_metrics: Default::default(),
101            status_code: Some(error_type),
102        }
103    }
104
105    pub fn success(data: PrometheusResponse) -> Self {
106        PrometheusJsonResponse {
107            status: "success".to_string(),
108            data,
109            error: None,
110            error_type: None,
111            warnings: None,
112            resp_metrics: Default::default(),
113            status_code: None,
114        }
115    }
116
117    /// Convert from `Result<Output>`
118    pub async fn from_query_result(
119        result: Result<Output>,
120        metric_name: String,
121        result_type: ValueType,
122    ) -> Self {
123        let response: Result<Self> = try {
124            let result = result?;
125            let mut resp =
126                match result.data {
127                    OutputData::RecordBatches(batches) => Self::success(
128                        Self::record_batches_to_data(batches, metric_name, result_type)?,
129                    ),
130                    OutputData::Stream(stream) => {
131                        let record_batches = RecordBatches::try_collect(stream)
132                            .await
133                            .context(CollectRecordbatchSnafu)?;
134                        Self::success(Self::record_batches_to_data(
135                            record_batches,
136                            metric_name,
137                            result_type,
138                        )?)
139                    }
140                    OutputData::AffectedRows(_) => Self::error(
141                        StatusCode::Unexpected,
142                        "expected data result, but got affected rows",
143                    ),
144                };
145
146            if let Some(physical_plan) = result.meta.plan {
147                let mut result_map = HashMap::new();
148                let mut tmp = vec![&mut result_map];
149                collect_plan_metrics(&physical_plan, &mut tmp);
150
151                let re = result_map
152                    .into_iter()
153                    .map(|(k, v)| (k, Value::from(v)))
154                    .collect();
155                resp.resp_metrics = re;
156            }
157
158            resp
159        };
160
161        let result_type_string = result_type.to_string();
162
163        match response {
164            Ok(resp) => resp,
165            Err(err) => {
166                // Prometheus won't report error if querying nonexist label and metric
167                if err.status_code() == StatusCode::TableNotFound
168                    || err.status_code() == StatusCode::TableColumnNotFound
169                {
170                    Self::success(PrometheusResponse::PromData(PromData {
171                        result_type: result_type_string,
172                        ..Default::default()
173                    }))
174                } else {
175                    Self::error(err.status_code(), err.output_msg())
176                }
177            }
178        }
179    }
180
181    /// Convert [RecordBatches] to [PromData]
182    fn record_batches_to_data(
183        batches: RecordBatches,
184        metric_name: String,
185        result_type: ValueType,
186    ) -> Result<PrometheusResponse> {
187        // infer semantic type of each column from schema.
188        // TODO(ruihang): wish there is a better way to do this.
189        let mut timestamp_column_index = None;
190        let mut tag_column_indices = Vec::new();
191        let mut first_field_column_index = None;
192
193        let mut num_label_columns = 0;
194
195        for (i, column) in batches.schema().column_schemas().iter().enumerate() {
196            match column.data_type {
197                ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => {
198                    if timestamp_column_index.is_none() {
199                        timestamp_column_index = Some(i);
200                    }
201                }
202                // Treat all value types as field
203                ConcreteDataType::Float32(_)
204                | ConcreteDataType::Float64(_)
205                | ConcreteDataType::Int8(_)
206                | ConcreteDataType::Int16(_)
207                | ConcreteDataType::Int32(_)
208                | ConcreteDataType::Int64(_)
209                | ConcreteDataType::UInt8(_)
210                | ConcreteDataType::UInt16(_)
211                | ConcreteDataType::UInt32(_)
212                | ConcreteDataType::UInt64(_) => {
213                    if first_field_column_index.is_none() {
214                        first_field_column_index = Some(i);
215                    }
216                }
217                ConcreteDataType::String(_) => {
218                    tag_column_indices.push(i);
219                    num_label_columns += 1;
220                }
221                _ => {}
222            }
223        }
224
225        let timestamp_column_index = timestamp_column_index.context(UnexpectedResultSnafu {
226            reason: "no timestamp column found".to_string(),
227        })?;
228        let first_field_column_index = first_field_column_index.context(UnexpectedResultSnafu {
229            reason: "no value column found".to_string(),
230        })?;
231
232        let metric_name = (METRIC_NAME, metric_name.as_str());
233        // Preserves the order of output tags.
234        // Tag order matters, e.g., after sorc and sort_desc, the output order must be kept.
235        let mut buffer = IndexMap::<Vec<(&str, &str)>, Vec<(f64, String)>>::new();
236
237        let schema = batches.schema();
238        for batch in batches.iter() {
239            // prepare things...
240            let tag_columns = tag_column_indices
241                .iter()
242                .map(|i| {
243                    batch
244                        .column(*i)
245                        .as_any()
246                        .downcast_ref::<StringVector>()
247                        .unwrap()
248                })
249                .collect::<Vec<_>>();
250            let tag_names = tag_column_indices
251                .iter()
252                .map(|c| schema.column_name_by_index(*c))
253                .collect::<Vec<_>>();
254            let timestamp_column = batch
255                .column(timestamp_column_index)
256                .as_any()
257                .downcast_ref::<TimestampMillisecondVector>()
258                .unwrap();
259            let casted_field_column = batch
260                .column(first_field_column_index)
261                .cast(&ConcreteDataType::float64_datatype())
262                .unwrap();
263            let field_column = casted_field_column
264                .as_any()
265                .downcast_ref::<Float64Vector>()
266                .unwrap();
267
268            // assemble rows
269            for row_index in 0..batch.num_rows() {
270                // retrieve value
271                if let Some(v) = field_column.get_data(row_index) {
272                    // ignore all NaN values to reduce the amount of data to be sent.
273                    if v.is_nan() {
274                        continue;
275                    }
276
277                    // retrieve tags
278                    // TODO(ruihang): push table name `__metric__`
279                    let mut tags = Vec::with_capacity(num_label_columns + 1);
280                    tags.push(metric_name);
281                    for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
282                        // TODO(ruihang): add test for NULL tag
283                        if let Some(tag_value) = tag_column.get_data(row_index) {
284                            tags.push((tag_name, tag_value));
285                        }
286                    }
287
288                    // retrieve timestamp
289                    let timestamp_millis: i64 =
290                        timestamp_column.get_data(row_index).unwrap().into();
291                    let timestamp = timestamp_millis as f64 / 1000.0;
292
293                    buffer
294                        .entry(tags)
295                        .or_default()
296                        .push((timestamp, Into::<f64>::into(v).to_string()));
297                };
298            }
299        }
300
301        // initialize result to return
302        let mut result = match result_type {
303            ValueType::Vector => PromQueryResult::Vector(vec![]),
304            ValueType::Matrix => PromQueryResult::Matrix(vec![]),
305            ValueType::Scalar => PromQueryResult::Scalar(None),
306            ValueType::String => PromQueryResult::String(None),
307        };
308
309        // accumulate data into result
310        buffer.into_iter().for_each(|(tags, mut values)| {
311            let metric = tags
312                .into_iter()
313                .map(|(k, v)| (k.to_string(), v.to_string()))
314                .collect::<HashMap<_, _>>();
315            match result {
316                PromQueryResult::Vector(ref mut v) => {
317                    v.push(PromSeriesVector {
318                        metric,
319                        value: values.pop(),
320                    });
321                }
322                PromQueryResult::Matrix(ref mut v) => {
323                    v.push(PromSeriesMatrix { metric, values });
324                }
325                PromQueryResult::Scalar(ref mut v) => {
326                    *v = values.pop();
327                }
328                PromQueryResult::String(ref mut _v) => {
329                    // TODO(ruihang): Not supported yet
330                }
331            }
332        });
333
334        let result_type_string = result_type.to_string();
335        let data = PrometheusResponse::PromData(PromData {
336            result_type: result_type_string,
337            result,
338        });
339
340        Ok(data)
341    }
342}