tests_integration/
influxdb.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod test {
17    use std::sync::Arc;
18
19    use client::OutputData;
20    use common_grpc::precision::Precision;
21    use common_recordbatch::RecordBatches;
22    use common_test_util::recordbatch::check_output_stream;
23    use rstest::rstest;
24    use rstest_reuse::apply;
25    use servers::influxdb::InfluxdbRequest;
26    use servers::query_handler::InfluxdbLineProtocolHandler;
27    use servers::query_handler::sql::SqlQueryHandler;
28    use session::context::QueryContext;
29
30    use crate::tests::test_util::{MockInstance, both_instances_cases, distributed, standalone};
31
32    #[apply(both_instances_cases)]
33    async fn test_put_influxdb_lines_without_time_column(instance: Arc<dyn MockInstance>) {
34        let instance = instance.frontend();
35
36        let lines = r"
37monitor1,host=host1 cpu=66.6,memory=1024
38monitor1,host=host2 memory=1027";
39        let request = InfluxdbRequest {
40            precision: None,
41            lines: lines.to_string(),
42        };
43        assert!(instance.exec(request, QueryContext::arc()).await.is_ok());
44
45        let mut output = instance
46            .do_query(
47                "SELECT greptime_timestamp, host, cpu, memory FROM monitor1 ORDER BY greptime_timestamp",
48                QueryContext::arc(),
49            )
50            .await;
51        let output = output.remove(0).unwrap();
52        let OutputData::Stream(stream) = output.data else {
53            unreachable!()
54        };
55
56        let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
57        let recordbatches: Vec<_> = recordbatches.iter().collect();
58        let total = recordbatches
59            .into_iter()
60            .fold(0, |total, recordbatch| total + recordbatch.num_rows());
61        assert_eq!(total, 2);
62    }
63
64    #[apply(both_instances_cases)]
65    async fn test_put_influxdb_lines(instance: Arc<dyn MockInstance>) {
66        let instance = instance.frontend();
67
68        let lines = r"
69monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
70monitor1,host=host2 memory=1027 1663840496400340001";
71        let request = InfluxdbRequest {
72            precision: None,
73            lines: lines.to_string(),
74        };
75        instance.exec(request, QueryContext::arc()).await.unwrap();
76
77        let mut output = instance
78            .do_query(
79                "SELECT greptime_timestamp, host, cpu, memory FROM monitor1 ORDER BY greptime_timestamp",
80                QueryContext::arc(),
81            )
82            .await;
83        let output = output.remove(0).unwrap();
84        let OutputData::Stream(stream) = output.data else {
85            unreachable!()
86        };
87        let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
88        assert_eq!(
89            recordbatches.pretty_print().unwrap(),
90            "\
91+-------------------------------+-------+------+--------+
92| greptime_timestamp            | host  | cpu  | memory |
93+-------------------------------+-------+------+--------+
94| 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 |
95| 2022-09-22T09:54:56.400340001 | host2 |      | 1027.0 |
96+-------------------------------+-------+------+--------+"
97        );
98
99        // Put the cpu column for host2.
100        let lines = r"
101monitor1,host=host2 cpu=32 1663840496400340001";
102        let request = InfluxdbRequest {
103            precision: None,
104            lines: lines.to_string(),
105        };
106        instance.exec(request, QueryContext::arc()).await.unwrap();
107
108        let mut output = instance
109            .do_query(
110                "SELECT greptime_timestamp, host, cpu, memory FROM monitor1 ORDER BY greptime_timestamp",
111                QueryContext::arc(),
112            )
113            .await;
114        let output = output.remove(0).unwrap();
115        let expected = "\
116+-------------------------------+-------+------+--------+
117| greptime_timestamp            | host  | cpu  | memory |
118+-------------------------------+-------+------+--------+
119| 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 |
120| 2022-09-22T09:54:56.400340001 | host2 | 32.0 | 1027.0 |
121+-------------------------------+-------+------+--------+";
122        check_output_stream(output.data, expected).await;
123    }
124
125    #[apply(both_instances_cases)]
126    async fn test_put_influxdb_lines_with_auto_aligning_timestamps(
127        instance: Arc<dyn MockInstance>,
128    ) {
129        let instance = instance.frontend();
130
131        // First create a table with millisecond time index.
132        let sql = "create table monitor (
133                    ts timestamp time index,
134                    host string primary key,
135                    cpu double,
136                    memory double,
137                )";
138        instance.do_query(sql, QueryContext::arc()).await;
139
140        // Insert some influxdb lines with millisecond precision.
141        let lines = r"
142monitor,host=127.0.0.1 cpu=0.1,memory=1.0 1719460800001
143monitor,host=127.0.0.2 cpu=0.2,memory=2.0 1719460800002";
144        let request = InfluxdbRequest {
145            precision: Some(Precision::Millisecond),
146            lines: lines.to_string(),
147        };
148        instance.exec(request, QueryContext::arc()).await.unwrap();
149
150        // Insert some influxdb lines without precision.
151        // According to the specification (both v1 and v2), if precision is not set, it is default
152        // to "nanosecond". The lines here will be converted to insert requests as usual and then
153        // be aligned to millisecond time unit.
154        let lines = r"
155monitor,host=127.0.0.1 cpu=0.3,memory=3.0 1719460800003000000
156monitor,host=127.0.0.2 cpu=0.4,memory=4.0 1719460800004000000";
157        let request = InfluxdbRequest {
158            precision: None,
159            lines: lines.to_string(),
160        };
161        instance.exec(request, QueryContext::arc()).await.unwrap();
162
163        // Insert other influxdb lines with nanosecond precision.
164        let lines = r"
165monitor,host=127.0.0.1 cpu=0.5,memory=5.0 1719460800005000000
166monitor,host=127.0.0.2 cpu=0.6,memory=6.0 1719460800006000000";
167        let request = InfluxdbRequest {
168            precision: Some(Precision::Nanosecond),
169            lines: lines.to_string(),
170        };
171        instance.exec(request, QueryContext::arc()).await.unwrap();
172
173        // Insert other influxdb lines with second precision.
174        let lines = r"
175monitor,host=127.0.0.1 cpu=0.7,memory=7.0 1719460801
176monitor,host=127.0.0.2 cpu=0.8,memory=8.0 1719460802";
177        let request = InfluxdbRequest {
178            precision: Some(Precision::Second),
179            lines: lines.to_string(),
180        };
181        instance.exec(request, QueryContext::arc()).await.unwrap();
182
183        // Check the data.
184        let mut output = instance
185            .do_query(
186                "SELECT ts, host, cpu, memory FROM monitor ORDER BY ts",
187                QueryContext::arc(),
188            )
189            .await;
190        let output = output.remove(0).unwrap();
191        let expected = "\
192+-------------------------+-----------+-----+--------+
193| ts                      | host      | cpu | memory |
194+-------------------------+-----------+-----+--------+
195| 2024-06-27T04:00:00.001 | 127.0.0.1 | 0.1 | 1.0    |
196| 2024-06-27T04:00:00.002 | 127.0.0.2 | 0.2 | 2.0    |
197| 2024-06-27T04:00:00.003 | 127.0.0.1 | 0.3 | 3.0    |
198| 2024-06-27T04:00:00.004 | 127.0.0.2 | 0.4 | 4.0    |
199| 2024-06-27T04:00:00.005 | 127.0.0.1 | 0.5 | 5.0    |
200| 2024-06-27T04:00:00.006 | 127.0.0.2 | 0.6 | 6.0    |
201| 2024-06-27T04:00:01     | 127.0.0.1 | 0.7 | 7.0    |
202| 2024-06-27T04:00:02     | 127.0.0.2 | 0.8 | 8.0    |
203+-------------------------+-----------+-----+--------+";
204        check_output_stream(output.data, expected).await;
205    }
206}