// servers/http/result/prometheus_resp.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Prometheus-compatible HTTP API response types and conversions for the query server.
16use std::cmp::Ordering;
17use std::collections::{BTreeMap, HashMap};
18
19use arrow::array::{Array, AsArray};
20use arrow::datatypes::{Float64Type, TimestampMillisecondType};
21use arrow_schema::DataType;
22use axum::Json;
23use axum::http::HeaderValue;
24use axum::response::{IntoResponse, Response};
25use common_error::ext::ErrorExt;
26use common_error::status_code::StatusCode;
27use common_query::{Output, OutputData};
28use common_recordbatch::RecordBatches;
29use datatypes::prelude::ConcreteDataType;
30use indexmap::IndexMap;
31use promql_parser::label::METRIC_NAME;
32use promql_parser::parser::value::ValueType;
33use serde::{Deserialize, Serialize};
34use serde_json::Value;
35use snafu::{OptionExt, ResultExt};
36
37use crate::error::{
38    ArrowSnafu, CollectRecordbatchSnafu, Result, UnexpectedResultSnafu, status_code_to_http_status,
39};
40use crate::http::header::{GREPTIME_DB_HEADER_METRICS, collect_plan_metrics};
41use crate::http::prometheus::{
42    PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusResponse,
43};
44
/// JSON envelope of a Prometheus-compatible HTTP API response:
/// `{"status": ..., "data": ..., "error": ..., "errorType": ..., "warnings": ...}`.
///
/// Fields marked `#[serde(skip)]` never appear in the serialized body; they
/// only carry server-side state into `into_response`.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PrometheusJsonResponse {
    /// Either `"success"` or `"error"` (set by the constructors below).
    pub status: String,
    /// Query payload; omitted from JSON when `PrometheusResponse::None`.
    #[serde(skip_serializing_if = "PrometheusResponse::is_none")]
    #[serde(default)]
    pub data: PrometheusResponse,
    /// Human-readable error message, present only on error responses.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Error classification, serialized as Prometheus' `errorType` key.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "errorType")]
    pub error_type: Option<String>,
    /// Optional non-fatal warnings attached to a response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub warnings: Option<Vec<String>>,

    /// Internal status used to derive the HTTP status code; not serialized.
    #[serde(skip)]
    pub status_code: Option<StatusCode>,
    // placeholder for header value
    #[serde(skip)]
    #[serde(default)]
    pub resp_metrics: HashMap<String, Value>,
}
66
67impl IntoResponse for PrometheusJsonResponse {
68    fn into_response(self) -> Response {
69        let metrics = if self.resp_metrics.is_empty() {
70            None
71        } else {
72            serde_json::to_string(&self.resp_metrics).ok()
73        };
74
75        let http_code = self.status_code.map(|c| status_code_to_http_status(&c));
76
77        let mut resp = Json(self).into_response();
78
79        if let Some(http_code) = http_code {
80            *resp.status_mut() = http_code;
81        }
82
83        if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
84            resp.headers_mut().insert(&GREPTIME_DB_HEADER_METRICS, m);
85        }
86
87        resp
88    }
89}
90
impl PrometheusJsonResponse {
    /// Builds an `"error"` response carrying `reason` as the message and
    /// `error_type` both as the serialized `errorType` string and as the
    /// internal [`StatusCode`] used by `into_response` to pick the HTTP status.
    pub fn error<S1>(error_type: StatusCode, reason: S1) -> Self
    where
        S1: Into<String>,
    {
        PrometheusJsonResponse {
            status: "error".to_string(),
            data: PrometheusResponse::None,
            error: Some(reason.into()),
            error_type: Some(error_type.to_string()),
            warnings: None,
            resp_metrics: Default::default(),
            status_code: Some(error_type),
        }
    }

    /// Builds a `"success"` response wrapping `data`; no HTTP status override
    /// is set, so the default status applies.
    pub fn success(data: PrometheusResponse) -> Self {
        PrometheusJsonResponse {
            status: "success".to_string(),
            data,
            error: None,
            error_type: None,
            warnings: None,
            resp_metrics: Default::default(),
            status_code: None,
        }
    }

