tests_integration/
prom_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests {
17    use std::sync::Arc;
18
19    use api::prom_store::remote::label_matcher::Type as MatcherType;
20    use api::prom_store::remote::{
21        Label, LabelMatcher, Query, ReadRequest, ReadResponse, Sample, WriteRequest,
22    };
23    use common_catalog::consts::DEFAULT_CATALOG_NAME;
24    use frontend::instance::Instance;
25    use prost::Message;
26    use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
27    use servers::prom_store;
28    use servers::prom_store::to_grpc_row_insert_requests;
29    use servers::query_handler::PromStoreProtocolHandler;
30    use servers::query_handler::sql::SqlQueryHandler;
31    use session::context::QueryContext;
32
33    use crate::standalone::GreptimeDbStandaloneBuilder;
34    use crate::tests;
35
36    #[tokio::test(flavor = "multi_thread")]
37    async fn test_standalone_prom_store_remote_rw_default_physical_table() {
38        common_telemetry::init_default_ut_logging();
39        let standalone = GreptimeDbStandaloneBuilder::new(
40            "test_standalone_prom_store_remote_rw_default_physical_table",
41        )
42        .build()
43        .await;
44        let instance = standalone.fe_instance();
45
46        test_prom_store_remote_rw(instance, None).await;
47    }
48
49    #[tokio::test(flavor = "multi_thread")]
50    async fn test_distributed_prom_store_remote_rw_default_physical_table() {
51        common_telemetry::init_default_ut_logging();
52        let distributed = tests::create_distributed_instance(
53            "test_distributed_prom_store_remote_rw_default_physical_table",
54        )
55        .await;
56        test_prom_store_remote_rw(&distributed.frontend(), None).await;
57    }
58
59    #[tokio::test(flavor = "multi_thread")]
60    async fn test_standalone_prom_store_remote_rw_custom_physical_table() {
61        common_telemetry::init_default_ut_logging();
62        let standalone = GreptimeDbStandaloneBuilder::new(
63            "test_standalone_prom_store_remote_rw_custom_physical_table",
64        )
65        .build()
66        .await;
67        let instance = standalone.fe_instance();
68
69        test_prom_store_remote_rw(instance, Some("my_custom_physical_table".to_string())).await;
70    }
71
72    #[tokio::test(flavor = "multi_thread")]
73    async fn test_distributed_prom_store_remote_rw_custom_physical_table() {
74        common_telemetry::init_default_ut_logging();
75        let distributed = tests::create_distributed_instance(
76            "test_distributed_prom_store_remote_rw_custom_physical_table",
77        )
78        .await;
79        test_prom_store_remote_rw(
80            &distributed.frontend(),
81            Some("my_custom_physical_table".to_string()),
82        )
83        .await;
84    }
85
86    async fn test_prom_store_remote_rw(instance: &Arc<Instance>, physical_table: Option<String>) {
87        let write_request = WriteRequest {
88            timeseries: prom_store::mock_timeseries(),
89            ..Default::default()
90        };
91
92        let db = "prometheus";
93        let mut ctx = Arc::into_inner(QueryContext::with(DEFAULT_CATALOG_NAME, db).into()).unwrap();
94
95        // set physical table if provided
96        if let Some(physical_table) = &physical_table {
97            ctx.set_extension(PHYSICAL_TABLE_PARAM.to_string(), physical_table.clone());
98        }
99        let ctx = Arc::new(ctx);
100
101        assert!(
102            SqlQueryHandler::do_query(
103                instance.as_ref(),
104                "CREATE DATABASE IF NOT EXISTS prometheus",
105                ctx.clone(),
106            )
107            .await
108            .first()
109            .unwrap()
110            .is_ok()
111        );
112
113        let (row_inserts, _) = to_grpc_row_insert_requests(&write_request).unwrap();
114        instance
115            .write(row_inserts, ctx.clone(), true)
116            .await
117            .unwrap();
118
119        let read_request = ReadRequest {
120            queries: vec![
121                Query {
122                    start_timestamp_ms: 1000,
123                    end_timestamp_ms: 2000,
124                    matchers: vec![LabelMatcher {
125                        name: prom_store::METRIC_NAME_LABEL.to_string(),
126                        value: "metric1".to_string(),
127                        r#type: 0,
128                    }],
129                    ..Default::default()
130                },
131                Query {
132                    start_timestamp_ms: 1000,
133                    end_timestamp_ms: 3000,
134                    matchers: vec![
135                        LabelMatcher {
136                            name: prom_store::METRIC_NAME_LABEL.to_string(),
137                            value: "metric3".to_string(),
138                            r#type: 0,
139                        },
140                        LabelMatcher {
141                            name: "app".to_string(),
142                            value: "biz".to_string(),
143                            r#type: MatcherType::Eq as i32,
144                        },
145                    ],
146                    ..Default::default()
147                },
148            ],
149            ..Default::default()
150        };
151
152        let resp = instance.read(read_request, ctx.clone()).await.unwrap();
153        assert_eq!(resp.content_type, "application/x-protobuf");
154        assert_eq!(resp.content_encoding, "snappy");
155        let body = prom_store::snappy_decompress(&resp.body).unwrap();
156        let read_response = ReadResponse::decode(&body[..]).unwrap();
157        let query_results = read_response.results;
158        assert_eq!(2, query_results.len());
159
160        assert_eq!(1, query_results[0].timeseries.len());
161        let timeseries = &query_results[0].timeseries[0];
162
163        assert_eq!(
164            vec![
165                Label {
166                    name: prom_store::METRIC_NAME_LABEL.to_string(),
167                    value: "metric1".to_string(),
168                },
169                Label {
170                    name: "job".to_string(),
171                    value: "spark".to_string(),
172                },
173            ],
174            timeseries.labels
175        );
176
177        assert_eq!(
178            timeseries.samples,
179            vec![
180                Sample {
181                    value: 1.0,
182                    timestamp: 1000,
183                },
184                Sample {
185                    value: 2.0,
186                    timestamp: 2000,
187                }
188            ]
189        );
190
191        assert_eq!(1, query_results[1].timeseries.len());
192        let timeseries = &query_results[1].timeseries[0];
193
194        assert_eq!(
195            vec![
196                Label {
197                    name: prom_store::METRIC_NAME_LABEL.to_string(),
198                    value: "metric3".to_string(),
199                },
200                Label {
201                    name: "app".to_string(),
202                    value: "biz".to_string(),
203                },
204                Label {
205                    name: "idc".to_string(),
206                    value: "z002".to_string(),
207                },
208            ],
209            timeseries.labels
210        );
211
212        assert_eq!(
213            timeseries.samples,
214            vec![
215                Sample {
216                    value: 5.0,
217                    timestamp: 1000,
218                },
219                Sample {
220                    value: 6.0,
221                    timestamp: 2000,
222                },
223                Sample {
224                    value: 7.0,
225                    timestamp: 3000,
226                }
227            ]
228        );
229
230        // check physical table if provided
231        if let Some(physical_table) = physical_table {
232            let sql = format!("DESC TABLE {physical_table};");
233            instance.do_query(&sql, ctx).await[0].as_ref().unwrap();
234        }
235    }
236}