servers/
influxdb.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::value::ValueData;
16use api::v1::{ColumnDataType, RowInsertRequests};
17use common_grpc::precision::Precision;
18use common_query::prelude::greptime_timestamp;
19use hyper::Request;
20use influxdb_line_protocol::{FieldValue, parse_lines};
21use snafu::ResultExt;
22
23use crate::error::{Error, InfluxdbLineProtocolSnafu};
24use crate::row_writer::{self, MultiTableData};
25
/// Substring that `is_influxdb_request` looks for in a request's URI path.
const INFLUXDB_API_PATH_NAME: &str = "influxdb";
/// Substring that `is_influxdb_v2_request` looks for in a request's URI path.
const INFLUXDB_API_V2_PATH_NAME: &str = "influxdb/api/v2";
/// Timestamp precision used when an `InfluxdbRequest` does not specify one.
const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;
29
30#[inline]
31pub(crate) fn is_influxdb_request<T>(req: &Request<T>) -> bool {
32    req.uri().path().contains(INFLUXDB_API_PATH_NAME)
33}
34
35#[inline]
36pub(crate) fn is_influxdb_v2_request<T>(req: &Request<T>) -> bool {
37    req.uri().path().contains(INFLUXDB_API_V2_PATH_NAME)
38}
39
/// An InfluxDB write request whose payload has not been parsed yet.
///
/// Convert it into `RowInsertRequests` through the `TryFrom` implementation in this file.
#[derive(Debug)]
pub struct InfluxdbRequest {
    /// Precision of the timestamps found in `lines`; the nanosecond default is used when `None`.
    pub precision: Option<Precision>,
    /// Raw line protocol text; it is handed to `parse_lines` during conversion.
    pub lines: String,
}
45
46impl TryFrom<InfluxdbRequest> for RowInsertRequests {
47    type Error = Error;
48
49    fn try_from(value: InfluxdbRequest) -> Result<Self, Self::Error> {
50        let lines = parse_lines(&value.lines)
51            .collect::<influxdb_line_protocol::Result<Vec<_>>>()
52            .context(InfluxdbLineProtocolSnafu)?;
53
54        let mut multi_table_data = MultiTableData::new();
55        let precision = unwrap_or_default_precision(value.precision);
56        for line in &lines {
57            let table_name = line.series.measurement.as_str();
58            let tags = &line.series.tag_set;
59            let fields = &line.field_set;
60            let ts = line.timestamp;
61            // tags.len + fields.len + timestamp(+1)
62            let num_columns = tags.as_ref().map(|x| x.len()).unwrap_or(0) + fields.len() + 1;
63
64            let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 0);
65            let mut one_row = table_data.alloc_one_row();
66
67            // tags
68            if let Some(tags) = tags {
69                let kvs = tags.iter().map(|(k, v)| (k.to_string(), v.to_string()));
70                row_writer::write_tags(table_data, kvs, &mut one_row)?;
71            }
72
73            // fields
74            let fields = fields.iter().map(|(k, v)| {
75                let (datatype, value) = match v {
76                    FieldValue::I64(v) => (ColumnDataType::Int64, Some(ValueData::I64Value(*v))),
77                    FieldValue::U64(v) => (ColumnDataType::Uint64, Some(ValueData::U64Value(*v))),
78                    FieldValue::F64(v) => (ColumnDataType::Float64, Some(ValueData::F64Value(*v))),
79                    FieldValue::String(v) => (
80                        ColumnDataType::String,
81                        Some(ValueData::StringValue(v.to_string())),
82                    ),
83                    FieldValue::Boolean(v) => {
84                        (ColumnDataType::Boolean, Some(ValueData::BoolValue(*v)))
85                    }
86                };
87                (k.to_string(), datatype, value)
88            });
89            row_writer::write_fields(table_data, fields, &mut one_row)?;
90
91            // timestamp
92            row_writer::write_ts_to_nanos(
93                table_data,
94                greptime_timestamp(),
95                ts,
96                precision,
97                &mut one_row,
98            )?;
99
100            table_data.add_row(one_row);
101        }
102
103        Ok(multi_table_data.into_row_insert_requests().0)
104    }
105}
106
107#[inline]
108fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
109    if let Some(val) = precision {
110        val
111    } else {
112        DEFAULT_TIME_PRECISION
113    }
114}
115
116#[cfg(test)]
117mod tests {
118    use api::v1::value::ValueData;
119    use api::v1::{ColumnDataType, RowInsertRequests, Rows, SemanticType};
120    use common_query::prelude::greptime_timestamp;
121
122    use crate::influxdb::InfluxdbRequest;
123
124    #[test]
125    fn test_convert_influxdb_lines_to_rows() {
126        let lines = r"
127monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
128monitor1,host=host2 memory=1027 1663840496400340001
129monitor2,host=host3 cpu=66.5 1663840496100023102
130monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
131
132        let influxdb_req = InfluxdbRequest {
133            precision: None,
134            lines: lines.to_string(),
135        };
136
137        let requests: RowInsertRequests = influxdb_req.try_into().unwrap();
138        assert_eq!(2, requests.inserts.len());
139
140        for request in requests.inserts {
141            match &request.table_name[..] {
142                "monitor1" => assert_monitor1_rows(&request.rows),
143                "monitor2" => assert_monitor2_rows(&request.rows),
144                _ => panic!(),
145            }
146        }
147    }
148
149    fn assert_monitor1_rows(rows: &Option<Rows>) {
150        let rows = rows.as_ref().unwrap();
151        let schema = &rows.schema;
152        let rows = &rows.rows;
153        assert_eq!(4, schema.len());
154        assert_eq!(2, rows.len());
155
156        for (i, column_schema) in schema.iter().enumerate() {
157            match &column_schema.column_name[..] {
158                "host" => {
159                    assert_eq!(ColumnDataType::String as i32, column_schema.datatype);
160                    assert_eq!(SemanticType::Tag as i32, column_schema.semantic_type);
161
162                    for (j, row) in rows.iter().enumerate() {
163                        let v = row.values[i].value_data.as_ref().unwrap();
164                        match j {
165                            0 => assert_eq!("host1", extract_string_value(v)),
166                            1 => assert_eq!("host2", extract_string_value(v)),
167                            _ => panic!(),
168                        }
169                    }
170                }
171                "cpu" => {
172                    assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
173                    assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
174
175                    for (j, row) in rows.iter().enumerate() {
176                        let v = row.values[i].value_data.as_ref();
177                        match j {
178                            0 => assert_eq!(66.6f64, extract_f64_value(v.as_ref().unwrap())),
179                            1 => assert_eq!(None, v),
180                            _ => panic!(),
181                        }
182                    }
183                }
184                "memory" => {
185                    assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
186                    assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
187
188                    for (j, row) in rows.iter().enumerate() {
189                        let v = row.values[i].value_data.as_ref();
190                        match j {
191                            0 => assert_eq!(1024f64, extract_f64_value(v.as_ref().unwrap())),
192                            1 => assert_eq!(1027f64, extract_f64_value(v.as_ref().unwrap())),
193                            _ => panic!(),
194                        }
195                    }
196                }
197                _ if column_schema.column_name == greptime_timestamp() => {
198                    assert_eq!(
199                        ColumnDataType::TimestampNanosecond as i32,
200                        column_schema.datatype
201                    );
202                    assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type);
203
204                    for (j, row) in rows.iter().enumerate() {
205                        let v = row.values[i].value_data.as_ref();
206                        match j {
207                            0 => assert_eq!(
208                                1663840496100023100,
209                                extract_ts_nanos_value(v.as_ref().unwrap())
210                            ),
211                            1 => assert_eq!(
212                                1663840496400340001,
213                                extract_ts_nanos_value(v.as_ref().unwrap())
214                            ),
215                            _ => panic!(),
216                        }
217                    }
218                }
219                _ => panic!(),
220            }
221        }
222    }
223
224    fn assert_monitor2_rows(rows: &Option<Rows>) {
225        let rows = rows.as_ref().unwrap();
226        let schema = &rows.schema;
227        let rows = &rows.rows;
228        assert_eq!(4, schema.len());
229        assert_eq!(2, rows.len());
230
231        for (i, column_schema) in schema.iter().enumerate() {
232            match &column_schema.column_name[..] {
233                "host" => {
234                    assert_eq!(ColumnDataType::String as i32, column_schema.datatype);
235                    assert_eq!(SemanticType::Tag as i32, column_schema.semantic_type);
236
237                    for (j, row) in rows.iter().enumerate() {
238                        let v = row.values[i].value_data.as_ref().unwrap();
239                        match j {
240                            0 => assert_eq!("host3", extract_string_value(v)),
241                            1 => assert_eq!("host4", extract_string_value(v)),
242                            _ => panic!(),
243                        }
244                    }
245                }
246                "cpu" => {
247                    assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
248                    assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
249
250                    for (j, row) in rows.iter().enumerate() {
251                        let v = row.values[i].value_data.as_ref();
252                        match j {
253                            0 => assert_eq!(66.5f64, extract_f64_value(v.as_ref().unwrap())),
254                            1 => assert_eq!(66.3f64, extract_f64_value(v.as_ref().unwrap())),
255                            _ => panic!(),
256                        }
257                    }
258                }
259                "memory" => {
260                    assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
261                    assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
262
263                    for (j, row) in rows.iter().enumerate() {
264                        let v = row.values[i].value_data.as_ref();
265                        match j {
266                            0 => assert_eq!(None, v),
267                            1 => assert_eq!(1029f64, extract_f64_value(v.as_ref().unwrap())),
268                            _ => panic!(),
269                        }
270                    }
271                }
272                _ if column_schema.column_name == greptime_timestamp() => {
273                    assert_eq!(
274                        ColumnDataType::TimestampNanosecond as i32,
275                        column_schema.datatype
276                    );
277                    assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type);
278
279                    for (j, row) in rows.iter().enumerate() {
280                        let v = row.values[i].value_data.as_ref();
281                        match j {
282                            0 => assert_eq!(
283                                1663840496100023102,
284                                extract_ts_nanos_value(v.as_ref().unwrap())
285                            ),
286                            1 => assert_eq!(
287                                1663840496400340003,
288                                extract_ts_nanos_value(v.as_ref().unwrap())
289                            ),
290                            _ => panic!(),
291                        }
292                    }
293                }
294                _ => panic!(),
295            }
296        }
297    }
298
299    fn extract_string_value(value: &ValueData) -> &str {
300        match value {
301            ValueData::StringValue(v) => v,
302            _ => unreachable!(),
303        }
304    }
305
306    fn extract_f64_value(value: &ValueData) -> f64 {
307        match value {
308            ValueData::F64Value(v) => *v,
309            _ => unreachable!(),
310        }
311    }
312
313    fn extract_ts_nanos_value(value: &ValueData) -> i64 {
314        match value {
315            ValueData::TimestampNanosecondValue(v) => *v,
316            _ => unreachable!(),
317        }
318    }
319}