servers/
influxdb.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::value::ValueData;
16use api::v1::{ColumnDataType, RowInsertRequests};
17use common_grpc::precision::Precision;
18use hyper::Request;
19use influxdb_line_protocol::{parse_lines, FieldValue};
20use snafu::ResultExt;
21
22use crate::error::{Error, InfluxdbLineProtocolSnafu};
23use crate::row_writer::{self, MultiTableData};
24
/// Path fragment identifying InfluxDB-compatible HTTP endpoints (matched as a substring).
const INFLUXDB_API_PATH_NAME: &str = "influxdb";
/// Path fragment identifying InfluxDB v2 API endpoints (matched as a substring).
const INFLUXDB_API_V2_PATH_NAME: &str = "influxdb/api/v2";
/// Name of the column that receives each line's timestamp.
const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
/// Timestamp precision applied when a write request does not specify one.
const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;
29
30#[inline]
31pub(crate) fn is_influxdb_request<T>(req: &Request<T>) -> bool {
32    req.uri().path().contains(INFLUXDB_API_PATH_NAME)
33}
34
35#[inline]
36pub(crate) fn is_influxdb_v2_request<T>(req: &Request<T>) -> bool {
37    req.uri().path().contains(INFLUXDB_API_V2_PATH_NAME)
38}
39
/// An InfluxDB line-protocol write request, convertible into [`RowInsertRequests`].
#[derive(Debug)]
pub struct InfluxdbRequest {
    /// Precision of the timestamps in `lines`; nanoseconds are assumed when `None`.
    pub precision: Option<Precision>,
    /// Raw line-protocol payload, one point per line.
    pub lines: String,
}
45
46impl TryFrom<InfluxdbRequest> for RowInsertRequests {
47    type Error = Error;
48
49    fn try_from(value: InfluxdbRequest) -> Result<Self, Self::Error> {
50        let lines = parse_lines(&value.lines)
51            .collect::<influxdb_line_protocol::Result<Vec<_>>>()
52            .context(InfluxdbLineProtocolSnafu)?;
53
54        let mut multi_table_data = MultiTableData::new();
55        let precision = unwrap_or_default_precision(value.precision);
56        for line in &lines {
57            let table_name = line.series.measurement.as_str();
58            let tags = &line.series.tag_set;
59            let fields = &line.field_set;
60            let ts = line.timestamp;
61            // tags.len + fields.len + timestamp(+1)
62            let num_columns = tags.as_ref().map(|x| x.len()).unwrap_or(0) + fields.len() + 1;
63
64            let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 0);
65            let mut one_row = table_data.alloc_one_row();
66
67            // tags
68            if let Some(tags) = tags {
69                let kvs = tags.iter().map(|(k, v)| (k.to_string(), v.to_string()));
70                row_writer::write_tags(table_data, kvs, &mut one_row)?;
71            }
72
73            // fields
74            let fields = fields.iter().map(|(k, v)| {
75                let (datatype, value) = match v {
76                    FieldValue::I64(v) => (ColumnDataType::Int64, ValueData::I64Value(*v)),
77                    FieldValue::U64(v) => (ColumnDataType::Uint64, ValueData::U64Value(*v)),
78                    FieldValue::F64(v) => (ColumnDataType::Float64, ValueData::F64Value(*v)),
79                    FieldValue::String(v) => (
80                        ColumnDataType::String,
81                        ValueData::StringValue(v.to_string()),
82                    ),
83                    FieldValue::Boolean(v) => (ColumnDataType::Boolean, ValueData::BoolValue(*v)),
84                };
85                (k.to_string(), datatype, value)
86            });
87            row_writer::write_fields(table_data, fields, &mut one_row)?;
88
89            // timestamp
90            row_writer::write_ts_to_nanos(
91                table_data,
92                INFLUXDB_TIMESTAMP_COLUMN_NAME,
93                ts,
94                precision,
95                &mut one_row,
96            )?;
97
98            table_data.add_row(one_row);
99        }
100
101        Ok(multi_table_data.into_row_insert_requests().0)
102    }
103}
104
105#[inline]
106fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
107    if let Some(val) = precision {
108        val
109    } else {
110        DEFAULT_TIME_PRECISION
111    }
112}
113
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, RowInsertRequests, Rows, SemanticType};

    use crate::influxdb::InfluxdbRequest;

    /// Description of one column a converted table is expected to contain.
    struct ExpectedColumn {
        name: &'static str,
        datatype: ColumnDataType,
        semantic_type: SemanticType,
        /// One entry per row in insertion order; `None` means the row left the column unset.
        values: Vec<Option<ValueData>>,
    }

    #[test]
    fn test_convert_influxdb_lines_to_rows() {
        let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001
monitor2,host=host3 cpu=66.5 1663840496100023102
monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";

        let influxdb_req = InfluxdbRequest {
            precision: None,
            lines: lines.to_string(),
        };

        let requests: RowInsertRequests = influxdb_req.try_into().unwrap();
        assert_eq!(2, requests.inserts.len());

        for request in requests.inserts {
            let expected = match request.table_name.as_str() {
                "monitor1" => monitor1_columns(),
                "monitor2" => monitor2_columns(),
                other => panic!("unexpected table: {other}"),
            };
            assert_table_matches(&request.rows, &expected);
        }
    }

    /// Expected layout of the `monitor1` table.
    fn monitor1_columns() -> Vec<ExpectedColumn> {
        vec![
            tag_column("host", &["host1", "host2"]),
            f64_field_column("cpu", &[Some(66.6), None]),
            f64_field_column("memory", &[Some(1024.0), Some(1027.0)]),
            ts_column(&[1663840496100023100, 1663840496400340001]),
        ]
    }

    /// Expected layout of the `monitor2` table.
    fn monitor2_columns() -> Vec<ExpectedColumn> {
        vec![
            tag_column("host", &["host3", "host4"]),
            f64_field_column("cpu", &[Some(66.5), Some(66.3)]),
            f64_field_column("memory", &[None, Some(1029.0)]),
            ts_column(&[1663840496100023102, 1663840496400340003]),
        ]
    }

    /// A string tag column whose every row is set.
    fn tag_column(name: &'static str, values: &[&str]) -> ExpectedColumn {
        ExpectedColumn {
            name,
            datatype: ColumnDataType::String,
            semantic_type: SemanticType::Tag,
            values: values
                .iter()
                .map(|&v| Some(ValueData::StringValue(v.to_string())))
                .collect(),
        }
    }

    /// A float field column; `None` marks rows where the field is absent.
    fn f64_field_column(name: &'static str, values: &[Option<f64>]) -> ExpectedColumn {
        ExpectedColumn {
            name,
            datatype: ColumnDataType::Float64,
            semantic_type: SemanticType::Field,
            values: values.iter().map(|&v| v.map(ValueData::F64Value)).collect(),
        }
    }

    /// The nanosecond timestamp column `ts`.
    fn ts_column(values: &[i64]) -> ExpectedColumn {
        ExpectedColumn {
            name: "ts",
            datatype: ColumnDataType::TimestampNanosecond,
            semantic_type: SemanticType::Timestamp,
            values: values
                .iter()
                .map(|&v| Some(ValueData::TimestampNanosecondValue(v)))
                .collect(),
        }
    }

    /// Checks the schema and every cell of `table` against `expected`.
    ///
    /// Columns are matched by name, so schema order does not matter; a column not
    /// listed in `expected` fails the test.
    fn assert_table_matches(table: &Option<Rows>, expected: &[ExpectedColumn]) {
        let table = table.as_ref().unwrap();
        assert_eq!(expected.len(), table.schema.len());
        assert_eq!(2, table.rows.len());

        for (idx, column_schema) in table.schema.iter().enumerate() {
            let name = column_schema.column_name.as_str();
            let column = expected
                .iter()
                .find(|c| c.name == name)
                .unwrap_or_else(|| panic!("unexpected column: {name}"));

            assert_eq!(column.datatype as i32, column_schema.datatype);
            assert_eq!(column.semantic_type as i32, column_schema.semantic_type);

            let actual: Vec<Option<ValueData>> = table
                .rows
                .iter()
                .map(|row| row.values[idx].value_data.clone())
                .collect();
            assert_eq!(column.values, actual, "column {name}");
        }
    }
}