1use std::collections::HashMap;
17
18use axum::http::HeaderValue;
19use axum::response::{IntoResponse, Response};
20use axum::Json;
21use common_error::ext::ErrorExt;
22use common_error::status_code::StatusCode;
23use common_query::{Output, OutputData};
24use common_recordbatch::RecordBatches;
25use datatypes::prelude::ConcreteDataType;
26use datatypes::scalars::ScalarVector;
27use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
28use indexmap::IndexMap;
29use promql_parser::label::METRIC_NAME;
30use promql_parser::parser::value::ValueType;
31use serde::{Deserialize, Serialize};
32use serde_json::Value;
33use snafu::{OptionExt, ResultExt};
34
35use crate::error::{
36 status_code_to_http_status, CollectRecordbatchSnafu, Result, UnexpectedResultSnafu,
37};
38use crate::http::header::{collect_plan_metrics, GREPTIME_DB_HEADER_METRICS};
39use crate::http::prometheus::{
40 PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusResponse,
41};
42
43#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
44pub struct PrometheusJsonResponse {
45 pub status: String,
46 #[serde(skip_serializing_if = "PrometheusResponse::is_none")]
47 #[serde(default)]
48 pub data: PrometheusResponse,
49 #[serde(skip_serializing_if = "Option::is_none")]
50 pub error: Option<String>,
51 #[serde(skip_serializing_if = "Option::is_none")]
52 #[serde(rename = "errorType")]
53 pub error_type: Option<String>,
54 #[serde(skip_serializing_if = "Option::is_none")]
55 pub warnings: Option<Vec<String>>,
56
57 #[serde(skip)]
58 pub status_code: Option<StatusCode>,
59 #[serde(skip)]
61 #[serde(default)]
62 pub resp_metrics: HashMap<String, Value>,
63}
64
65impl IntoResponse for PrometheusJsonResponse {
66 fn into_response(self) -> Response {
67 let metrics = if self.resp_metrics.is_empty() {
68 None
69 } else {
70 serde_json::to_string(&self.resp_metrics).ok()
71 };
72
73 let http_code = self.status_code.map(|c| status_code_to_http_status(&c));
74
75 let mut resp = Json(self).into_response();
76
77 if let Some(http_code) = http_code {
78 *resp.status_mut() = http_code;
79 }
80
81 if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
82 resp.headers_mut().insert(&GREPTIME_DB_HEADER_METRICS, m);
83 }
84
85 resp
86 }
87}
88
89impl PrometheusJsonResponse {
90 pub fn error<S1>(error_type: StatusCode, reason: S1) -> Self
91 where
92 S1: Into<String>,
93 {
94 PrometheusJsonResponse {
95 status: "error".to_string(),
96 data: PrometheusResponse::None,
97 error: Some(reason.into()),
98 error_type: Some(error_type.to_string()),
99 warnings: None,
100 resp_metrics: Default::default(),
101 status_code: Some(error_type),
102 }
103 }
104
105 pub fn success(data: PrometheusResponse) -> Self {
106 PrometheusJsonResponse {
107 status: "success".to_string(),
108 data,
109 error: None,
110 error_type: None,
111 warnings: None,
112 resp_metrics: Default::default(),
113 status_code: None,
114 }
115 }
116
117 pub async fn from_query_result(
119 result: Result<Output>,
120 metric_name: String,
121 result_type: ValueType,
122 ) -> Self {
123 let response: Result<Self> = try {
124 let result = result?;
125 let mut resp =
126 match result.data {
127 OutputData::RecordBatches(batches) => Self::success(
128 Self::record_batches_to_data(batches, metric_name, result_type)?,
129 ),
130 OutputData::Stream(stream) => {
131 let record_batches = RecordBatches::try_collect(stream)
132 .await
133 .context(CollectRecordbatchSnafu)?;
134 Self::success(Self::record_batches_to_data(
135 record_batches,
136 metric_name,
137 result_type,
138 )?)
139 }
140 OutputData::AffectedRows(_) => Self::error(
141 StatusCode::Unexpected,
142 "expected data result, but got affected rows",
143 ),
144 };
145
146 if let Some(physical_plan) = result.meta.plan {
147 let mut result_map = HashMap::new();
148 let mut tmp = vec![&mut result_map];
149 collect_plan_metrics(&physical_plan, &mut tmp);
150
151 let re = result_map
152 .into_iter()
153 .map(|(k, v)| (k, Value::from(v)))
154 .collect();
155 resp.resp_metrics = re;
156 }
157
158 resp
159 };
160
161 let result_type_string = result_type.to_string();
162
163 match response {
164 Ok(resp) => resp,
165 Err(err) => {
166 if err.status_code() == StatusCode::TableNotFound
168 || err.status_code() == StatusCode::TableColumnNotFound
169 {
170 Self::success(PrometheusResponse::PromData(PromData {
171 result_type: result_type_string,
172 ..Default::default()
173 }))
174 } else {
175 Self::error(err.status_code(), err.output_msg())
176 }
177 }
178 }
179 }
180
181 fn record_batches_to_data(
183 batches: RecordBatches,
184 metric_name: String,
185 result_type: ValueType,
186 ) -> Result<PrometheusResponse> {
187 let mut timestamp_column_index = None;
190 let mut tag_column_indices = Vec::new();
191 let mut first_field_column_index = None;
192
193 let mut num_label_columns = 0;
194
195 for (i, column) in batches.schema().column_schemas().iter().enumerate() {
196 match column.data_type {
197 ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => {
198 if timestamp_column_index.is_none() {
199 timestamp_column_index = Some(i);
200 }
201 }
202 ConcreteDataType::Float32(_)
204 | ConcreteDataType::Float64(_)
205 | ConcreteDataType::Int8(_)
206 | ConcreteDataType::Int16(_)
207 | ConcreteDataType::Int32(_)
208 | ConcreteDataType::Int64(_)
209 | ConcreteDataType::UInt8(_)
210 | ConcreteDataType::UInt16(_)
211 | ConcreteDataType::UInt32(_)
212 | ConcreteDataType::UInt64(_) => {
213 if first_field_column_index.is_none() {
214 first_field_column_index = Some(i);
215 }
216 }
217 ConcreteDataType::String(_) => {
218 tag_column_indices.push(i);
219 num_label_columns += 1;
220 }
221 _ => {}
222 }
223 }
224
225 let timestamp_column_index = timestamp_column_index.context(UnexpectedResultSnafu {
226 reason: "no timestamp column found".to_string(),
227 })?;
228 let first_field_column_index = first_field_column_index.context(UnexpectedResultSnafu {
229 reason: "no value column found".to_string(),
230 })?;
231
232 let metric_name = (METRIC_NAME, metric_name.as_str());
233 let mut buffer = IndexMap::<Vec<(&str, &str)>, Vec<(f64, String)>>::new();
236
237 let schema = batches.schema();
238 for batch in batches.iter() {
239 let tag_columns = tag_column_indices
241 .iter()
242 .map(|i| {
243 batch
244 .column(*i)
245 .as_any()
246 .downcast_ref::<StringVector>()
247 .unwrap()
248 })
249 .collect::<Vec<_>>();
250 let tag_names = tag_column_indices
251 .iter()
252 .map(|c| schema.column_name_by_index(*c))
253 .collect::<Vec<_>>();
254 let timestamp_column = batch
255 .column(timestamp_column_index)
256 .as_any()
257 .downcast_ref::<TimestampMillisecondVector>()
258 .unwrap();
259 let casted_field_column = batch
260 .column(first_field_column_index)
261 .cast(&ConcreteDataType::float64_datatype())
262 .unwrap();
263 let field_column = casted_field_column
264 .as_any()
265 .downcast_ref::<Float64Vector>()
266 .unwrap();
267
268 for row_index in 0..batch.num_rows() {
270 if let Some(v) = field_column.get_data(row_index) {
272 if v.is_nan() {
274 continue;
275 }
276
277 let mut tags = Vec::with_capacity(num_label_columns + 1);
280 tags.push(metric_name);
281 for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
282 if let Some(tag_value) = tag_column.get_data(row_index) {
284 tags.push((tag_name, tag_value));
285 }
286 }
287
288 let timestamp_millis: i64 =
290 timestamp_column.get_data(row_index).unwrap().into();
291 let timestamp = timestamp_millis as f64 / 1000.0;
292
293 buffer
294 .entry(tags)
295 .or_default()
296 .push((timestamp, Into::<f64>::into(v).to_string()));
297 };
298 }
299 }
300
301 let mut result = match result_type {
303 ValueType::Vector => PromQueryResult::Vector(vec![]),
304 ValueType::Matrix => PromQueryResult::Matrix(vec![]),
305 ValueType::Scalar => PromQueryResult::Scalar(None),
306 ValueType::String => PromQueryResult::String(None),
307 };
308
309 buffer.into_iter().for_each(|(tags, mut values)| {
311 let metric = tags
312 .into_iter()
313 .map(|(k, v)| (k.to_string(), v.to_string()))
314 .collect::<HashMap<_, _>>();
315 match result {
316 PromQueryResult::Vector(ref mut v) => {
317 v.push(PromSeriesVector {
318 metric,
319 value: values.pop(),
320 });
321 }
322 PromQueryResult::Matrix(ref mut v) => {
323 v.push(PromSeriesMatrix { metric, values });
324 }
325 PromQueryResult::Scalar(ref mut v) => {
326 *v = values.pop();
327 }
328 PromQueryResult::String(ref mut _v) => {
329 }
331 }
332 });
333
334 let result_type_string = result_type.to_string();
335 let data = PrometheusResponse::PromData(PromData {
336 result_type: result_type_string,
337 result,
338 });
339
340 Ok(data)
341 }
342}