1use std::cmp::Ordering;
17use std::collections::{BTreeMap, HashMap};
18
19use arrow::array::{Array, AsArray};
20use arrow::datatypes::{Float64Type, TimestampMillisecondType};
21use arrow_schema::DataType;
22use axum::Json;
23use axum::http::HeaderValue;
24use axum::response::{IntoResponse, Response};
25use common_error::ext::ErrorExt;
26use common_error::status_code::StatusCode;
27use common_query::{Output, OutputData};
28use common_recordbatch::RecordBatches;
29use datatypes::prelude::ConcreteDataType;
30use indexmap::IndexMap;
31use promql_parser::label::METRIC_NAME;
32use promql_parser::parser::value::ValueType;
33use serde::{Deserialize, Serialize};
34use serde_json::Value;
35use snafu::{OptionExt, ResultExt};
36
37use crate::error::{
38 ArrowSnafu, CollectRecordbatchSnafu, Result, UnexpectedResultSnafu, status_code_to_http_status,
39};
40use crate::http::header::{GREPTIME_DB_HEADER_METRICS, collect_plan_metrics};
41use crate::http::prometheus::{
42 PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusResponse,
43};
44
45#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
46pub struct PrometheusJsonResponse {
47 pub status: String,
48 #[serde(skip_serializing_if = "PrometheusResponse::is_none")]
49 #[serde(default)]
50 pub data: PrometheusResponse,
51 #[serde(skip_serializing_if = "Option::is_none")]
52 pub error: Option<String>,
53 #[serde(skip_serializing_if = "Option::is_none")]
54 #[serde(rename = "errorType")]
55 pub error_type: Option<String>,
56 #[serde(skip_serializing_if = "Option::is_none")]
57 pub warnings: Option<Vec<String>>,
58
59 #[serde(skip)]
60 pub status_code: Option<StatusCode>,
61 #[serde(skip)]
63 #[serde(default)]
64 pub resp_metrics: HashMap<String, Value>,
65}
66
67impl IntoResponse for PrometheusJsonResponse {
68 fn into_response(self) -> Response {
69 let metrics = if self.resp_metrics.is_empty() {
70 None
71 } else {
72 serde_json::to_string(&self.resp_metrics).ok()
73 };
74
75 let http_code = self.status_code.map(|c| status_code_to_http_status(&c));
76
77 let mut resp = Json(self).into_response();
78
79 if let Some(http_code) = http_code {
80 *resp.status_mut() = http_code;
81 }
82
83 if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
84 resp.headers_mut().insert(&GREPTIME_DB_HEADER_METRICS, m);
85 }
86
87 resp
88 }
89}
90
91impl PrometheusJsonResponse {
92 pub fn error<S1>(error_type: StatusCode, reason: S1) -> Self
93 where
94 S1: Into<String>,
95 {
96 PrometheusJsonResponse {
97 status: "error".to_string(),
98 data: PrometheusResponse::None,
99 error: Some(reason.into()),
100 error_type: Some(error_type.to_string()),
101 warnings: None,
102 resp_metrics: Default::default(),
103 status_code: Some(error_type),
104 }
105 }
106
107 pub fn success(data: PrometheusResponse) -> Self {
108 PrometheusJsonResponse {
109 status: "success".to_string(),
110 data,
111 error: None,
112 error_type: None,
113 warnings: None,
114 resp_metrics: Default::default(),
115 status_code: None,
116 }
117 }
118
119 pub async fn from_query_result(
121 result: Result<Output>,
122 metric_name: Option<String>,
123 result_type: ValueType,
124 ) -> Self {
125 let response: Result<Self> = try {
126 let result = result?;
127 let mut resp =
128 match result.data {
129 OutputData::RecordBatches(batches) => Self::success(
130 Self::record_batches_to_data(batches, metric_name, result_type)?,
131 ),
132 OutputData::Stream(stream) => {
133 let record_batches = RecordBatches::try_collect(stream)
134 .await
135 .context(CollectRecordbatchSnafu)?;
136 Self::success(Self::record_batches_to_data(
137 record_batches,
138 metric_name,
139 result_type,
140 )?)
141 }
142 OutputData::AffectedRows(_) => Self::error(
143 StatusCode::Unexpected,
144 "expected data result, but got affected rows",
145 ),
146 };
147
148 if let Some(physical_plan) = result.meta.plan {
149 let mut result_map = HashMap::new();
150 let mut tmp = vec![&mut result_map];
151 collect_plan_metrics(&physical_plan, &mut tmp);
152
153 let re = result_map
154 .into_iter()
155 .map(|(k, v)| (k, Value::from(v)))
156 .collect();
157 resp.resp_metrics = re;
158 }
159
160 resp
161 };
162
163 let result_type_string = result_type.to_string();
164
165 match response {
166 Ok(resp) => resp,
167 Err(err) => {
168 if err.status_code() == StatusCode::TableNotFound
170 || err.status_code() == StatusCode::TableColumnNotFound
171 {
172 Self::success(PrometheusResponse::PromData(PromData {
173 result_type: result_type_string,
174 ..Default::default()
175 }))
176 } else {
177 Self::error(err.status_code(), err.output_msg())
178 }
179 }
180 }
181 }
182
183 fn record_batches_to_data(
185 batches: RecordBatches,
186 metric_name: Option<String>,
187 result_type: ValueType,
188 ) -> Result<PrometheusResponse> {
189 if batches.iter().next().is_none() {
191 return Ok(PrometheusResponse::PromData(PromData {
192 result_type: result_type.to_string(),
193 ..Default::default()
194 }));
195 }
196
197 let mut timestamp_column_index = None;
200 let mut tag_column_indices = Vec::new();
201 let mut first_field_column_index = None;
202
203 let mut num_label_columns = 0;
204
205 for (i, column) in batches.schema().column_schemas().iter().enumerate() {
206 match column.data_type {
207 ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => {
208 if timestamp_column_index.is_none() {
209 timestamp_column_index = Some(i);
210 }
211 }
212 ConcreteDataType::Float32(_)
214 | ConcreteDataType::Float64(_)
215 | ConcreteDataType::Int8(_)
216 | ConcreteDataType::Int16(_)
217 | ConcreteDataType::Int32(_)
218 | ConcreteDataType::Int64(_)
219 | ConcreteDataType::UInt8(_)
220 | ConcreteDataType::UInt16(_)
221 | ConcreteDataType::UInt32(_)
222 | ConcreteDataType::UInt64(_) => {
223 if first_field_column_index.is_none() {
224 first_field_column_index = Some(i);
225 }
226 }
227 ConcreteDataType::String(_) => {
228 tag_column_indices.push(i);
229 num_label_columns += 1;
230 }
231 _ => {}
232 }
233 }
234
235 let timestamp_column_index = timestamp_column_index.context(UnexpectedResultSnafu {
236 reason: "no timestamp column found".to_string(),
237 })?;
238 let first_field_column_index = first_field_column_index.context(UnexpectedResultSnafu {
239 reason: "no value column found".to_string(),
240 })?;
241
242 let mut buffer = IndexMap::<Vec<(&str, &str)>, Vec<(f64, String)>>::new();
245
246 let schema = batches.schema();
247 for batch in batches.iter() {
248 let tag_columns = tag_column_indices
250 .iter()
251 .map(|i| batch.column(*i).as_string::<i32>())
252 .collect::<Vec<_>>();
253 let tag_names = tag_column_indices
254 .iter()
255 .map(|c| schema.column_name_by_index(*c))
256 .collect::<Vec<_>>();
257 let timestamp_column = batch
258 .column(timestamp_column_index)
259 .as_primitive::<TimestampMillisecondType>();
260
261 let array =
262 arrow::compute::cast(batch.column(first_field_column_index), &DataType::Float64)
263 .context(ArrowSnafu)?;
264 let field_column = array.as_primitive::<Float64Type>();
265
266 for row_index in 0..batch.num_rows() {
268 if field_column.is_valid(row_index) {
270 let v = field_column.value(row_index);
271 if v.is_nan() {
273 continue;
274 }
275
276 let mut tags = Vec::with_capacity(num_label_columns + 1);
278 if let Some(metric_name) = &metric_name {
279 tags.push((METRIC_NAME, metric_name.as_str()));
280 }
281 for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
282 if tag_column.is_valid(row_index) {
284 tags.push((tag_name, tag_column.value(row_index)));
285 }
286 }
287
288 let timestamp_millis = timestamp_column.value(row_index);
290 let timestamp = timestamp_millis as f64 / 1000.0;
291
292 buffer
293 .entry(tags)
294 .or_default()
295 .push((timestamp, Into::<f64>::into(v).to_string()));
296 };
297 }
298 }
299
300 let mut result = match result_type {
302 ValueType::Vector => PromQueryResult::Vector(vec![]),
303 ValueType::Matrix => PromQueryResult::Matrix(vec![]),
304 ValueType::Scalar => PromQueryResult::Scalar(None),
305 ValueType::String => PromQueryResult::String(None),
306 };
307
308 buffer.into_iter().for_each(|(tags, mut values)| {
310 let metric = tags
311 .into_iter()
312 .map(|(k, v)| (k.to_string(), v.to_string()))
313 .collect::<BTreeMap<_, _>>();
314 match result {
315 PromQueryResult::Vector(ref mut v) => {
316 v.push(PromSeriesVector {
317 metric,
318 value: values.pop(),
319 });
320 }
321 PromQueryResult::Matrix(ref mut v) => {
322 if !values.is_sorted_by(|a, b| a.0 <= b.0) {
324 values.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
325 }
326
327 v.push(PromSeriesMatrix { metric, values });
328 }
329 PromQueryResult::Scalar(ref mut v) => {
330 *v = values.pop();
331 }
332 PromQueryResult::String(ref mut _v) => {
333 }
335 }
336 });
337
338 if let PromQueryResult::Matrix(ref mut v) = result {
341 v.sort_by(|a, b| a.metric.cmp(&b.metric));
342 }
343
344 let result_type_string = result_type.to_string();
345 let data = PrometheusResponse::PromData(PromData {
346 result_type: result_type_string,
347 result,
348 });
349
350 Ok(data)
351 }
352}