1use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest, SemanticType};
16use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
17
18use crate::error::{self, Result};
19
/// A single metric sample: a metric name, a millisecond timestamp, a floating-point
/// value and an ordered list of tag key/value pairs.
#[derive(Debug, Clone)]
pub struct DataPoint {
    /// Name of the metric; `as_grpc_insert` uses it as the table name of the request.
    metric: String,
    /// Timestamp of the sample, in milliseconds.
    ts_millis: i64,
    /// Sampled value.
    value: f64,
    /// Tag `(key, value)` pairs in insertion order. `try_create` rejects duplicate keys,
    /// but `new` and `tags_mut` do not enforce uniqueness.
    tags: Vec<(String, String)>,
}
27
28impl DataPoint {
    /// Builds a `DataPoint` directly from its parts.
    ///
    /// No validation or conversion is performed: `ts_millis` is stored exactly as given
    /// (it is not passed through `timestamp_to_millis`) and `tags` is not checked for
    /// empty or duplicate keys.
    pub fn new(metric: String, ts_millis: i64, value: f64, tags: Vec<(String, String)>) -> Self {
        Self {
            metric,
            ts_millis,
            value,
            tags,
        }
    }
37
38 pub fn try_create(line: &str) -> Result<Self> {
39 let tokens = line.split_whitespace().collect::<Vec<&str>>();
40 let cmd = if tokens.is_empty() { "" } else { tokens[0] };
41 if cmd != "put" {
43 return error::InvalidQuerySnafu {
44 reason: format!("unknown command {cmd}."),
45 }
46 .fail();
47 }
48 if tokens.len() < 4 {
49 return error::InvalidQuerySnafu {
50 reason: format!(
51 "put: illegal argument: not enough arguments (need least 4, got {})",
52 tokens.len()
53 ),
54 }
55 .fail();
56 }
57
58 let metric = tokens[1];
59
60 let ts_millis = match tokens[2].parse::<i64>() {
61 Ok(t) => Self::timestamp_to_millis(t),
62 Err(_) => {
63 return error::InvalidQuerySnafu {
64 reason: format!("put: invalid timestamp: {}", tokens[2]),
65 }
66 .fail()
67 }
68 };
69
70 let value = match tokens[3].parse::<f64>() {
71 Ok(v) => v,
72 Err(_) => {
73 return error::InvalidQuerySnafu {
74 reason: format!("put: invalid value: {}", tokens[3]),
75 }
76 .fail()
77 }
78 };
79
80 let mut tags = Vec::with_capacity(tokens.len() - 4);
81 for token in tokens.iter().skip(4) {
82 let tag = token.split('=').collect::<Vec<&str>>();
83 if tag.len() != 2 || tag[0].is_empty() || tag[1].is_empty() {
84 return error::InvalidQuerySnafu {
85 reason: format!("put: invalid tag: {token}"),
86 }
87 .fail();
88 }
89 let tagk = tag[0].to_string();
90 let tagv = tag[1].to_string();
91 if tags.iter().any(|(t, _)| t == &tagk) {
92 return error::InvalidQuerySnafu {
93 reason: format!("put: illegal argument: duplicate tag: {tagk}"),
94 }
95 .fail();
96 }
97 tags.push((tagk, tagv));
98 }
99
100 Ok(DataPoint {
101 metric: metric.to_string(),
102 ts_millis,
103 value,
104 tags,
105 })
106 }
107
108 pub fn metric(&self) -> &str {
109 &self.metric
110 }
111
112 pub fn tags(&self) -> &Vec<(String, String)> {
113 &self.tags
114 }
115
116 pub fn tags_mut(&mut self) -> &mut Vec<(String, String)> {
117 &mut self.tags
118 }
119
120 pub fn ts_millis(&self) -> i64 {
121 self.ts_millis
122 }
123
124 pub fn value(&self) -> f64 {
125 self.value
126 }
127
128 pub fn as_grpc_insert(&self) -> GrpcInsertRequest {
129 let mut columns = Vec::with_capacity(2 + self.tags.len());
130
131 let ts_column = Column {
132 column_name: GREPTIME_TIMESTAMP.to_string(),
133 values: Some(column::Values {
134 timestamp_millisecond_values: vec![self.ts_millis],
135 ..Default::default()
136 }),
137 semantic_type: SemanticType::Timestamp as i32,
138 datatype: ColumnDataType::TimestampMillisecond as i32,
139 ..Default::default()
140 };
141 columns.push(ts_column);
142
143 let field_column = Column {
144 column_name: GREPTIME_VALUE.to_string(),
145 values: Some(column::Values {
146 f64_values: vec![self.value],
147 ..Default::default()
148 }),
149 semantic_type: SemanticType::Field as i32,
150 datatype: ColumnDataType::Float64 as i32,
151 ..Default::default()
152 };
153 columns.push(field_column);
154
155 for (tagk, tagv) in self.tags.iter() {
156 columns.push(Column {
157 column_name: tagk.to_string(),
158 values: Some(column::Values {
159 string_values: vec![tagv.to_string()],
160 ..Default::default()
161 }),
162 semantic_type: SemanticType::Tag as i32,
163 datatype: ColumnDataType::String as i32,
164 ..Default::default()
165 });
166 }
167
168 GrpcInsertRequest {
169 table_name: self.metric.clone(),
170 columns,
171 row_count: 1,
172 }
173 }
174
175 pub fn timestamp_to_millis(t: i64) -> i64 {
176 if t.abs().to_string().len() < 13 {
181 t * 1000
182 } else {
183 t
184 }
185 }
186}
187
188#[cfg(test)]
189mod test {
190 use super::*;
191
192 #[test]
193 fn test_try_create() {
194 fn test_illegal_line(line: &str, expected_err: &str) {
195 let result = DataPoint::try_create(line);
196 match result.unwrap_err() {
197 error::Error::InvalidQuery { reason, .. } => {
198 assert_eq!(reason, expected_err)
199 }
200 _ => unreachable!(),
201 }
202 }
203
204 test_illegal_line("no_put", "unknown command no_put.");
205 test_illegal_line(
206 "put",
207 "put: illegal argument: not enough arguments (need least 4, got 1)",
208 );
209 test_illegal_line(
210 "put metric.foo notatime 42 host=web01",
211 "put: invalid timestamp: notatime",
212 );
213 test_illegal_line(
214 "put metric.foo 1000 notavalue host=web01",
215 "put: invalid value: notavalue",
216 );
217 test_illegal_line("put metric.foo 1000 42 host=", "put: invalid tag: host=");
218 test_illegal_line(
219 "put metric.foo 1000 42 host=web01 host=web02",
220 "put: illegal argument: duplicate tag: host",
221 );
222
223 let data_point = DataPoint::try_create(
224 "put sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0",
225 )
226 .unwrap();
227 assert_eq!(data_point.metric, "sys.if.bytes.out");
228 assert_eq!(data_point.ts_millis, 1479496100000);
229 assert_eq!(data_point.value, 1.3e3);
230 assert_eq!(
231 data_point.tags,
232 vec![
233 ("host".to_string(), "web01".to_string()),
234 ("interface".to_string(), "eth0".to_string())
235 ]
236 );
237
238 let data_point =
239 DataPoint::try_create("put sys.procs.running 1479496100 42 host=web01").unwrap();
240 assert_eq!(data_point.metric, "sys.procs.running");
241 assert_eq!(data_point.ts_millis, 1479496100000);
242 assert_eq!(data_point.value, 42f64);
243 assert_eq!(
244 data_point.tags,
245 vec![("host".to_string(), "web01".to_string())]
246 );
247 }
248
249 #[test]
250 fn test_as_grpc_insert() {
251 let data_point = DataPoint {
252 metric: "my_metric_1".to_string(),
253 ts_millis: 1000,
254 value: 1.0,
255 tags: vec![
256 ("tagk1".to_string(), "tagv1".to_string()),
257 ("tagk2".to_string(), "tagv2".to_string()),
258 ],
259 };
260
261 let grpc_insert = data_point.as_grpc_insert();
262 assert_eq!(grpc_insert.table_name, "my_metric_1");
263
264 let columns = &grpc_insert.columns;
265 let row_count = grpc_insert.row_count;
266
267 assert_eq!(row_count, 1);
268 assert_eq!(columns.len(), 4);
269
270 assert_eq!(columns[0].column_name, GREPTIME_TIMESTAMP);
271 assert_eq!(
272 columns[0]
273 .values
274 .as_ref()
275 .unwrap()
276 .timestamp_millisecond_values,
277 vec![1000]
278 );
279
280 assert_eq!(columns[1].column_name, GREPTIME_VALUE);
281 assert_eq!(columns[1].values.as_ref().unwrap().f64_values, vec![1.0]);
282
283 assert_eq!(columns[2].column_name, "tagk1");
284 assert_eq!(
285 columns[2].values.as_ref().unwrap().string_values,
286 vec!["tagv1"]
287 );
288
289 assert_eq!(columns[3].column_name, "tagk2");
290 assert_eq!(
291 columns[3].values.as_ref().unwrap().string_values,
292 vec!["tagv2"]
293 );
294 }
295}