1use api::v1::value::ValueData;
16use api::v1::{ColumnDataType, RowInsertRequests};
17use common_grpc::precision::Precision;
18use common_query::prelude::greptime_timestamp;
19use hyper::Request;
20use influxdb_line_protocol::{FieldValue, parse_lines};
21use snafu::ResultExt;
22
23use crate::error::{Error, InfluxdbLineProtocolSnafu};
24use crate::row_writer::{self, MultiTableData};
25
// Path fragment searched for (as a substring) in the request URI path by
// `is_influxdb_request`.
const INFLUXDB_API_PATH_NAME: &str = "influxdb";
// Path fragment searched for (as a substring) in the request URI path by
// `is_influxdb_v2_request`. Note that it contains `INFLUXDB_API_PATH_NAME`, so
// any path matching the v2 check also matches the generic check.
const INFLUXDB_API_V2_PATH_NAME: &str = "influxdb/api/v2";
// Timestamp precision used when a request does not specify one.
const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;
29
30#[inline]
31pub(crate) fn is_influxdb_request<T>(req: &Request<T>) -> bool {
32 req.uri().path().contains(INFLUXDB_API_PATH_NAME)
33}
34
35#[inline]
36pub(crate) fn is_influxdb_v2_request<T>(req: &Request<T>) -> bool {
37 req.uri().path().contains(INFLUXDB_API_V2_PATH_NAME)
38}
39
/// An InfluxDB write payload together with the precision of its timestamps.
///
/// Convert it into `RowInsertRequests` through the `TryFrom` implementation
/// in this module.
#[derive(Debug)]
pub struct InfluxdbRequest {
    /// Precision of the timestamps carried in `lines`. When `None`, the
    /// conversion falls back to `DEFAULT_TIME_PRECISION`.
    pub precision: Option<Precision>,
    /// Raw request body; it is parsed as InfluxDB line protocol during
    /// conversion.
    pub lines: String,
}
45
46impl TryFrom<InfluxdbRequest> for RowInsertRequests {
47 type Error = Error;
48
49 fn try_from(value: InfluxdbRequest) -> Result<Self, Self::Error> {
50 let lines = parse_lines(&value.lines)
51 .collect::<influxdb_line_protocol::Result<Vec<_>>>()
52 .context(InfluxdbLineProtocolSnafu)?;
53
54 let mut multi_table_data = MultiTableData::new();
55 let precision = unwrap_or_default_precision(value.precision);
56 for line in &lines {
57 let table_name = line.series.measurement.as_str();
58 let tags = &line.series.tag_set;
59 let fields = &line.field_set;
60 let ts = line.timestamp;
61 let num_columns = tags.as_ref().map(|x| x.len()).unwrap_or(0) + fields.len() + 1;
63
64 let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 0);
65 let mut one_row = table_data.alloc_one_row();
66
67 if let Some(tags) = tags {
69 let kvs = tags.iter().map(|(k, v)| (k.to_string(), v.to_string()));
70 row_writer::write_tags(table_data, kvs, &mut one_row)?;
71 }
72
73 let fields = fields.iter().map(|(k, v)| {
75 let (datatype, value) = match v {
76 FieldValue::I64(v) => (ColumnDataType::Int64, Some(ValueData::I64Value(*v))),
77 FieldValue::U64(v) => (ColumnDataType::Uint64, Some(ValueData::U64Value(*v))),
78 FieldValue::F64(v) => (ColumnDataType::Float64, Some(ValueData::F64Value(*v))),
79 FieldValue::String(v) => (
80 ColumnDataType::String,
81 Some(ValueData::StringValue(v.to_string())),
82 ),
83 FieldValue::Boolean(v) => {
84 (ColumnDataType::Boolean, Some(ValueData::BoolValue(*v)))
85 }
86 };
87 (k.to_string(), datatype, value)
88 });
89 row_writer::write_fields(table_data, fields, &mut one_row)?;
90
91 row_writer::write_ts_to_nanos(
93 table_data,
94 greptime_timestamp(),
95 ts,
96 precision,
97 &mut one_row,
98 )?;
99
100 table_data.add_row(one_row);
101 }
102
103 Ok(multi_table_data.into_row_insert_requests().0)
104 }
105}
106
107#[inline]
108fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
109 if let Some(val) = precision {
110 val
111 } else {
112 DEFAULT_TIME_PRECISION
113 }
114}
115
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, RowInsertRequests, Rows, SemanticType};
    use common_query::prelude::greptime_timestamp;

    use crate::influxdb::InfluxdbRequest;

    /// Expected per-row contents of one converted table, indexed by row.
    ///
    /// A `None` entry in `cpu` / `memory` means the row must carry no value
    /// for that column.
    struct ExpectedTable {
        hosts: [&'static str; 2],
        cpu: [Option<f64>; 2],
        memory: [Option<f64>; 2],
        timestamps: [i64; 2],
    }

    /// Parses four lines spread over two measurements and checks that each
    /// measurement ends up as its own insert request with the expected rows.
    #[test]
    fn test_convert_influxdb_lines_to_rows() {
        let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001
monitor2,host=host3 cpu=66.5 1663840496100023102
monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";

        let influxdb_req = InfluxdbRequest {
            precision: None,
            lines: lines.to_string(),
        };

        let requests: RowInsertRequests = influxdb_req.try_into().unwrap();
        assert_eq!(2, requests.inserts.len());

        for request in requests.inserts {
            match request.table_name.as_str() {
                "monitor1" => assert_monitor1_rows(&request.rows),
                "monitor2" => assert_monitor2_rows(&request.rows),
                _ => panic!(),
            }
        }
    }

    /// Checks the rows produced for the `monitor1` measurement.
    fn assert_monitor1_rows(rows: &Option<Rows>) {
        let expected = ExpectedTable {
            hosts: ["host1", "host2"],
            cpu: [Some(66.6f64), None],
            memory: [Some(1024f64), Some(1027f64)],
            timestamps: [1663840496100023100, 1663840496400340001],
        };
        assert_table_rows(rows, &expected);
    }

    /// Checks the rows produced for the `monitor2` measurement.
    fn assert_monitor2_rows(rows: &Option<Rows>) {
        let expected = ExpectedTable {
            hosts: ["host3", "host4"],
            cpu: [Some(66.5f64), Some(66.3f64)],
            memory: [None, Some(1029f64)],
            timestamps: [1663840496100023102, 1663840496400340003],
        };
        assert_table_rows(rows, &expected);
    }

    /// Verifies schema size, row count, and for every column its data type,
    /// semantic type and per-row values against `expected`.
    ///
    /// Any column other than `host`, `cpu`, `memory` or the timestamp column
    /// fails the test.
    fn assert_table_rows(rows: &Option<Rows>, expected: &ExpectedTable) {
        let table = rows.as_ref().unwrap();
        assert_eq!(4, table.schema.len());
        assert_eq!(2, table.rows.len());

        for (col, column_schema) in table.schema.iter().enumerate() {
            // Value of this column in every row, in row order.
            let cells = table
                .rows
                .iter()
                .map(|row| row.values[col].value_data.as_ref());

            match column_schema.column_name.as_str() {
                "host" => {
                    assert_eq!(ColumnDataType::String as i32, column_schema.datatype);
                    assert_eq!(SemanticType::Tag as i32, column_schema.semantic_type);

                    for (cell, want) in cells.zip(expected.hosts.iter()) {
                        assert_eq!(*want, extract_string_value(cell.unwrap()));
                    }
                }
                "cpu" => {
                    assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
                    assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);

                    for (cell, want) in cells.zip(expected.cpu.iter()) {
                        assert_optional_f64(want, cell);
                    }
                }
                "memory" => {
                    assert_eq!(ColumnDataType::Float64 as i32, column_schema.datatype);
                    assert_eq!(SemanticType::Field as i32, column_schema.semantic_type);

                    for (cell, want) in cells.zip(expected.memory.iter()) {
                        assert_optional_f64(want, cell);
                    }
                }
                _ if column_schema.column_name == greptime_timestamp() => {
                    assert_eq!(
                        ColumnDataType::TimestampNanosecond as i32,
                        column_schema.datatype
                    );
                    assert_eq!(SemanticType::Timestamp as i32, column_schema.semantic_type);

                    for (cell, want) in cells.zip(expected.timestamps.iter()) {
                        assert_eq!(*want, extract_ts_nanos_value(cell.unwrap()));
                    }
                }
                _ => panic!(),
            }
        }
    }

    /// Asserts that `cell` holds exactly the float in `want`, or holds no
    /// value at all when `want` is `None`.
    fn assert_optional_f64(want: &Option<f64>, cell: Option<&ValueData>) {
        match want {
            Some(number) => assert_eq!(*number, extract_f64_value(cell.unwrap())),
            None => assert_eq!(None, cell),
        }
    }

    /// Returns the string payload; any other variant is a test bug.
    fn extract_string_value(value: &ValueData) -> &str {
        if let ValueData::StringValue(text) = value {
            text
        } else {
            unreachable!()
        }
    }

    /// Returns the `f64` payload; any other variant is a test bug.
    fn extract_f64_value(value: &ValueData) -> f64 {
        if let ValueData::F64Value(number) = value {
            *number
        } else {
            unreachable!()
        }
    }

    /// Returns the nanosecond timestamp payload; any other variant is a test bug.
    fn extract_ts_nanos_value(value: &ValueData) -> i64 {
        if let ValueData::TimestampNanosecondValue(nanos) = value {
            *nanos
        } else {
            unreachable!()
        }
    }
}