1use std::cmp::Ordering;
17use std::collections::{BTreeMap, HashMap};
18
19use axum::http::HeaderValue;
20use axum::response::{IntoResponse, Response};
21use axum::Json;
22use common_error::ext::ErrorExt;
23use common_error::status_code::StatusCode;
24use common_query::{Output, OutputData};
25use common_recordbatch::RecordBatches;
26use datatypes::prelude::ConcreteDataType;
27use datatypes::scalars::ScalarVector;
28use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
29use indexmap::IndexMap;
30use promql_parser::label::METRIC_NAME;
31use promql_parser::parser::value::ValueType;
32use serde::{Deserialize, Serialize};
33use serde_json::Value;
34use snafu::{OptionExt, ResultExt};
35
36use crate::error::{
37 status_code_to_http_status, CollectRecordbatchSnafu, Result, UnexpectedResultSnafu,
38};
39use crate::http::header::{collect_plan_metrics, GREPTIME_DB_HEADER_METRICS};
40use crate::http::prometheus::{
41 PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusResponse,
42};
43
44#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
45pub struct PrometheusJsonResponse {
46 pub status: String,
47 #[serde(skip_serializing_if = "PrometheusResponse::is_none")]
48 #[serde(default)]
49 pub data: PrometheusResponse,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub error: Option<String>,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 #[serde(rename = "errorType")]
54 pub error_type: Option<String>,
55 #[serde(skip_serializing_if = "Option::is_none")]
56 pub warnings: Option<Vec<String>>,
57
58 #[serde(skip)]
59 pub status_code: Option<StatusCode>,
60 #[serde(skip)]
62 #[serde(default)]
63 pub resp_metrics: HashMap<String, Value>,
64}
65
66impl IntoResponse for PrometheusJsonResponse {
67 fn into_response(self) -> Response {
68 let metrics = if self.resp_metrics.is_empty() {
69 None
70 } else {
71 serde_json::to_string(&self.resp_metrics).ok()
72 };
73
74 let http_code = self.status_code.map(|c| status_code_to_http_status(&c));
75
76 let mut resp = Json(self).into_response();
77
78 if let Some(http_code) = http_code {
79 *resp.status_mut() = http_code;
80 }
81
82 if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
83 resp.headers_mut().insert(&GREPTIME_DB_HEADER_METRICS, m);
84 }
85
86 resp
87 }
88}
89
90impl PrometheusJsonResponse {
91 pub fn error<S1>(error_type: StatusCode, reason: S1) -> Self
92 where
93 S1: Into<String>,
94 {
95 PrometheusJsonResponse {
96 status: "error".to_string(),
97 data: PrometheusResponse::None,
98 error: Some(reason.into()),
99 error_type: Some(error_type.to_string()),
100 warnings: None,
101 resp_metrics: Default::default(),
102 status_code: Some(error_type),
103 }
104 }
105
106 pub fn success(data: PrometheusResponse) -> Self {
107 PrometheusJsonResponse {
108 status: "success".to_string(),
109 data,
110 error: None,
111 error_type: None,
112 warnings: None,
113 resp_metrics: Default::default(),
114 status_code: None,
115 }
116 }
117
118 pub async fn from_query_result(
120 result: Result<Output>,
121 metric_name: Option<String>,
122 result_type: ValueType,
123 ) -> Self {
124 let response: Result<Self> = try {
125 let result = result?;
126 let mut resp =
127 match result.data {
128 OutputData::RecordBatches(batches) => Self::success(
129 Self::record_batches_to_data(batches, metric_name, result_type)?,
130 ),
131 OutputData::Stream(stream) => {
132 let record_batches = RecordBatches::try_collect(stream)
133 .await
134 .context(CollectRecordbatchSnafu)?;
135 Self::success(Self::record_batches_to_data(
136 record_batches,
137 metric_name,
138 result_type,
139 )?)
140 }
141 OutputData::AffectedRows(_) => Self::error(
142 StatusCode::Unexpected,
143 "expected data result, but got affected rows",
144 ),
145 };
146
147 if let Some(physical_plan) = result.meta.plan {
148 let mut result_map = HashMap::new();
149 let mut tmp = vec![&mut result_map];
150 collect_plan_metrics(&physical_plan, &mut tmp);
151
152 let re = result_map
153 .into_iter()
154 .map(|(k, v)| (k, Value::from(v)))
155 .collect();
156 resp.resp_metrics = re;
157 }
158
159 resp
160 };
161
162 let result_type_string = result_type.to_string();
163
164 match response {
165 Ok(resp) => resp,
166 Err(err) => {
167 if err.status_code() == StatusCode::TableNotFound
169 || err.status_code() == StatusCode::TableColumnNotFound
170 {
171 Self::success(PrometheusResponse::PromData(PromData {
172 result_type: result_type_string,
173 ..Default::default()
174 }))
175 } else {
176 Self::error(err.status_code(), err.output_msg())
177 }
178 }
179 }
180 }
181
182 fn record_batches_to_data(
184 batches: RecordBatches,
185 metric_name: Option<String>,
186 result_type: ValueType,
187 ) -> Result<PrometheusResponse> {
188 if batches.iter().next().is_none() {
190 return Ok(PrometheusResponse::PromData(PromData {
191 result_type: result_type.to_string(),
192 ..Default::default()
193 }));
194 }
195
196 let mut timestamp_column_index = None;
199 let mut tag_column_indices = Vec::new();
200 let mut first_field_column_index = None;
201
202 let mut num_label_columns = 0;
203
204 for (i, column) in batches.schema().column_schemas().iter().enumerate() {
205 match column.data_type {
206 ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => {
207 if timestamp_column_index.is_none() {
208 timestamp_column_index = Some(i);
209 }
210 }
211 ConcreteDataType::Float32(_)
213 | ConcreteDataType::Float64(_)
214 | ConcreteDataType::Int8(_)
215 | ConcreteDataType::Int16(_)
216 | ConcreteDataType::Int32(_)
217 | ConcreteDataType::Int64(_)
218 | ConcreteDataType::UInt8(_)
219 | ConcreteDataType::UInt16(_)
220 | ConcreteDataType::UInt32(_)
221 | ConcreteDataType::UInt64(_) => {
222 if first_field_column_index.is_none() {
223 first_field_column_index = Some(i);
224 }
225 }
226 ConcreteDataType::String(_) => {
227 tag_column_indices.push(i);
228 num_label_columns += 1;
229 }
230 _ => {}
231 }
232 }
233
234 let timestamp_column_index = timestamp_column_index.context(UnexpectedResultSnafu {
235 reason: "no timestamp column found".to_string(),
236 })?;
237 let first_field_column_index = first_field_column_index.context(UnexpectedResultSnafu {
238 reason: "no value column found".to_string(),
239 })?;
240
241 let mut buffer = IndexMap::<Vec<(&str, &str)>, Vec<(f64, String)>>::new();
244
245 let schema = batches.schema();
246 for batch in batches.iter() {
247 let tag_columns = tag_column_indices
249 .iter()
250 .map(|i| {
251 batch
252 .column(*i)
253 .as_any()
254 .downcast_ref::<StringVector>()
255 .unwrap()
256 })
257 .collect::<Vec<_>>();
258 let tag_names = tag_column_indices
259 .iter()
260 .map(|c| schema.column_name_by_index(*c))
261 .collect::<Vec<_>>();
262 let timestamp_column = batch
263 .column(timestamp_column_index)
264 .as_any()
265 .downcast_ref::<TimestampMillisecondVector>()
266 .unwrap();
267 let casted_field_column = batch
268 .column(first_field_column_index)
269 .cast(&ConcreteDataType::float64_datatype())
270 .unwrap();
271 let field_column = casted_field_column
272 .as_any()
273 .downcast_ref::<Float64Vector>()
274 .unwrap();
275
276 for row_index in 0..batch.num_rows() {
278 if let Some(v) = field_column.get_data(row_index) {
280 if v.is_nan() {
282 continue;
283 }
284
285 let mut tags = Vec::with_capacity(num_label_columns + 1);
287 if let Some(metric_name) = &metric_name {
288 tags.push((METRIC_NAME, metric_name.as_str()));
289 }
290 for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
291 if let Some(tag_value) = tag_column.get_data(row_index) {
293 tags.push((tag_name, tag_value));
294 }
295 }
296
297 let timestamp_millis: i64 =
299 timestamp_column.get_data(row_index).unwrap().into();
300 let timestamp = timestamp_millis as f64 / 1000.0;
301
302 buffer
303 .entry(tags)
304 .or_default()
305 .push((timestamp, Into::<f64>::into(v).to_string()));
306 };
307 }
308 }
309
310 let mut result = match result_type {
312 ValueType::Vector => PromQueryResult::Vector(vec![]),
313 ValueType::Matrix => PromQueryResult::Matrix(vec![]),
314 ValueType::Scalar => PromQueryResult::Scalar(None),
315 ValueType::String => PromQueryResult::String(None),
316 };
317
318 buffer.into_iter().for_each(|(tags, mut values)| {
320 let metric = tags
321 .into_iter()
322 .map(|(k, v)| (k.to_string(), v.to_string()))
323 .collect::<BTreeMap<_, _>>();
324 match result {
325 PromQueryResult::Vector(ref mut v) => {
326 v.push(PromSeriesVector {
327 metric,
328 value: values.pop(),
329 });
330 }
331 PromQueryResult::Matrix(ref mut v) => {
332 if !values.is_sorted_by(|a, b| a.0 <= b.0) {
334 values.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
335 }
336
337 v.push(PromSeriesMatrix { metric, values });
338 }
339 PromQueryResult::Scalar(ref mut v) => {
340 *v = values.pop();
341 }
342 PromQueryResult::String(ref mut _v) => {
343 }
345 }
346 });
347
348 if let PromQueryResult::Matrix(ref mut v) = result {
351 v.sort_by(|a, b| a.metric.cmp(&b.metric));
352 }
353
354 let result_type_string = result_type.to_string();
355 let data = PrometheusResponse::PromData(PromData {
356 result_type: result_type_string,
357 result,
358 });
359
360 Ok(data)
361 }
362}