tests_integration/
influxdb.rs1#[cfg(test)]
16mod test {
17 use std::sync::Arc;
18
19 use client::OutputData;
20 use common_recordbatch::RecordBatches;
21 use rstest::rstest;
22 use rstest_reuse::apply;
23 use servers::influxdb::InfluxdbRequest;
24 use servers::query_handler::sql::SqlQueryHandler;
25 use servers::query_handler::InfluxdbLineProtocolHandler;
26 use session::context::QueryContext;
27
28 use crate::tests::test_util::{both_instances_cases, distributed, standalone, MockInstance};
29
30 #[apply(both_instances_cases)]
31 async fn test_put_influxdb_lines_without_time_column(instance: Arc<dyn MockInstance>) {
32 let instance = instance.frontend();
33
34 let lines = r"
35monitor1,host=host1 cpu=66.6,memory=1024
36monitor1,host=host2 memory=1027";
37 let request = InfluxdbRequest {
38 precision: None,
39 lines: lines.to_string(),
40 };
41 assert!(instance.exec(request, QueryContext::arc()).await.is_ok());
42
43 let mut output = instance
44 .do_query(
45 "SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts",
46 QueryContext::arc(),
47 )
48 .await;
49 let output = output.remove(0).unwrap();
50 let OutputData::Stream(stream) = output.data else {
51 unreachable!()
52 };
53
54 let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
55 let recordbatches: Vec<_> = recordbatches.iter().collect();
56 let total = recordbatches
57 .into_iter()
58 .fold(0, |total, recordbatch| total + recordbatch.num_rows());
59 assert_eq!(total, 2);
60 }
61
62 #[apply(both_instances_cases)]
63 async fn test_put_influxdb_lines(instance: Arc<dyn MockInstance>) {
64 let instance = instance.frontend();
65
66 let lines = r"
67monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
68monitor1,host=host2 memory=1027 1663840496400340001";
69 let request = InfluxdbRequest {
70 precision: None,
71 lines: lines.to_string(),
72 };
73 instance.exec(request, QueryContext::arc()).await.unwrap();
74
75 let mut output = instance
76 .do_query(
77 "SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts",
78 QueryContext::arc(),
79 )
80 .await;
81 let output = output.remove(0).unwrap();
82 let OutputData::Stream(stream) = output.data else {
83 unreachable!()
84 };
85 let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
86 assert_eq!(
87 recordbatches.pretty_print().unwrap(),
88 "\
89+-------------------------------+-------+------+--------+
90| ts | host | cpu | memory |
91+-------------------------------+-------+------+--------+
92| 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 |
93| 2022-09-22T09:54:56.400340001 | host2 | | 1027.0 |
94+-------------------------------+-------+------+--------+"
95 );
96
97 let lines = r"
99monitor1,host=host2 cpu=32 1663840496400340001";
100 let request = InfluxdbRequest {
101 precision: None,
102 lines: lines.to_string(),
103 };
104 instance.exec(request, QueryContext::arc()).await.unwrap();
105
106 let mut output = instance
107 .do_query(
108 "SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts",
109 QueryContext::arc(),
110 )
111 .await;
112 let output = output.remove(0).unwrap();
113 let OutputData::Stream(stream) = output.data else {
114 unreachable!()
115 };
116 let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
117 assert_eq!(
118 recordbatches.pretty_print().unwrap(),
119 "\
120+-------------------------------+-------+------+--------+
121| ts | host | cpu | memory |
122+-------------------------------+-------+------+--------+
123| 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 |
124| 2022-09-22T09:54:56.400340001 | host2 | 32.0 | 1027.0 |
125+-------------------------------+-------+------+--------+"
126 );
127 }
128}