1use api::v1::value::ValueData;
16use api::v1::{ColumnDataType, RowInsertRequests};
17use common_grpc::precision::Precision;
18use hyper::Request;
19use influxdb_line_protocol::{parse_lines, FieldValue};
20use snafu::ResultExt;
21
22use crate::error::{Error, InfluxdbLineProtocolSnafu};
23use crate::row_writer::{self, MultiTableData};
24
25const INFLUXDB_API_PATH_NAME: &str = "influxdb";
26const INFLUXDB_API_V2_PATH_NAME: &str = "influxdb/api/v2";
27const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
28const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;
29
30#[inline]
31pub(crate) fn is_influxdb_request<T>(req: &Request<T>) -> bool {
32 req.uri().path().contains(INFLUXDB_API_PATH_NAME)
33}
34
35#[inline]
36pub(crate) fn is_influxdb_v2_request<T>(req: &Request<T>) -> bool {
37 req.uri().path().contains(INFLUXDB_API_V2_PATH_NAME)
38}
39
40#[derive(Debug)]
41pub struct InfluxdbRequest {
42 pub precision: Option<Precision>,
43 pub lines: String,
44}
45
46impl TryFrom<InfluxdbRequest> for RowInsertRequests {
47 type Error = Error;
48
49 fn try_from(value: InfluxdbRequest) -> Result<Self, Self::Error> {
50 let lines = parse_lines(&value.lines)
51 .collect::<influxdb_line_protocol::Result<Vec<_>>>()
52 .context(InfluxdbLineProtocolSnafu)?;
53
54 let mut multi_table_data = MultiTableData::new();
55 let precision = unwrap_or_default_precision(value.precision);
56 for line in &lines {
57 let table_name = line.series.measurement.as_str();
58 let tags = &line.series.tag_set;
59 let fields = &line.field_set;
60 let ts = line.timestamp;
61 let num_columns = tags.as_ref().map(|x| x.len()).unwrap_or(0) + fields.len() + 1;
63
64 let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 0);
65 let mut one_row = table_data.alloc_one_row();
66
67 if let Some(tags) = tags {
69 let kvs = tags.iter().map(|(k, v)| (k.to_string(), v.to_string()));
70 row_writer::write_tags(table_data, kvs, &mut one_row)?;
71 }
72
73 let fields = fields.iter().map(|(k, v)| {
75 let (datatype, value) = match v {
76 FieldValue::I64(v) => (ColumnDataType::Int64, Some(ValueData::I64Value(*v))),
77 FieldValue::U64(v) => (ColumnDataType::Uint64, Some(ValueData::U64Value(*v))),
78 FieldValue::F64(v) => (ColumnDataType::Float64, Some(ValueData::F64Value(*v))),
79 FieldValue::String(v) => (
80 ColumnDataType::String,
81 Some(ValueData::StringValue(v.to_string())),
82 ),
83 FieldValue::Boolean(v) => {
84 (ColumnDataType::Boolean, Some(ValueData::BoolValue(*v)))
85 }
86 };
87 (k.to_string(), datatype, value)
88 });
89 row_writer::write_fields(table_data, fields, &mut one_row)?;
90
91 row_writer::write_ts_to_nanos(
93 table_data,
94 INFLUXDB_TIMESTAMP_COLUMN_NAME,
95 ts,
96 precision,
97 &mut one_row,
98 )?;
99
100 table_data.add_row(one_row);
101 }
102
103 Ok(multi_table_data.into_row_insert_requests().0)
104 }
105}
106
107#[inline]
108fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
109 if let Some(val) = precision {
110 val
111 } else {
112 DEFAULT_TIME_PRECISION
113 }
114}
115
116#[cfg(test)]
117mod tests {
118 use api::v1::value::ValueData;
119 use api::v1::{ColumnDataType, RowInsertRequests, Rows, SemanticType};
120
121 use crate::influxdb::InfluxdbRequest;
122
123 #[test]
124 fn test_convert_influxdb_lines_to_rows() {
125 let lines = r"
126monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
127monitor1,host=host2 memory=1027 1663840496400340001
128monitor2,host=host3 cpu=66.5 1663840496100023102
129monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
130
131 let influxdb_req = InfluxdbRequest {
132 precision: None,
133 lines: lines.to_string(),
134 };
135
136 let requests: RowInsertRequests = influxdb_req.try_into().unwrap();
137 assert_eq!(2, requests.inserts.len());
138
139 for request in requests.inserts {
140 match &request.table_name[..] {
141 "monitor1" => assert_monitor1_rows(&request.rows),
142 "monitor2" => assert_monitor2_rows(&request.rows),
143 _ => panic!(),
144 }
145 }
146 }
147
148 fn assert_monitor1_rows(rows: &Option<Rows>) {
149 let rows = rows.as_ref().unwrap();
150 let schema = &rows.schema;
151 let rows = &rows.rows;
152 assert_eq!(4, schema.len());
153 assert_eq!(2, rows.len());
154
155 for (i, column_schema) in schema.iter().enumerate() {
156 match &column_schema.column_name[..] {
157 "host" => {
158 assert_eq!(ColumnDataType::String as i32, column_schema.datatype);
159 assert_eq!(SemanticType::Tag as i32, column_schema.semantic_type);
160
161 for (j, row) in rows.iter().enumerate() {
162 let v = row.values[i].value_data.as_ref().unwrap();
163 match j {
164 0 => assert_eq!("host1", extract_string_value(v)),
165 1 => assert_eq!("host2", extract_string_value(v)),
166 _ => panic!(),
167 }
168 }
169 }
170 "cpu" => {
171 assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
172 assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
173
174 for (j, row) in rows.iter().enumerate() {
175 let v = row.values[i].value_data.as_ref();
176 match j {
177 0 => assert_eq!(66.6f64, extract_f64_value(v.as_ref().unwrap())),
178 1 => assert_eq!(None, v),
179 _ => panic!(),
180 }
181 }
182 }
183 "memory" => {
184 assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
185 assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
186
187 for (j, row) in rows.iter().enumerate() {
188 let v = row.values[i].value_data.as_ref();
189 match j {
190 0 => assert_eq!(1024f64, extract_f64_value(v.as_ref().unwrap())),
191 1 => assert_eq!(1027f64, extract_f64_value(v.as_ref().unwrap())),
192 _ => panic!(),
193 }
194 }
195 }
196 "ts" => {
197 assert_eq!(
198 ColumnDataType::TimestampNanosecond as i32,
199 column_schema.datatype
200 );
201 assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type);
202
203 for (j, row) in rows.iter().enumerate() {
204 let v = row.values[i].value_data.as_ref();
205 match j {
206 0 => assert_eq!(
207 1663840496100023100,
208 extract_ts_nanos_value(v.as_ref().unwrap())
209 ),
210 1 => assert_eq!(
211 1663840496400340001,
212 extract_ts_nanos_value(v.as_ref().unwrap())
213 ),
214 _ => panic!(),
215 }
216 }
217 }
218 _ => panic!(),
219 }
220 }
221 }
222
223 fn assert_monitor2_rows(rows: &Option<Rows>) {
224 let rows = rows.as_ref().unwrap();
225 let schema = &rows.schema;
226 let rows = &rows.rows;
227 assert_eq!(4, schema.len());
228 assert_eq!(2, rows.len());
229
230 for (i, column_schema) in schema.iter().enumerate() {
231 match &column_schema.column_name[..] {
232 "host" => {
233 assert_eq!(ColumnDataType::String as i32, column_schema.datatype);
234 assert_eq!(SemanticType::Tag as i32, column_schema.semantic_type);
235
236 for (j, row) in rows.iter().enumerate() {
237 let v = row.values[i].value_data.as_ref().unwrap();
238 match j {
239 0 => assert_eq!("host3", extract_string_value(v)),
240 1 => assert_eq!("host4", extract_string_value(v)),
241 _ => panic!(),
242 }
243 }
244 }
245 "cpu" => {
246 assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
247 assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
248
249 for (j, row) in rows.iter().enumerate() {
250 let v = row.values[i].value_data.as_ref();
251 match j {
252 0 => assert_eq!(66.5f64, extract_f64_value(v.as_ref().unwrap())),
253 1 => assert_eq!(66.3f64, extract_f64_value(v.as_ref().unwrap())),
254 _ => panic!(),
255 }
256 }
257 }
258 "memory" => {
259 assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
260 assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
261
262 for (j, row) in rows.iter().enumerate() {
263 let v = row.values[i].value_data.as_ref();
264 match j {
265 0 => assert_eq!(None, v),
266 1 => assert_eq!(1029f64, extract_f64_value(v.as_ref().unwrap())),
267 _ => panic!(),
268 }
269 }
270 }
271 "ts" => {
272 assert_eq!(
273 ColumnDataType::TimestampNanosecond as i32,
274 column_schema.datatype
275 );
276 assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type);
277
278 for (j, row) in rows.iter().enumerate() {
279 let v = row.values[i].value_data.as_ref();
280 match j {
281 0 => assert_eq!(
282 1663840496100023102,
283 extract_ts_nanos_value(v.as_ref().unwrap())
284 ),
285 1 => assert_eq!(
286 1663840496400340003,
287 extract_ts_nanos_value(v.as_ref().unwrap())
288 ),
289 _ => panic!(),
290 }
291 }
292 }
293 _ => panic!(),
294 }
295 }
296 }
297
298 fn extract_string_value(value: &ValueData) -> &str {
299 match value {
300 ValueData::StringValue(v) => v,
301 _ => unreachable!(),
302 }
303 }
304
305 fn extract_f64_value(value: &ValueData) -> f64 {
306 match value {
307 ValueData::F64Value(v) => *v,
308 _ => unreachable!(),
309 }
310 }
311
312 fn extract_ts_nanos_value(value: &ValueData) -> i64 {
313 match value {
314 ValueData::TimestampNanosecondValue(v) => *v,
315 _ => unreachable!(),
316 }
317 }
318}