servers/
influxdb.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::value::ValueData;
16use api::v1::{ColumnDataType, RowInsertRequests};
17use common_grpc::precision::Precision;
18use hyper::Request;
19use influxdb_line_protocol::{parse_lines, FieldValue};
20use snafu::ResultExt;
21
22use crate::error::{Error, InfluxdbLineProtocolSnafu};
23use crate::row_writer::{self, MultiTableData};
24
/// Substring matched against the request URI path by `is_influxdb_request`.
const INFLUXDB_API_PATH_NAME: &str = "influxdb";
/// Substring matched against the request URI path by `is_influxdb_v2_request`.
const INFLUXDB_API_V2_PATH_NAME: &str = "influxdb/api/v2";
/// Name of the timestamp column written for every converted line-protocol point.
const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
/// Timestamp precision used when an `InfluxdbRequest` does not specify one.
const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;
29
30#[inline]
31pub(crate) fn is_influxdb_request<T>(req: &Request<T>) -> bool {
32    req.uri().path().contains(INFLUXDB_API_PATH_NAME)
33}
34
35#[inline]
36pub(crate) fn is_influxdb_v2_request<T>(req: &Request<T>) -> bool {
37    req.uri().path().contains(INFLUXDB_API_V2_PATH_NAME)
38}
39
/// An InfluxDB line-protocol write request. It is converted into
/// `RowInsertRequests` through the `TryFrom` impl in this module.
#[derive(Debug)]
pub struct InfluxdbRequest {
    /// Precision of the timestamps in `lines`. `None` falls back to the
    /// module's default precision (nanoseconds) during conversion.
    pub precision: Option<Precision>,
    /// Raw line-protocol payload. It may hold several newline-separated points.
    pub lines: String,
}
45
46impl TryFrom<InfluxdbRequest> for RowInsertRequests {
47    type Error = Error;
48
49    fn try_from(value: InfluxdbRequest) -> Result<Self, Self::Error> {
50        let lines = parse_lines(&value.lines)
51            .collect::<influxdb_line_protocol::Result<Vec<_>>>()
52            .context(InfluxdbLineProtocolSnafu)?;
53
54        let mut multi_table_data = MultiTableData::new();
55        let precision = unwrap_or_default_precision(value.precision);
56        for line in &lines {
57            let table_name = line.series.measurement.as_str();
58            let tags = &line.series.tag_set;
59            let fields = &line.field_set;
60            let ts = line.timestamp;
61            // tags.len + fields.len + timestamp(+1)
62            let num_columns = tags.as_ref().map(|x| x.len()).unwrap_or(0) + fields.len() + 1;
63
64            let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 0);
65            let mut one_row = table_data.alloc_one_row();
66
67            // tags
68            if let Some(tags) = tags {
69                let kvs = tags.iter().map(|(k, v)| (k.to_string(), v.to_string()));
70                row_writer::write_tags(table_data, kvs, &mut one_row)?;
71            }
72
73            // fields
74            let fields = fields.iter().map(|(k, v)| {
75                let (datatype, value) = match v {
76                    FieldValue::I64(v) => (ColumnDataType::Int64, Some(ValueData::I64Value(*v))),
77                    FieldValue::U64(v) => (ColumnDataType::Uint64, Some(ValueData::U64Value(*v))),
78                    FieldValue::F64(v) => (ColumnDataType::Float64, Some(ValueData::F64Value(*v))),
79                    FieldValue::String(v) => (
80                        ColumnDataType::String,
81                        Some(ValueData::StringValue(v.to_string())),
82                    ),
83                    FieldValue::Boolean(v) => {
84                        (ColumnDataType::Boolean, Some(ValueData::BoolValue(*v)))
85                    }
86                };
87                (k.to_string(), datatype, value)
88            });
89            row_writer::write_fields(table_data, fields, &mut one_row)?;
90
91            // timestamp
92            row_writer::write_ts_to_nanos(
93                table_data,
94                INFLUXDB_TIMESTAMP_COLUMN_NAME,
95                ts,
96                precision,
97                &mut one_row,
98            )?;
99
100            table_data.add_row(one_row);
101        }
102
103        Ok(multi_table_data.into_row_insert_requests().0)
104    }
105}
106
107#[inline]
108fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
109    if let Some(val) = precision {
110        val
111    } else {
112        DEFAULT_TIME_PRECISION
113    }
114}
115
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, RowInsertRequests, Rows, SemanticType};

    use crate::influxdb::InfluxdbRequest;

    /// Expected shape of one converted column.
    ///
    /// Holds the column name, its datatype and semantic type (as the `i32`
    /// values stored in the schema), and the value expected in each of the
    /// table's two rows. `None` means the cell is null.
    struct ExpectedColumn {
        name: &'static str,
        datatype: i32,
        semantic_type: i32,
        values: [Option<ValueData>; 2],
    }

    fn column(
        name: &'static str,
        datatype: ColumnDataType,
        semantic_type: SemanticType,
        values: [Option<ValueData>; 2],
    ) -> ExpectedColumn {
        ExpectedColumn {
            name,
            datatype: datatype as i32,
            semantic_type: semantic_type as i32,
            values,
        }
    }

    fn string(s: &str) -> Option<ValueData> {
        Some(ValueData::StringValue(s.to_string()))
    }

    fn float(f: f64) -> Option<ValueData> {
        Some(ValueData::F64Value(f))
    }

    fn ts_nanos(t: i64) -> Option<ValueData> {
        Some(ValueData::TimestampNanosecondValue(t))
    }

    #[test]
    fn test_convert_influxdb_lines_to_rows() {
        let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001
monitor2,host=host3 cpu=66.5 1663840496100023102
monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";

        let influxdb_req = InfluxdbRequest {
            precision: None,
            lines: lines.to_string(),
        };

        let requests: RowInsertRequests = influxdb_req.try_into().unwrap();
        assert_eq!(2, requests.inserts.len());

        for request in &requests.inserts {
            let expected = match request.table_name.as_str() {
                "monitor1" => monitor1_columns(),
                "monitor2" => monitor2_columns(),
                _ => panic!(),
            };
            assert_rows(&request.rows, &expected);
        }
    }

    fn monitor1_columns() -> Vec<ExpectedColumn> {
        vec![
            column(
                "host",
                ColumnDataType::String,
                SemanticType::Tag,
                [string("host1"), string("host2")],
            ),
            column(
                "cpu",
                ColumnDataType::Float64,
                SemanticType::Field,
                [float(66.6), None],
            ),
            column(
                "memory",
                ColumnDataType::Float64,
                SemanticType::Field,
                [float(1024.0), float(1027.0)],
            ),
            column(
                "ts",
                ColumnDataType::TimestampNanosecond,
                SemanticType::Timestamp,
                [
                    ts_nanos(1663840496100023100),
                    ts_nanos(1663840496400340001),
                ],
            ),
        ]
    }

    fn monitor2_columns() -> Vec<ExpectedColumn> {
        vec![
            column(
                "host",
                ColumnDataType::String,
                SemanticType::Tag,
                [string("host3"), string("host4")],
            ),
            column(
                "cpu",
                ColumnDataType::Float64,
                SemanticType::Field,
                [float(66.5), float(66.3)],
            ),
            column(
                "memory",
                ColumnDataType::Float64,
                SemanticType::Field,
                [None, float(1029.0)],
            ),
            column(
                "ts",
                ColumnDataType::TimestampNanosecond,
                SemanticType::Timestamp,
                [
                    ts_nanos(1663840496100023102),
                    ts_nanos(1663840496400340003),
                ],
            ),
        ]
    }

    /// Checks that `rows` has exactly four columns and two rows. Every schema
    /// column must appear in `expected` with a matching type and per-row values.
    fn assert_rows(rows: &Option<Rows>, expected: &[ExpectedColumn]) {
        let rows = rows.as_ref().unwrap();
        assert_eq!(4, rows.schema.len());
        assert_eq!(2, rows.rows.len());

        for (i, column_schema) in rows.schema.iter().enumerate() {
            let want = expected
                .iter()
                .find(|c| c.name == column_schema.column_name.as_str())
                .unwrap_or_else(|| panic!("unexpected column {}", column_schema.column_name));
            assert_eq!(want.datatype, column_schema.datatype);
            assert_eq!(want.semantic_type, column_schema.semantic_type);

            for (row, want_value) in rows.rows.iter().zip(&want.values) {
                assert_eq!(want_value.as_ref(), row.values[i].value_data.as_ref());
            }
        }
    }
}