1use api::v1::value::ValueData;
16use api::v1::{ColumnDataType, RowInsertRequests};
17use common_grpc::precision::Precision;
18use hyper::Request;
19use influxdb_line_protocol::{parse_lines, FieldValue};
20use snafu::ResultExt;
21
22use crate::error::{Error, InfluxdbLineProtocolSnafu};
23use crate::row_writer::{self, MultiTableData};
24
25const INFLUXDB_API_PATH_NAME: &str = "influxdb";
26const INFLUXDB_API_V2_PATH_NAME: &str = "influxdb/api/v2";
27const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
28const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;
29
30#[inline]
31pub(crate) fn is_influxdb_request<T>(req: &Request<T>) -> bool {
32 req.uri().path().contains(INFLUXDB_API_PATH_NAME)
33}
34
35#[inline]
36pub(crate) fn is_influxdb_v2_request<T>(req: &Request<T>) -> bool {
37 req.uri().path().contains(INFLUXDB_API_V2_PATH_NAME)
38}
39
40#[derive(Debug)]
41pub struct InfluxdbRequest {
42 pub precision: Option<Precision>,
43 pub lines: String,
44}
45
46impl TryFrom<InfluxdbRequest> for RowInsertRequests {
47 type Error = Error;
48
49 fn try_from(value: InfluxdbRequest) -> Result<Self, Self::Error> {
50 let lines = parse_lines(&value.lines)
51 .collect::<influxdb_line_protocol::Result<Vec<_>>>()
52 .context(InfluxdbLineProtocolSnafu)?;
53
54 let mut multi_table_data = MultiTableData::new();
55 let precision = unwrap_or_default_precision(value.precision);
56 for line in &lines {
57 let table_name = line.series.measurement.as_str();
58 let tags = &line.series.tag_set;
59 let fields = &line.field_set;
60 let ts = line.timestamp;
61 let num_columns = tags.as_ref().map(|x| x.len()).unwrap_or(0) + fields.len() + 1;
63
64 let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 0);
65 let mut one_row = table_data.alloc_one_row();
66
67 if let Some(tags) = tags {
69 let kvs = tags.iter().map(|(k, v)| (k.to_string(), v.to_string()));
70 row_writer::write_tags(table_data, kvs, &mut one_row)?;
71 }
72
73 let fields = fields.iter().map(|(k, v)| {
75 let (datatype, value) = match v {
76 FieldValue::I64(v) => (ColumnDataType::Int64, ValueData::I64Value(*v)),
77 FieldValue::U64(v) => (ColumnDataType::Uint64, ValueData::U64Value(*v)),
78 FieldValue::F64(v) => (ColumnDataType::Float64, ValueData::F64Value(*v)),
79 FieldValue::String(v) => (
80 ColumnDataType::String,
81 ValueData::StringValue(v.to_string()),
82 ),
83 FieldValue::Boolean(v) => (ColumnDataType::Boolean, ValueData::BoolValue(*v)),
84 };
85 (k.to_string(), datatype, value)
86 });
87 row_writer::write_fields(table_data, fields, &mut one_row)?;
88
89 row_writer::write_ts_to_nanos(
91 table_data,
92 INFLUXDB_TIMESTAMP_COLUMN_NAME,
93 ts,
94 precision,
95 &mut one_row,
96 )?;
97
98 table_data.add_row(one_row);
99 }
100
101 Ok(multi_table_data.into_row_insert_requests().0)
102 }
103}
104
105#[inline]
106fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
107 if let Some(val) = precision {
108 val
109 } else {
110 DEFAULT_TIME_PRECISION
111 }
112}
113
114#[cfg(test)]
115mod tests {
116 use api::v1::value::ValueData;
117 use api::v1::{ColumnDataType, RowInsertRequests, Rows, SemanticType};
118
119 use crate::influxdb::InfluxdbRequest;
120
121 #[test]
122 fn test_convert_influxdb_lines_to_rows() {
123 let lines = r"
124monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
125monitor1,host=host2 memory=1027 1663840496400340001
126monitor2,host=host3 cpu=66.5 1663840496100023102
127monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
128
129 let influxdb_req = InfluxdbRequest {
130 precision: None,
131 lines: lines.to_string(),
132 };
133
134 let requests: RowInsertRequests = influxdb_req.try_into().unwrap();
135 assert_eq!(2, requests.inserts.len());
136
137 for request in requests.inserts {
138 match &request.table_name[..] {
139 "monitor1" => assert_monitor1_rows(&request.rows),
140 "monitor2" => assert_monitor2_rows(&request.rows),
141 _ => panic!(),
142 }
143 }
144 }
145
146 fn assert_monitor1_rows(rows: &Option<Rows>) {
147 let rows = rows.as_ref().unwrap();
148 let schema = &rows.schema;
149 let rows = &rows.rows;
150 assert_eq!(4, schema.len());
151 assert_eq!(2, rows.len());
152
153 for (i, column_schema) in schema.iter().enumerate() {
154 match &column_schema.column_name[..] {
155 "host" => {
156 assert_eq!(ColumnDataType::String as i32, column_schema.datatype);
157 assert_eq!(SemanticType::Tag as i32, column_schema.semantic_type);
158
159 for (j, row) in rows.iter().enumerate() {
160 let v = row.values[i].value_data.as_ref().unwrap();
161 match j {
162 0 => assert_eq!("host1", extract_string_value(v)),
163 1 => assert_eq!("host2", extract_string_value(v)),
164 _ => panic!(),
165 }
166 }
167 }
168 "cpu" => {
169 assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
170 assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
171
172 for (j, row) in rows.iter().enumerate() {
173 let v = row.values[i].value_data.as_ref();
174 match j {
175 0 => assert_eq!(66.6f64, extract_f64_value(v.as_ref().unwrap())),
176 1 => assert_eq!(None, v),
177 _ => panic!(),
178 }
179 }
180 }
181 "memory" => {
182 assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
183 assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
184
185 for (j, row) in rows.iter().enumerate() {
186 let v = row.values[i].value_data.as_ref();
187 match j {
188 0 => assert_eq!(1024f64, extract_f64_value(v.as_ref().unwrap())),
189 1 => assert_eq!(1027f64, extract_f64_value(v.as_ref().unwrap())),
190 _ => panic!(),
191 }
192 }
193 }
194 "ts" => {
195 assert_eq!(
196 ColumnDataType::TimestampNanosecond as i32,
197 column_schema.datatype
198 );
199 assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type);
200
201 for (j, row) in rows.iter().enumerate() {
202 let v = row.values[i].value_data.as_ref();
203 match j {
204 0 => assert_eq!(
205 1663840496100023100,
206 extract_ts_nanos_value(v.as_ref().unwrap())
207 ),
208 1 => assert_eq!(
209 1663840496400340001,
210 extract_ts_nanos_value(v.as_ref().unwrap())
211 ),
212 _ => panic!(),
213 }
214 }
215 }
216 _ => panic!(),
217 }
218 }
219 }
220
221 fn assert_monitor2_rows(rows: &Option<Rows>) {
222 let rows = rows.as_ref().unwrap();
223 let schema = &rows.schema;
224 let rows = &rows.rows;
225 assert_eq!(4, schema.len());
226 assert_eq!(2, rows.len());
227
228 for (i, column_schema) in schema.iter().enumerate() {
229 match &column_schema.column_name[..] {
230 "host" => {
231 assert_eq!(ColumnDataType::String as i32, column_schema.datatype);
232 assert_eq!(SemanticType::Tag as i32, column_schema.semantic_type);
233
234 for (j, row) in rows.iter().enumerate() {
235 let v = row.values[i].value_data.as_ref().unwrap();
236 match j {
237 0 => assert_eq!("host3", extract_string_value(v)),
238 1 => assert_eq!("host4", extract_string_value(v)),
239 _ => panic!(),
240 }
241 }
242 }
243 "cpu" => {
244 assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
245 assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
246
247 for (j, row) in rows.iter().enumerate() {
248 let v = row.values[i].value_data.as_ref();
249 match j {
250 0 => assert_eq!(66.5f64, extract_f64_value(v.as_ref().unwrap())),
251 1 => assert_eq!(66.3f64, extract_f64_value(v.as_ref().unwrap())),
252 _ => panic!(),
253 }
254 }
255 }
256 "memory" => {
257 assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
258 assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);
259
260 for (j, row) in rows.iter().enumerate() {
261 let v = row.values[i].value_data.as_ref();
262 match j {
263 0 => assert_eq!(None, v),
264 1 => assert_eq!(1029f64, extract_f64_value(v.as_ref().unwrap())),
265 _ => panic!(),
266 }
267 }
268 }
269 "ts" => {
270 assert_eq!(
271 ColumnDataType::TimestampNanosecond as i32,
272 column_schema.datatype
273 );
274 assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type);
275
276 for (j, row) in rows.iter().enumerate() {
277 let v = row.values[i].value_data.as_ref();
278 match j {
279 0 => assert_eq!(
280 1663840496100023102,
281 extract_ts_nanos_value(v.as_ref().unwrap())
282 ),
283 1 => assert_eq!(
284 1663840496400340003,
285 extract_ts_nanos_value(v.as_ref().unwrap())
286 ),
287 _ => panic!(),
288 }
289 }
290 }
291 _ => panic!(),
292 }
293 }
294 }
295
296 fn extract_string_value(value: &ValueData) -> &str {
297 match value {
298 ValueData::StringValue(v) => v,
299 _ => unreachable!(),
300 }
301 }
302
303 fn extract_f64_value(value: &ValueData) -> f64 {
304 match value {
305 ValueData::F64Value(v) => *v,
306 _ => unreachable!(),
307 }
308 }
309
310 fn extract_ts_nanos_value(value: &ValueData) -> i64 {
311 match value {
312 ValueData::TimestampNanosecondValue(v) => *v,
313 _ => unreachable!(),
314 }
315 }
316}