servers/opentsdb/
codec.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest, SemanticType};
16use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
17
18use crate::error::{self, Result};
19
/// A single data point parsed from an OpenTSDB `put` command.
#[derive(Debug, Clone)]
pub struct DataPoint {
    /// Metric name; used as the table name when building the gRPC insert request.
    metric: String,
    /// Timestamp of the point, in milliseconds.
    ts_millis: i64,
    /// Measured value of the point.
    value: f64,
    /// Tag key/value pairs, kept in the order they were supplied.
    tags: Vec<(String, String)>,
}
27
28impl DataPoint {
29    pub fn new(metric: String, ts_millis: i64, value: f64, tags: Vec<(String, String)>) -> Self {
30        Self {
31            metric,
32            ts_millis,
33            value,
34            tags,
35        }
36    }
37
38    pub fn try_create(line: &str) -> Result<Self> {
39        let tokens = line.split_whitespace().collect::<Vec<&str>>();
40        let cmd = if tokens.is_empty() { "" } else { tokens[0] };
41        // OpenTSDB command is case sensitive, verified in real OpenTSDB.
42        if cmd != "put" {
43            return error::InvalidQuerySnafu {
44                reason: format!("unknown command {cmd}."),
45            }
46            .fail();
47        }
48        if tokens.len() < 4 {
49            return error::InvalidQuerySnafu {
50                reason: format!(
51                    "put: illegal argument: not enough arguments (need least 4, got {})",
52                    tokens.len()
53                ),
54            }
55            .fail();
56        }
57
58        let metric = tokens[1];
59
60        let ts_millis = match tokens[2].parse::<i64>() {
61            Ok(t) => Self::timestamp_to_millis(t),
62            Err(_) => {
63                return error::InvalidQuerySnafu {
64                    reason: format!("put: invalid timestamp: {}", tokens[2]),
65                }
66                .fail()
67            }
68        };
69
70        let value = match tokens[3].parse::<f64>() {
71            Ok(v) => v,
72            Err(_) => {
73                return error::InvalidQuerySnafu {
74                    reason: format!("put: invalid value: {}", tokens[3]),
75                }
76                .fail()
77            }
78        };
79
80        let mut tags = Vec::with_capacity(tokens.len() - 4);
81        for token in tokens.iter().skip(4) {
82            let tag = token.split('=').collect::<Vec<&str>>();
83            if tag.len() != 2 || tag[0].is_empty() || tag[1].is_empty() {
84                return error::InvalidQuerySnafu {
85                    reason: format!("put: invalid tag: {token}"),
86                }
87                .fail();
88            }
89            let tagk = tag[0].to_string();
90            let tagv = tag[1].to_string();
91            if tags.iter().any(|(t, _)| t == &tagk) {
92                return error::InvalidQuerySnafu {
93                    reason: format!("put: illegal argument: duplicate tag: {tagk}"),
94                }
95                .fail();
96            }
97            tags.push((tagk, tagv));
98        }
99
100        Ok(DataPoint {
101            metric: metric.to_string(),
102            ts_millis,
103            value,
104            tags,
105        })
106    }
107
108    pub fn metric(&self) -> &str {
109        &self.metric
110    }
111
112    pub fn tags(&self) -> &Vec<(String, String)> {
113        &self.tags
114    }
115
116    pub fn tags_mut(&mut self) -> &mut Vec<(String, String)> {
117        &mut self.tags
118    }
119
120    pub fn ts_millis(&self) -> i64 {
121        self.ts_millis
122    }
123
124    pub fn value(&self) -> f64 {
125        self.value
126    }
127
128    pub fn as_grpc_insert(&self) -> GrpcInsertRequest {
129        let mut columns = Vec::with_capacity(2 + self.tags.len());
130
131        let ts_column = Column {
132            column_name: GREPTIME_TIMESTAMP.to_string(),
133            values: Some(column::Values {
134                timestamp_millisecond_values: vec![self.ts_millis],
135                ..Default::default()
136            }),
137            semantic_type: SemanticType::Timestamp as i32,
138            datatype: ColumnDataType::TimestampMillisecond as i32,
139            ..Default::default()
140        };
141        columns.push(ts_column);
142
143        let field_column = Column {
144            column_name: GREPTIME_VALUE.to_string(),
145            values: Some(column::Values {
146                f64_values: vec![self.value],
147                ..Default::default()
148            }),
149            semantic_type: SemanticType::Field as i32,
150            datatype: ColumnDataType::Float64 as i32,
151            ..Default::default()
152        };
153        columns.push(field_column);
154
155        for (tagk, tagv) in self.tags.iter() {
156            columns.push(Column {
157                column_name: tagk.to_string(),
158                values: Some(column::Values {
159                    string_values: vec![tagv.to_string()],
160                    ..Default::default()
161                }),
162                semantic_type: SemanticType::Tag as i32,
163                datatype: ColumnDataType::String as i32,
164                ..Default::default()
165            });
166        }
167
168        GrpcInsertRequest {
169            table_name: self.metric.clone(),
170            columns,
171            row_count: 1,
172        }
173    }
174
175    pub fn timestamp_to_millis(t: i64) -> i64 {
176        // 9999999999999 (13 digits) is of date "Sat Nov 20 2286 17:46:39 UTC",
177        // 999999999999 (12 digits) is "Sun Sep 09 2001 01:46:39 UTC",
178        // so timestamp digits less than 13 means we got seconds here.
179        // (We are not expecting to store data that is 21 years ago, are we?)
180        if t.abs().to_string().len() < 13 {
181            t * 1000
182        } else {
183            t
184        }
185    }
186}
187
188#[cfg(test)]
189mod test {
190    use super::*;
191
192    #[test]
193    fn test_try_create() {
194        fn test_illegal_line(line: &str, expected_err: &str) {
195            let result = DataPoint::try_create(line);
196            match result.unwrap_err() {
197                error::Error::InvalidQuery { reason, .. } => {
198                    assert_eq!(reason, expected_err)
199                }
200                _ => unreachable!(),
201            }
202        }
203
204        test_illegal_line("no_put", "unknown command no_put.");
205        test_illegal_line(
206            "put",
207            "put: illegal argument: not enough arguments (need least 4, got 1)",
208        );
209        test_illegal_line(
210            "put metric.foo notatime 42 host=web01",
211            "put: invalid timestamp: notatime",
212        );
213        test_illegal_line(
214            "put metric.foo 1000 notavalue host=web01",
215            "put: invalid value: notavalue",
216        );
217        test_illegal_line("put metric.foo 1000 42 host=", "put: invalid tag: host=");
218        test_illegal_line(
219            "put metric.foo 1000 42 host=web01 host=web02",
220            "put: illegal argument: duplicate tag: host",
221        );
222
223        let data_point = DataPoint::try_create(
224            "put sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0",
225        )
226        .unwrap();
227        assert_eq!(data_point.metric, "sys.if.bytes.out");
228        assert_eq!(data_point.ts_millis, 1479496100000);
229        assert_eq!(data_point.value, 1.3e3);
230        assert_eq!(
231            data_point.tags,
232            vec![
233                ("host".to_string(), "web01".to_string()),
234                ("interface".to_string(), "eth0".to_string())
235            ]
236        );
237
238        let data_point =
239            DataPoint::try_create("put sys.procs.running 1479496100 42 host=web01").unwrap();
240        assert_eq!(data_point.metric, "sys.procs.running");
241        assert_eq!(data_point.ts_millis, 1479496100000);
242        assert_eq!(data_point.value, 42f64);
243        assert_eq!(
244            data_point.tags,
245            vec![("host".to_string(), "web01".to_string())]
246        );
247    }
248
249    #[test]
250    fn test_as_grpc_insert() {
251        let data_point = DataPoint {
252            metric: "my_metric_1".to_string(),
253            ts_millis: 1000,
254            value: 1.0,
255            tags: vec![
256                ("tagk1".to_string(), "tagv1".to_string()),
257                ("tagk2".to_string(), "tagv2".to_string()),
258            ],
259        };
260
261        let grpc_insert = data_point.as_grpc_insert();
262        assert_eq!(grpc_insert.table_name, "my_metric_1");
263
264        let columns = &grpc_insert.columns;
265        let row_count = grpc_insert.row_count;
266
267        assert_eq!(row_count, 1);
268        assert_eq!(columns.len(), 4);
269
270        assert_eq!(columns[0].column_name, GREPTIME_TIMESTAMP);
271        assert_eq!(
272            columns[0]
273                .values
274                .as_ref()
275                .unwrap()
276                .timestamp_millisecond_values,
277            vec![1000]
278        );
279
280        assert_eq!(columns[1].column_name, GREPTIME_VALUE);
281        assert_eq!(columns[1].values.as_ref().unwrap().f64_values, vec![1.0]);
282
283        assert_eq!(columns[2].column_name, "tagk1");
284        assert_eq!(
285            columns[2].values.as_ref().unwrap().string_values,
286            vec!["tagv1"]
287        );
288
289        assert_eq!(columns[3].column_name, "tagk2");
290        assert_eq!(
291            columns[3].values.as_ref().unwrap().string_values,
292            vec!["tagv2"]
293        );
294    }
295}