    /// Convert from `Result<Output>`.
    ///
    /// Collects streamed record batches, converts them to Prometheus `data`
    /// via [`Self::record_batches_to_data`], and attaches execution-plan
    /// metrics (when a physical plan is present) for the metrics header.
    /// `TableNotFound`/`TableColumnNotFound` errors are mapped to an empty
    /// success result to match Prometheus semantics for nonexistent series.
    pub async fn from_query_result(
        result: Result<Output>,
        metric_name: Option<String>,
        result_type: ValueType,
    ) -> Self {
        // Nightly `try` block: any `?` below short-circuits into `response`.
        let response: Result<Self> = try {
            let result = result?;
            let mut resp =
                match result.data {
                    OutputData::RecordBatches(batches) => Self::success(
                        Self::record_batches_to_data(batches, metric_name, result_type)?,
                    ),
                    OutputData::Stream(stream) => {
                        let record_batches = RecordBatches::try_collect(stream)
                            .await
                            .context(CollectRecordbatchSnafu)?;
                        Self::success(Self::record_batches_to_data(
                            record_batches,
                            metric_name,
                            result_type,
                        )?)
                    }
                    // A PromQL query should never yield an affected-rows result.
                    OutputData::AffectedRows(_) => Self::error(
                        StatusCode::Unexpected,
                        "expected data result, but got affected rows",
                    ),
                };

            if let Some(physical_plan) = result.meta.plan {
                let mut result_map = HashMap::new();
                // `collect_plan_metrics` expects a slice of maps; wrap ours.
                let mut tmp = vec![&mut result_map];
                collect_plan_metrics(&physical_plan, &mut tmp);

                let re = result_map
                    .into_iter()
                    .map(|(k, v)| (k, Value::from(v)))
                    .collect();
                resp.resp_metrics = re;
            }

            resp
        };

        let result_type_string = result_type.to_string();

        match response {
            Ok(resp) => resp,
            Err(err) => {
                // Prometheus won't report an error when querying a nonexistent
                // label or metric, so return an empty success payload instead.
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    Self::success(PrometheusResponse::PromData(PromData {
                        result_type: result_type_string,
                        ..Default::default()
                    }))
                } else {
                    Self::error(err.status_code(), err.output_msg())
                }
            }
        }
    }

    /// Convert [RecordBatches] to [PromData].
    ///
    /// Column roles are inferred from the schema: the first millisecond
    /// timestamp column is the sample timestamp, the first numeric column is
    /// the sample value, and every string column is a label. Rows are grouped
    /// by label set, then shaped according to `result_type`.
    fn record_batches_to_data(
        batches: RecordBatches,
        metric_name: Option<String>,
        result_type: ValueType,
    ) -> Result<PrometheusResponse> {
        // Return empty result if no batches
        if batches.iter().next().is_none() {
            return Ok(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        // infer semantic type of each column from schema.
        // TODO(ruihang): wish there is a better way to do this.
        let mut timestamp_column_index = None;
        let mut tag_column_indices = Vec::new();
        let mut first_field_column_index = None;

        let mut num_label_columns = 0;

        for (i, column) in batches.schema().column_schemas().iter().enumerate() {
            match column.data_type {
                // Only the first millisecond-timestamp column is used.
                ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => {
                    if timestamp_column_index.is_none() {
                        timestamp_column_index = Some(i);
                    }
                }
                // Treat all value types as field
                ConcreteDataType::Float32(_)
                | ConcreteDataType::Float64(_)
                | ConcreteDataType::Int8(_)
                | ConcreteDataType::Int16(_)
                | ConcreteDataType::Int32(_)
                | ConcreteDataType::Int64(_)
                | ConcreteDataType::UInt8(_)
                | ConcreteDataType::UInt16(_)
                | ConcreteDataType::UInt32(_)
                | ConcreteDataType::UInt64(_) => {
                    if first_field_column_index.is_none() {
                        first_field_column_index = Some(i);
                    }
                }
                // Every string column becomes a label.
                ConcreteDataType::String(_) => {
                    tag_column_indices.push(i);
                    num_label_columns += 1;
                }
                _ => {}
            }
        }

        let timestamp_column_index = timestamp_column_index.context(UnexpectedResultSnafu {
            reason: "no timestamp column found".to_string(),
        })?;
        let first_field_column_index = first_field_column_index.context(UnexpectedResultSnafu {
            reason: "no value column found".to_string(),
        })?;

        // Preserves the order of output tags (IndexMap keeps insertion order).
        // Tag order matters, e.g., after sort and sort_desc, the output order must be kept.
        let mut buffer = IndexMap::<Vec<(&str, &str)>, Vec<(f64, String)>>::new();

        let schema = batches.schema();
        for batch in batches.iter() {
            // prepare things...
            let tag_columns = tag_column_indices
                .iter()
                .map(|i| batch.column(*i).as_string::<i32>())
                .collect::<Vec<_>>();
            let tag_names = tag_column_indices
                .iter()
                .map(|c| schema.column_name_by_index(*c))
                .collect::<Vec<_>>();
            let timestamp_column = batch
                .column(timestamp_column_index)
                .as_primitive::<TimestampMillisecondType>();

            // Cast the value column to f64 so all numeric types share one path.
            let array =
                arrow::compute::cast(batch.column(first_field_column_index), &DataType::Float64)
                    .context(ArrowSnafu)?;
            let field_column = array.as_primitive::<Float64Type>();

            // assemble rows
            for row_index in 0..batch.num_rows() {
                // retrieve value; NULL values are skipped entirely
                if field_column.is_valid(row_index) {
                    let v = field_column.value(row_index);
                    // ignore all NaN values to reduce the amount of data to be sent.
                    if v.is_nan() {
                        continue;
                    }

                    // retrieve tags (+1 reserves room for the __name__ label)
                    let mut tags = Vec::with_capacity(num_label_columns + 1);
                    if let Some(metric_name) = &metric_name {
                        tags.push((METRIC_NAME, metric_name.as_str()));
                    }
                    for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
                        // TODO(ruihang): add test for NULL tag
                        if tag_column.is_valid(row_index) {
                            tags.push((tag_name, tag_column.value(row_index)));
                        }
                    }

                    // retrieve timestamp, converted from ms to fractional seconds
                    let timestamp_millis = timestamp_column.value(row_index);
                    let timestamp = timestamp_millis as f64 / 1000.0;

                    buffer
                        .entry(tags)
                        .or_default()
                        .push((timestamp, Into::<f64>::into(v).to_string()));
                };
            }
        }

        // initialize result to return
        let mut result = match result_type {
            ValueType::Vector => PromQueryResult::Vector(vec![]),
            ValueType::Matrix => PromQueryResult::Matrix(vec![]),
            ValueType::Scalar => PromQueryResult::Scalar(None),
            ValueType::String => PromQueryResult::String(None),
        };

        // accumulate data into result
        buffer.into_iter().for_each(|(tags, mut values)| {
            let metric = tags
                .into_iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect::<BTreeMap<_, _>>();
            match result {
                // Instant vector: only the last sample per series is kept.
                PromQueryResult::Vector(ref mut v) => {
                    v.push(PromSeriesVector {
                        metric,
                        value: values.pop(),
                    });
                }
                PromQueryResult::Matrix(ref mut v) => {
                    // sort values by timestamp (only when not already sorted;
                    // NaN comparisons fall back to Equal to keep the sort total)
                    if !values.is_sorted_by(|a, b| a.0 <= b.0) {
                        values.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
                    }

                    v.push(PromSeriesMatrix { metric, values });
                }
                // Scalar: only the last sample is kept.
                PromQueryResult::Scalar(ref mut v) => {
                    *v = values.pop();
                }
                PromQueryResult::String(ref mut _v) => {
                    // TODO(ruihang): Not supported yet
                }
            }
        });

        // sort matrix by metric
        // see: https://prometheus.io/docs/prometheus/3.5/querying/api/#range-vectors
        if let PromQueryResult::Matrix(ref mut v) = result {
            v.sort_by(|a, b| a.metric.cmp(&b.metric));
        }

        let result_type_string = result_type.to_string();
        let data = PrometheusResponse::PromData(PromData {
            result_type: result_type_string,
            result,
        });

        Ok(data)
    }
}
352}