1use std::collections::HashMap;
16use std::sync::Arc;
17
18use axum::body::Bytes;
19use axum::extract::{Query, State};
20use axum::http::StatusCode as HttpStatusCode;
21use axum::{Extension, Json};
22use common_error::ext::ErrorExt;
23use serde::{Deserialize, Serialize};
24use session::context::{Channel, QueryContext};
25use snafu::ResultExt;
26
27use crate::error::{self, Result};
28use crate::opentsdb::codec::DataPoint;
29use crate::query_handler::OpentsdbProtocolHandlerRef;
30
31#[derive(Serialize, Deserialize)]
32#[serde(untagged)]
33enum OneOrMany<T> {
34 One(T),
35 Vec(Vec<T>),
36}
37
38impl<T> From<OneOrMany<T>> for Vec<T> {
39 fn from(from: OneOrMany<T>) -> Self {
40 match from {
41 OneOrMany::One(val) => vec![val],
42 OneOrMany::Vec(vec) => vec,
43 }
44 }
45}
46
47#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
48pub struct DataPointRequest {
49 metric: String,
50 timestamp: i64,
51 value: f64,
52 tags: HashMap<String, String>,
53}
54
55impl From<DataPointRequest> for DataPoint {
56 fn from(request: DataPointRequest) -> Self {
57 let ts_millis = DataPoint::timestamp_to_millis(request.timestamp);
58
59 let tags = request.tags.into_iter().collect::<Vec<(String, String)>>();
60
61 DataPoint::new(request.metric, ts_millis, request.value, tags)
62 }
63}
64
65#[derive(Serialize, Deserialize, Debug)]
66#[serde(untagged)]
67pub enum OpentsdbPutResponse {
68 Empty,
69 Debug(OpentsdbDebuggingResponse),
70}
71
72#[axum_macros::debug_handler]
75pub async fn put(
76 State(opentsdb_handler): State<OpentsdbProtocolHandlerRef>,
77 Query(params): Query<HashMap<String, String>>,
78 Extension(mut ctx): Extension<QueryContext>,
79 body: Bytes,
80) -> Result<(HttpStatusCode, Json<OpentsdbPutResponse>)> {
81 let summary = params.contains_key("summary");
82 let details = params.contains_key("details");
83
84 let data_point_requests = parse_data_points(body).await?;
85 let data_points = data_point_requests
86 .iter()
87 .map(|point| point.clone().into())
88 .collect::<Vec<_>>();
89
90 ctx.set_channel(Channel::Opentsdb);
91 let ctx = Arc::new(ctx);
92
93 let response = if !summary && !details {
94 if let Err(e) = opentsdb_handler.exec(data_points, ctx.clone()).await {
95 return error::InternalSnafu {
97 err_msg: e.to_string(),
98 }
99 .fail();
100 }
101 (HttpStatusCode::NO_CONTENT, Json(OpentsdbPutResponse::Empty))
102 } else {
103 let mut response = OpentsdbDebuggingResponse {
104 success: 0,
105 failed: 0,
106 errors: if details {
107 Some(Vec::with_capacity(data_points.len()))
108 } else {
109 None
110 },
111 };
112
113 for (data_point, request) in data_points.into_iter().zip(data_point_requests) {
114 let result = opentsdb_handler.exec(vec![data_point], ctx.clone()).await;
115 match result {
116 Ok(affected_rows) => response.on_success(affected_rows),
117 Err(e) => response.on_failed(request, e),
118 }
119 }
120 (
121 HttpStatusCode::OK,
122 Json(OpentsdbPutResponse::Debug(response)),
123 )
124 };
125 Ok(response)
126}
127
128async fn parse_data_points(body: Bytes) -> Result<Vec<DataPointRequest>> {
129 let data_points = serde_json::from_slice::<OneOrMany<DataPointRequest>>(&body[..])
130 .context(error::InvalidOpentsdbJsonRequestSnafu)?;
131 Ok(data_points.into())
132}
133
134#[derive(Serialize, Deserialize, Debug)]
135struct OpentsdbDetailError {
136 datapoint: DataPointRequest,
137 error: String,
138}
139
140#[derive(Serialize, Deserialize, Debug)]
141pub struct OpentsdbDebuggingResponse {
142 success: i32,
143 failed: i32,
144 #[serde(skip_serializing_if = "Option::is_none")]
145 errors: Option<Vec<OpentsdbDetailError>>,
146}
147
148impl OpentsdbDebuggingResponse {
149 fn on_success(&mut self, affected_rows: usize) {
150 self.success += affected_rows as i32;
151 }
152
153 fn on_failed(&mut self, datapoint: DataPointRequest, error: impl ErrorExt) {
154 self.failed += 1;
155
156 if let Some(details) = self.errors.as_mut() {
157 let error = OpentsdbDetailError {
158 datapoint,
159 error: error.output_msg(),
160 };
161 details.push(error);
162 };
163 }
164}
165
#[cfg(test)]
mod test {

    use super::*;

    /// A request converts into a `DataPoint` with the same metric, value and
    /// tags; the timestamp `1234` is expected to come out as `1234000`.
    #[test]
    fn test_into_opentsdb_data_point() {
        let tags = HashMap::from([("foo".to_string(), "a".to_string())]);
        let converted = DataPoint::from(DataPointRequest {
            metric: "hello".to_string(),
            timestamp: 1234,
            value: 1.0,
            tags,
        });

        assert_eq!(converted.metric(), "hello");
        assert_eq!(converted.ts_millis(), 1234000);
        assert_eq!(converted.value(), 1.0);

        let expected_tags = vec![("foo".to_string(), "a".to_string())];
        assert_eq!(converted.tags(), &expected_tags);
    }

    /// Bodies holding one object or an array of objects both parse; empty or
    /// non-JSON bodies are rejected with the underlying parser message.
    #[tokio::test]
    async fn test_parse_data_points() {
        let raw_data_point1 = r#"{
            "metric": "sys.cpu.nice",
            "timestamp": 1346846400,
            "value": 18,
            "tags": {
                "host": "web01",
                "dc": "lga"
            }
        }"#;
        let raw_data_point2 = r#"{
            "metric": "sys.cpu.nice",
            "timestamp": 1346846400,
            "value": 9,
            "tags": {
                "host": "web02",
                "dc": "lga"
            }
        }"#;
        let expected_first: DataPointRequest = serde_json::from_str(raw_data_point1).unwrap();
        let expected_second: DataPointRequest = serde_json::from_str(raw_data_point2).unwrap();

        // A single object yields exactly one request.
        let parsed = parse_data_points(Bytes::from(raw_data_point1))
            .await
            .unwrap();
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0], expected_first);

        // An array yields its elements in order.
        let array_body = Bytes::from(format!("[{raw_data_point1},{raw_data_point2}]"));
        let parsed = parse_data_points(array_body).await.unwrap();
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0], expected_first);
        assert_eq!(parsed[1], expected_second);

        // Malformed bodies surface the JSON parser's message.
        let rejected = [
            ("", "EOF while parsing a value at line 1 column 0"),
            ("hello world", "expected value at line 1 column 1"),
        ];
        for (payload, expected_fragment) in rejected {
            let outcome = parse_data_points(Bytes::from(payload)).await;
            assert!(outcome.is_err());
            let message = outcome.unwrap_err().output_msg();
            assert!(message.contains(expected_fragment));
        }
    }
}