servers/http/
opentsdb.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use axum::body::Bytes;
19use axum::extract::{Query, State};
20use axum::http::StatusCode as HttpStatusCode;
21use axum::{Extension, Json};
22use common_error::ext::ErrorExt;
23use serde::{Deserialize, Serialize};
24use session::context::{Channel, QueryContext};
25use snafu::ResultExt;
26
27use crate::error::{self, Result};
28use crate::opentsdb::codec::DataPoint;
29use crate::query_handler::OpentsdbProtocolHandlerRef;
30
31#[derive(Serialize, Deserialize)]
32#[serde(untagged)]
33enum OneOrMany<T> {
34    One(T),
35    Vec(Vec<T>),
36}
37
38impl<T> From<OneOrMany<T>> for Vec<T> {
39    fn from(from: OneOrMany<T>) -> Self {
40        match from {
41            OneOrMany::One(val) => vec![val],
42            OneOrMany::Vec(vec) => vec,
43        }
44    }
45}
46
47#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
48pub struct DataPointRequest {
49    metric: String,
50    timestamp: i64,
51    value: f64,
52    tags: HashMap<String, String>,
53}
54
55impl From<DataPointRequest> for DataPoint {
56    fn from(request: DataPointRequest) -> Self {
57        let ts_millis = DataPoint::timestamp_to_millis(request.timestamp);
58
59        let tags = request.tags.into_iter().collect::<Vec<(String, String)>>();
60
61        DataPoint::new(request.metric, ts_millis, request.value, tags)
62    }
63}
64
65#[derive(Serialize, Deserialize, Debug)]
66#[serde(untagged)]
67pub enum OpentsdbPutResponse {
68    Empty,
69    Debug(OpentsdbDebuggingResponse),
70}
71
72// Please refer to the OpenTSDB documents of ["api/put"](http://opentsdb.net/docs/build/html/api_http/put.html)
73// for more details.
74#[axum_macros::debug_handler]
75pub async fn put(
76    State(opentsdb_handler): State<OpentsdbProtocolHandlerRef>,
77    Query(params): Query<HashMap<String, String>>,
78    Extension(mut ctx): Extension<QueryContext>,
79    body: Bytes,
80) -> Result<(HttpStatusCode, Json<OpentsdbPutResponse>)> {
81    let summary = params.contains_key("summary");
82    let details = params.contains_key("details");
83
84    let data_point_requests = parse_data_points(body).await?;
85    let data_points = data_point_requests
86        .iter()
87        .map(|point| point.clone().into())
88        .collect::<Vec<_>>();
89
90    ctx.set_channel(Channel::Opentsdb);
91    let ctx = Arc::new(ctx);
92
93    let response = if !summary && !details {
94        if let Err(e) = opentsdb_handler.exec(data_points, ctx.clone()).await {
95            // Not debugging purpose, failed fast.
96            return error::InternalSnafu {
97                err_msg: e.to_string(),
98            }
99            .fail();
100        }
101        (HttpStatusCode::NO_CONTENT, Json(OpentsdbPutResponse::Empty))
102    } else {
103        let mut response = OpentsdbDebuggingResponse {
104            success: 0,
105            failed: 0,
106            errors: if details {
107                Some(Vec::with_capacity(data_points.len()))
108            } else {
109                None
110            },
111        };
112
113        for (data_point, request) in data_points.into_iter().zip(data_point_requests) {
114            let result = opentsdb_handler.exec(vec![data_point], ctx.clone()).await;
115            match result {
116                Ok(affected_rows) => response.on_success(affected_rows),
117                Err(e) => response.on_failed(request, e),
118            }
119        }
120        (
121            HttpStatusCode::OK,
122            Json(OpentsdbPutResponse::Debug(response)),
123        )
124    };
125    Ok(response)
126}
127
128async fn parse_data_points(body: Bytes) -> Result<Vec<DataPointRequest>> {
129    let data_points = serde_json::from_slice::<OneOrMany<DataPointRequest>>(&body[..])
130        .context(error::InvalidOpentsdbJsonRequestSnafu)?;
131    Ok(data_points.into())
132}
133
134#[derive(Serialize, Deserialize, Debug)]
135struct OpentsdbDetailError {
136    datapoint: DataPointRequest,
137    error: String,
138}
139
140#[derive(Serialize, Deserialize, Debug)]
141pub struct OpentsdbDebuggingResponse {
142    success: i32,
143    failed: i32,
144    #[serde(skip_serializing_if = "Option::is_none")]
145    errors: Option<Vec<OpentsdbDetailError>>,
146}
147
148impl OpentsdbDebuggingResponse {
149    fn on_success(&mut self, affected_rows: usize) {
150        self.success += affected_rows as i32;
151    }
152
153    fn on_failed(&mut self, datapoint: DataPointRequest, error: impl ErrorExt) {
154        self.failed += 1;
155
156        if let Some(details) = self.errors.as_mut() {
157            let error = OpentsdbDetailError {
158                datapoint,
159                error: error.output_msg(),
160            };
161            details.push(error);
162        };
163    }
164}
165
#[cfg(test)]
mod test {

    use super::*;

    /// Converting a request keeps metric, value and tags, and the timestamp
    /// `1234` comes out as `1234000` milliseconds.
    #[test]
    fn test_into_opentsdb_data_point() {
        let tags = HashMap::from([("foo".to_string(), "a".to_string())]);
        let data_point = DataPoint::from(DataPointRequest {
            metric: "hello".to_string(),
            timestamp: 1234,
            value: 1.0,
            tags,
        });

        assert_eq!(data_point.metric(), "hello");
        assert_eq!(data_point.ts_millis(), 1234000);
        assert_eq!(data_point.value(), 1.0);
        assert_eq!(
            data_point.tags(),
            &vec![("foo".to_string(), "a".to_string())]
        );
    }

    /// `parse_data_points` accepts a single object as well as an array, and
    /// rejects an empty or non-JSON body with serde_json's message.
    #[tokio::test]
    async fn test_parse_data_points() {
        let raw_data_point1 = r#"{
                "metric": "sys.cpu.nice",
                "timestamp": 1346846400,
                "value": 18,
                "tags": {
                    "host": "web01",
                    "dc": "lga"
                }
            }"#;
        let raw_data_point2 = r#"{
                "metric": "sys.cpu.nice",
                "timestamp": 1346846400,
                "value": 9,
                "tags": {
                    "host": "web02",
                    "dc": "lga"
                }
            }"#;
        let data_point1: DataPointRequest = serde_json::from_str(raw_data_point1).unwrap();
        let data_point2: DataPointRequest = serde_json::from_str(raw_data_point2).unwrap();

        // A single JSON object yields exactly one data point.
        let parsed = parse_data_points(Bytes::from(raw_data_point1))
            .await
            .unwrap();
        assert_eq!(parsed, vec![data_point1.clone()]);

        // A JSON array yields the data points in their original order.
        let array_body = format!("[{raw_data_point1},{raw_data_point2}]");
        let parsed = parse_data_points(Bytes::from(array_body)).await.unwrap();
        assert_eq!(parsed, vec![data_point1, data_point2]);

        // Malformed bodies, each paired with the expected error fragment.
        let malformed = [
            ("", "EOF while parsing a value at line 1 column 0"),
            ("hello world", "expected value at line 1 column 1"),
        ];
        for (input, expected) in malformed {
            let result = parse_data_points(Bytes::from(input)).await;
            assert!(result.is_err());
            let err = result.unwrap_err().output_msg();
            assert!(err.contains(expected));
        }
    }
}