tests_integration/
influxdb.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Integration tests that write InfluxDB line protocol through the frontend
// returned by `MockInstance::frontend` and read the rows back with SQL.
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use client::OutputData;
    use common_recordbatch::RecordBatches;
    use rstest::rstest;
    use rstest_reuse::apply;
    use servers::influxdb::InfluxdbRequest;
    use servers::query_handler::sql::SqlQueryHandler;
    use servers::query_handler::InfluxdbLineProtocolHandler;
    use session::context::QueryContext;

    use crate::tests::test_util::{both_instances_cases, distributed, standalone, MockInstance};

    // Writes two lines that carry no timestamp and checks that both of them
    // come back as rows when `monitor1` is queried.
    #[apply(both_instances_cases)]
    async fn test_put_influxdb_lines_without_time_column(instance: Arc<dyn MockInstance>) {
        let instance = instance.frontend();

        let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024
monitor1,host=host2 memory=1027";
        let request = InfluxdbRequest {
            precision: None,
            lines: lines.to_string(),
        };
        // Unwrap (instead of asserting `is_ok()`) so a rejected write reports
        // the underlying error in the test failure.
        instance.exec(request, QueryContext::arc()).await.unwrap();

        let mut output = instance
            .do_query(
                "SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts",
                QueryContext::arc(),
            )
            .await;
        // A single statement was submitted, so the first result is the one we want.
        let output = output.remove(0).unwrap();
        let OutputData::Stream(stream) = output.data else {
            unreachable!("the SELECT is expected to produce a stream output")
        };

        let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
        // The rows may be spread over several batches; add them all up.
        let total: usize = recordbatches.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(total, 2);
    }

    // Writes two timestamped lines, checks the resulting table, then writes
    // the missing `cpu` field for host2 at the same timestamp and checks that
    // the existing row now shows both `cpu` and `memory`.
    #[apply(both_instances_cases)]
    async fn test_put_influxdb_lines(instance: Arc<dyn MockInstance>) {
        let instance = instance.frontend();

        let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001";
        let request = InfluxdbRequest {
            precision: None,
            lines: lines.to_string(),
        };
        instance.exec(request, QueryContext::arc()).await.unwrap();

        let mut output = instance
            .do_query(
                "SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts",
                QueryContext::arc(),
            )
            .await;
        let output = output.remove(0).unwrap();
        let OutputData::Stream(stream) = output.data else {
            unreachable!("the SELECT is expected to produce a stream output")
        };
        let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
        // host2 was written without a `cpu` field, so that cell is empty.
        assert_eq!(
            recordbatches.pretty_print().unwrap(),
            "\
+-------------------------------+-------+------+--------+
| ts                            | host  | cpu  | memory |
+-------------------------------+-------+------+--------+
| 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 |
| 2022-09-22T09:54:56.400340001 | host2 |      | 1027.0 |
+-------------------------------+-------+------+--------+"
        );

        // Put the cpu column for host2.
        let lines = r"
monitor1,host=host2 cpu=32 1663840496400340001";
        let request = InfluxdbRequest {
            precision: None,
            lines: lines.to_string(),
        };
        instance.exec(request, QueryContext::arc()).await.unwrap();

        let mut output = instance
            .do_query(
                "SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts",
                QueryContext::arc(),
            )
            .await;
        let output = output.remove(0).unwrap();
        let OutputData::Stream(stream) = output.data else {
            unreachable!("the SELECT is expected to produce a stream output")
        };
        let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
        // Still two rows: the second write filled in `cpu` for the existing
        // host2 row and its `memory` value is unchanged.
        assert_eq!(
            recordbatches.pretty_print().unwrap(),
            "\
+-------------------------------+-------+------+--------+
| ts                            | host  | cpu  | memory |
+-------------------------------+-------+------+--------+
| 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 |
| 2022-09-22T09:54:56.400340001 | host2 | 32.0 | 1027.0 |
+-------------------------------+-------+------+--------+"
        );
    }
}