1use std::cmp::Ordering;
17use std::collections::{BTreeMap, HashMap};
18
19use axum::http::HeaderValue;
20use axum::response::{IntoResponse, Response};
21use axum::Json;
22use common_error::ext::ErrorExt;
23use common_error::status_code::StatusCode;
24use common_query::{Output, OutputData};
25use common_recordbatch::RecordBatches;
26use datatypes::prelude::ConcreteDataType;
27use datatypes::scalars::ScalarVector;
28use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
29use indexmap::IndexMap;
30use promql_parser::label::METRIC_NAME;
31use promql_parser::parser::value::ValueType;
32use serde::{Deserialize, Serialize};
33use serde_json::Value;
34use snafu::{OptionExt, ResultExt};
35
36use crate::error::{
37 status_code_to_http_status, CollectRecordbatchSnafu, Result, UnexpectedResultSnafu,
38};
39use crate::http::header::{collect_plan_metrics, GREPTIME_DB_HEADER_METRICS};
40use crate::http::prometheus::{
41 PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusResponse,
42};
43
44#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
45pub struct PrometheusJsonResponse {
46 pub status: String,
47 #[serde(skip_serializing_if = "PrometheusResponse::is_none")]
48 #[serde(default)]
49 pub data: PrometheusResponse,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub error: Option<String>,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 #[serde(rename = "errorType")]
54 pub error_type: Option<String>,
55 #[serde(skip_serializing_if = "Option::is_none")]
56 pub warnings: Option<Vec<String>>,
57
58 #[serde(skip)]
59 pub status_code: Option<StatusCode>,
60 #[serde(skip)]
62 #[serde(default)]
63 pub resp_metrics: HashMap<String, Value>,
64}
65
66impl IntoResponse for PrometheusJsonResponse {
67 fn into_response(self) -> Response {
68 let metrics = if self.resp_metrics.is_empty() {
69 None
70 } else {
71 serde_json::to_string(&self.resp_metrics).ok()
72 };
73
74 let http_code = self.status_code.map(|c| status_code_to_http_status(&c));
75
76 let mut resp = Json(self).into_response();
77
78 if let Some(http_code) = http_code {
79 *resp.status_mut() = http_code;
80 }
81
82 if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
83 resp.headers_mut().insert(&GREPTIME_DB_HEADER_METRICS, m);
84 }
85
86 resp
87 }
88}
89
90impl PrometheusJsonResponse {
91 pub fn error<S1>(error_type: StatusCode, reason: S1) -> Self
92 where
93 S1: Into<String>,
94 {
95 PrometheusJsonResponse {
96 status: "error".to_string(),
97 data: PrometheusResponse::None,
98 error: Some(reason.into()),
99 error_type: Some(error_type.to_string()),
100 warnings: None,
101 resp_metrics: Default::default(),
102 status_code: Some(error_type),
103 }
104 }
105
106 pub fn success(data: PrometheusResponse) -> Self {
107 PrometheusJsonResponse {
108 status: "success".to_string(),
109 data,
110 error: None,
111 error_type: None,
112 warnings: None,
113 resp_metrics: Default::default(),
114 status_code: None,
115 }
116 }
117
118 pub async fn from_query_result(
120 result: Result<Output>,
121 metric_name: String,
122 result_type: ValueType,
123 ) -> Self {
124 let response: Result<Self> = try {
125 let result = result?;
126 let mut resp =
127 match result.data {
128 OutputData::RecordBatches(batches) => Self::success(
129 Self::record_batches_to_data(batches, metric_name, result_type)?,
130 ),
131 OutputData::Stream(stream) => {
132 let record_batches = RecordBatches::try_collect(stream)
133 .await
134 .context(CollectRecordbatchSnafu)?;
135 Self::success(Self::record_batches_to_data(
136 record_batches,
137 metric_name,
138 result_type,
139 )?)
140 }
141 OutputData::AffectedRows(_) => Self::error(
142 StatusCode::Unexpected,
143 "expected data result, but got affected rows",
144 ),
145 };
146
147 if let Some(physical_plan) = result.meta.plan {
148 let mut result_map = HashMap::new();
149 let mut tmp = vec![&mut result_map];
150 collect_plan_metrics(&physical_plan, &mut tmp);
151
152 let re = result_map
153 .into_iter()
154 .map(|(k, v)| (k, Value::from(v)))
155 .collect();
156 resp.resp_metrics = re;
157 }
158
159 resp
160 };
161
162 let result_type_string = result_type.to_string();
163
164 match response {
165 Ok(resp) => resp,
166 Err(err) => {
167 if err.status_code() == StatusCode::TableNotFound
169 || err.status_code() == StatusCode::TableColumnNotFound
170 {
171 Self::success(PrometheusResponse::PromData(PromData {
172 result_type: result_type_string,
173 ..Default::default()
174 }))
175 } else {
176 Self::error(err.status_code(), err.output_msg())
177 }
178 }
179 }
180 }
181
182 fn record_batches_to_data(
184 batches: RecordBatches,
185 metric_name: String,
186 result_type: ValueType,
187 ) -> Result<PrometheusResponse> {
188 let mut timestamp_column_index = None;
191 let mut tag_column_indices = Vec::new();
192 let mut first_field_column_index = None;
193
194 let mut num_label_columns = 0;
195
196 for (i, column) in batches.schema().column_schemas().iter().enumerate() {
197 match column.data_type {
198 ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => {
199 if timestamp_column_index.is_none() {
200 timestamp_column_index = Some(i);
201 }
202 }
203 ConcreteDataType::Float32(_)
205 | ConcreteDataType::Float64(_)
206 | ConcreteDataType::Int8(_)
207 | ConcreteDataType::Int16(_)
208 | ConcreteDataType::Int32(_)
209 | ConcreteDataType::Int64(_)
210 | ConcreteDataType::UInt8(_)
211 | ConcreteDataType::UInt16(_)
212 | ConcreteDataType::UInt32(_)
213 | ConcreteDataType::UInt64(_) => {
214 if first_field_column_index.is_none() {
215 first_field_column_index = Some(i);
216 }
217 }
218 ConcreteDataType::String(_) => {
219 tag_column_indices.push(i);
220 num_label_columns += 1;
221 }
222 _ => {}
223 }
224 }
225
226 let timestamp_column_index = timestamp_column_index.context(UnexpectedResultSnafu {
227 reason: "no timestamp column found".to_string(),
228 })?;
229 let first_field_column_index = first_field_column_index.context(UnexpectedResultSnafu {
230 reason: "no value column found".to_string(),
231 })?;
232
233 let metric_name = (METRIC_NAME, metric_name.as_str());
234 let mut buffer = IndexMap::<Vec<(&str, &str)>, Vec<(f64, String)>>::new();
237
238 let schema = batches.schema();
239 for batch in batches.iter() {
240 let tag_columns = tag_column_indices
242 .iter()
243 .map(|i| {
244 batch
245 .column(*i)
246 .as_any()
247 .downcast_ref::<StringVector>()
248 .unwrap()
249 })
250 .collect::<Vec<_>>();
251 let tag_names = tag_column_indices
252 .iter()
253 .map(|c| schema.column_name_by_index(*c))
254 .collect::<Vec<_>>();
255 let timestamp_column = batch
256 .column(timestamp_column_index)
257 .as_any()
258 .downcast_ref::<TimestampMillisecondVector>()
259 .unwrap();
260 let casted_field_column = batch
261 .column(first_field_column_index)
262 .cast(&ConcreteDataType::float64_datatype())
263 .unwrap();
264 let field_column = casted_field_column
265 .as_any()
266 .downcast_ref::<Float64Vector>()
267 .unwrap();
268
269 for row_index in 0..batch.num_rows() {
271 if let Some(v) = field_column.get_data(row_index) {
273 if v.is_nan() {
275 continue;
276 }
277
278 let mut tags = Vec::with_capacity(num_label_columns + 1);
281 tags.push(metric_name);
282 for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
283 if let Some(tag_value) = tag_column.get_data(row_index) {
285 tags.push((tag_name, tag_value));
286 }
287 }
288
289 let timestamp_millis: i64 =
291 timestamp_column.get_data(row_index).unwrap().into();
292 let timestamp = timestamp_millis as f64 / 1000.0;
293
294 buffer
295 .entry(tags)
296 .or_default()
297 .push((timestamp, Into::<f64>::into(v).to_string()));
298 };
299 }
300 }
301
302 let mut result = match result_type {
304 ValueType::Vector => PromQueryResult::Vector(vec![]),
305 ValueType::Matrix => PromQueryResult::Matrix(vec![]),
306 ValueType::Scalar => PromQueryResult::Scalar(None),
307 ValueType::String => PromQueryResult::String(None),
308 };
309
310 buffer.into_iter().for_each(|(tags, mut values)| {
312 let metric = tags
313 .into_iter()
314 .map(|(k, v)| (k.to_string(), v.to_string()))
315 .collect::<BTreeMap<_, _>>();
316 match result {
317 PromQueryResult::Vector(ref mut v) => {
318 v.push(PromSeriesVector {
319 metric,
320 value: values.pop(),
321 });
322 }
323 PromQueryResult::Matrix(ref mut v) => {
324 if !values.is_sorted_by(|a, b| a.0 <= b.0) {
326 values.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
327 }
328
329 v.push(PromSeriesMatrix { metric, values });
330 }
331 PromQueryResult::Scalar(ref mut v) => {
332 *v = values.pop();
333 }
334 PromQueryResult::String(ref mut _v) => {
335 }
337 }
338 });
339
340 if let PromQueryResult::Matrix(ref mut v) = result {
343 v.sort_by(|a, b| a.metric.cmp(&b.metric));
344 }
345
346 let result_type_string = result_type.to_string();
347 let data = PrometheusResponse::PromData(PromData {
348 result_type: result_type_string,
349 result,
350 });
351
352 Ok(data)
353 }
354}