tests_integration/
influxdb.rs1#[cfg(test)]
16mod test {
17 use std::sync::Arc;
18
19 use client::OutputData;
20 use common_grpc::precision::Precision;
21 use common_recordbatch::RecordBatches;
22 use common_test_util::recordbatch::check_output_stream;
23 use rstest::rstest;
24 use rstest_reuse::apply;
25 use servers::influxdb::InfluxdbRequest;
26 use servers::query_handler::InfluxdbLineProtocolHandler;
27 use servers::query_handler::sql::SqlQueryHandler;
28 use session::context::QueryContext;
29
30 use crate::tests::test_util::{MockInstance, both_instances_cases, distributed, standalone};
31
32 #[apply(both_instances_cases)]
33 async fn test_put_influxdb_lines_without_time_column(instance: Arc<dyn MockInstance>) {
34 let instance = instance.frontend();
35
36 let lines = r"
37monitor1,host=host1 cpu=66.6,memory=1024
38monitor1,host=host2 memory=1027";
39 let request = InfluxdbRequest {
40 precision: None,
41 lines: lines.to_string(),
42 };
43 assert!(instance.exec(request, QueryContext::arc()).await.is_ok());
44
45 let mut output = instance
46 .do_query(
47 "SELECT greptime_timestamp, host, cpu, memory FROM monitor1 ORDER BY greptime_timestamp",
48 QueryContext::arc(),
49 )
50 .await;
51 let output = output.remove(0).unwrap();
52 let OutputData::Stream(stream) = output.data else {
53 unreachable!()
54 };
55
56 let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
57 let recordbatches: Vec<_> = recordbatches.iter().collect();
58 let total = recordbatches
59 .into_iter()
60 .fold(0, |total, recordbatch| total + recordbatch.num_rows());
61 assert_eq!(total, 2);
62 }
63
64 #[apply(both_instances_cases)]
65 async fn test_put_influxdb_lines(instance: Arc<dyn MockInstance>) {
66 let instance = instance.frontend();
67
68 let lines = r"
69monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
70monitor1,host=host2 memory=1027 1663840496400340001";
71 let request = InfluxdbRequest {
72 precision: None,
73 lines: lines.to_string(),
74 };
75 instance.exec(request, QueryContext::arc()).await.unwrap();
76
77 let mut output = instance
78 .do_query(
79 "SELECT greptime_timestamp, host, cpu, memory FROM monitor1 ORDER BY greptime_timestamp",
80 QueryContext::arc(),
81 )
82 .await;
83 let output = output.remove(0).unwrap();
84 let OutputData::Stream(stream) = output.data else {
85 unreachable!()
86 };
87 let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
88 assert_eq!(
89 recordbatches.pretty_print().unwrap(),
90 "\
91+-------------------------------+-------+------+--------+
92| greptime_timestamp | host | cpu | memory |
93+-------------------------------+-------+------+--------+
94| 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 |
95| 2022-09-22T09:54:56.400340001 | host2 | | 1027.0 |
96+-------------------------------+-------+------+--------+"
97 );
98
99 let lines = r"
101monitor1,host=host2 cpu=32 1663840496400340001";
102 let request = InfluxdbRequest {
103 precision: None,
104 lines: lines.to_string(),
105 };
106 instance.exec(request, QueryContext::arc()).await.unwrap();
107
108 let mut output = instance
109 .do_query(
110 "SELECT greptime_timestamp, host, cpu, memory FROM monitor1 ORDER BY greptime_timestamp",
111 QueryContext::arc(),
112 )
113 .await;
114 let output = output.remove(0).unwrap();
115 let expected = "\
116+-------------------------------+-------+------+--------+
117| greptime_timestamp | host | cpu | memory |
118+-------------------------------+-------+------+--------+
119| 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 |
120| 2022-09-22T09:54:56.400340001 | host2 | 32.0 | 1027.0 |
121+-------------------------------+-------+------+--------+";
122 check_output_stream(output.data, expected).await;
123 }
124
125 #[apply(both_instances_cases)]
126 async fn test_put_influxdb_lines_with_auto_aligning_timestamps(
127 instance: Arc<dyn MockInstance>,
128 ) {
129 let instance = instance.frontend();
130
131 let sql = "create table monitor (
133 ts timestamp time index,
134 host string primary key,
135 cpu double,
136 memory double,
137 )";
138 instance.do_query(sql, QueryContext::arc()).await;
139
140 let lines = r"
142monitor,host=127.0.0.1 cpu=0.1,memory=1.0 1719460800001
143monitor,host=127.0.0.2 cpu=0.2,memory=2.0 1719460800002";
144 let request = InfluxdbRequest {
145 precision: Some(Precision::Millisecond),
146 lines: lines.to_string(),
147 };
148 instance.exec(request, QueryContext::arc()).await.unwrap();
149
150 let lines = r"
155monitor,host=127.0.0.1 cpu=0.3,memory=3.0 1719460800003000000
156monitor,host=127.0.0.2 cpu=0.4,memory=4.0 1719460800004000000";
157 let request = InfluxdbRequest {
158 precision: None,
159 lines: lines.to_string(),
160 };
161 instance.exec(request, QueryContext::arc()).await.unwrap();
162
163 let lines = r"
165monitor,host=127.0.0.1 cpu=0.5,memory=5.0 1719460800005000000
166monitor,host=127.0.0.2 cpu=0.6,memory=6.0 1719460800006000000";
167 let request = InfluxdbRequest {
168 precision: Some(Precision::Nanosecond),
169 lines: lines.to_string(),
170 };
171 instance.exec(request, QueryContext::arc()).await.unwrap();
172
173 let lines = r"
175monitor,host=127.0.0.1 cpu=0.7,memory=7.0 1719460801
176monitor,host=127.0.0.2 cpu=0.8,memory=8.0 1719460802";
177 let request = InfluxdbRequest {
178 precision: Some(Precision::Second),
179 lines: lines.to_string(),
180 };
181 instance.exec(request, QueryContext::arc()).await.unwrap();
182
183 let mut output = instance
185 .do_query(
186 "SELECT ts, host, cpu, memory FROM monitor ORDER BY ts",
187 QueryContext::arc(),
188 )
189 .await;
190 let output = output.remove(0).unwrap();
191 let expected = "\
192+-------------------------+-----------+-----+--------+
193| ts | host | cpu | memory |
194+-------------------------+-----------+-----+--------+
195| 2024-06-27T04:00:00.001 | 127.0.0.1 | 0.1 | 1.0 |
196| 2024-06-27T04:00:00.002 | 127.0.0.2 | 0.2 | 2.0 |
197| 2024-06-27T04:00:00.003 | 127.0.0.1 | 0.3 | 3.0 |
198| 2024-06-27T04:00:00.004 | 127.0.0.2 | 0.4 | 4.0 |
199| 2024-06-27T04:00:00.005 | 127.0.0.1 | 0.5 | 5.0 |
200| 2024-06-27T04:00:00.006 | 127.0.0.2 | 0.6 | 6.0 |
201| 2024-06-27T04:00:01 | 127.0.0.1 | 0.7 | 7.0 |
202| 2024-06-27T04:00:02 | 127.0.0.2 | 0.8 | 8.0 |
203+-------------------------+-----------+-----+--------+";
204 check_output_stream(output.data, expected).await;
205 }
206}