tests_integration/
prom_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests {
17    use std::sync::Arc;
18
19    use api::prom_store::remote::label_matcher::Type as MatcherType;
20    use api::prom_store::remote::{
21        Label, LabelMatcher, Query, ReadRequest, ReadResponse, Sample, WriteRequest,
22    };
23    use common_catalog::consts::DEFAULT_CATALOG_NAME;
24    use frontend::instance::Instance;
25    use prost::Message;
26    use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
27    use servers::prom_store;
28    use servers::prom_store::to_grpc_row_insert_requests;
29    use servers::query_handler::sql::SqlQueryHandler;
30    use servers::query_handler::PromStoreProtocolHandler;
31    use session::context::QueryContext;
32
33    use crate::standalone::GreptimeDbStandaloneBuilder;
34    use crate::tests;
35
36    #[tokio::test(flavor = "multi_thread")]
37    async fn test_standalone_prom_store_remote_rw_default_physical_table() {
38        common_telemetry::init_default_ut_logging();
39        let standalone = GreptimeDbStandaloneBuilder::new(
40            "test_standalone_prom_store_remote_rw_default_physical_table",
41        )
42        .build()
43        .await;
44        let instance = standalone.fe_instance();
45
46        test_prom_store_remote_rw(instance, None).await;
47    }
48
49    #[tokio::test(flavor = "multi_thread")]
50    async fn test_distributed_prom_store_remote_rw_default_physical_table() {
51        common_telemetry::init_default_ut_logging();
52        let distributed = tests::create_distributed_instance(
53            "test_distributed_prom_store_remote_rw_default_physical_table",
54        )
55        .await;
56        test_prom_store_remote_rw(&distributed.frontend(), None).await;
57    }
58
59    #[tokio::test(flavor = "multi_thread")]
60    async fn test_standalone_prom_store_remote_rw_custom_physical_table() {
61        common_telemetry::init_default_ut_logging();
62        let standalone = GreptimeDbStandaloneBuilder::new(
63            "test_standalone_prom_store_remote_rw_custom_physical_table",
64        )
65        .build()
66        .await;
67        let instance = standalone.fe_instance();
68
69        test_prom_store_remote_rw(instance, Some("my_custom_physical_table".to_string())).await;
70    }
71
72    #[tokio::test(flavor = "multi_thread")]
73    async fn test_distributed_prom_store_remote_rw_custom_physical_table() {
74        common_telemetry::init_default_ut_logging();
75        let distributed = tests::create_distributed_instance(
76            "test_distributed_prom_store_remote_rw_custom_physical_table",
77        )
78        .await;
79        test_prom_store_remote_rw(
80            &distributed.frontend(),
81            Some("my_custom_physical_table".to_string()),
82        )
83        .await;
84    }
85
86    async fn test_prom_store_remote_rw(instance: &Arc<Instance>, physical_table: Option<String>) {
87        let write_request = WriteRequest {
88            timeseries: prom_store::mock_timeseries(),
89            ..Default::default()
90        };
91
92        let db = "prometheus";
93        let mut ctx = Arc::into_inner(QueryContext::with(DEFAULT_CATALOG_NAME, db).into()).unwrap();
94
95        // set physical table if provided
96        if let Some(physical_table) = &physical_table {
97            ctx.set_extension(PHYSICAL_TABLE_PARAM.to_string(), physical_table.clone());
98        }
99        let ctx = Arc::new(ctx);
100
101        assert!(SqlQueryHandler::do_query(
102            instance.as_ref(),
103            "CREATE DATABASE IF NOT EXISTS prometheus",
104            ctx.clone(),
105        )
106        .await
107        .first()
108        .unwrap()
109        .is_ok());
110
111        let (row_inserts, _) = to_grpc_row_insert_requests(&write_request).unwrap();
112        instance
113            .write(row_inserts, ctx.clone(), true)
114            .await
115            .unwrap();
116
117        let read_request = ReadRequest {
118            queries: vec![
119                Query {
120                    start_timestamp_ms: 1000,
121                    end_timestamp_ms: 2000,
122                    matchers: vec![LabelMatcher {
123                        name: prom_store::METRIC_NAME_LABEL.to_string(),
124                        value: "metric1".to_string(),
125                        r#type: 0,
126                    }],
127                    ..Default::default()
128                },
129                Query {
130                    start_timestamp_ms: 1000,
131                    end_timestamp_ms: 3000,
132                    matchers: vec![
133                        LabelMatcher {
134                            name: prom_store::METRIC_NAME_LABEL.to_string(),
135                            value: "metric3".to_string(),
136                            r#type: 0,
137                        },
138                        LabelMatcher {
139                            name: "app".to_string(),
140                            value: "biz".to_string(),
141                            r#type: MatcherType::Eq as i32,
142                        },
143                    ],
144                    ..Default::default()
145                },
146            ],
147            ..Default::default()
148        };
149
150        let resp = instance.read(read_request, ctx.clone()).await.unwrap();
151        assert_eq!(resp.content_type, "application/x-protobuf");
152        assert_eq!(resp.content_encoding, "snappy");
153        let body = prom_store::snappy_decompress(&resp.body).unwrap();
154        let read_response = ReadResponse::decode(&body[..]).unwrap();
155        let query_results = read_response.results;
156        assert_eq!(2, query_results.len());
157
158        assert_eq!(1, query_results[0].timeseries.len());
159        let timeseries = &query_results[0].timeseries[0];
160
161        assert_eq!(
162            vec![
163                Label {
164                    name: prom_store::METRIC_NAME_LABEL.to_string(),
165                    value: "metric1".to_string(),
166                },
167                Label {
168                    name: "job".to_string(),
169                    value: "spark".to_string(),
170                },
171            ],
172            timeseries.labels
173        );
174
175        assert_eq!(
176            timeseries.samples,
177            vec![
178                Sample {
179                    value: 1.0,
180                    timestamp: 1000,
181                },
182                Sample {
183                    value: 2.0,
184                    timestamp: 2000,
185                }
186            ]
187        );
188
189        assert_eq!(1, query_results[1].timeseries.len());
190        let timeseries = &query_results[1].timeseries[0];
191
192        assert_eq!(
193            vec![
194                Label {
195                    name: prom_store::METRIC_NAME_LABEL.to_string(),
196                    value: "metric3".to_string(),
197                },
198                Label {
199                    name: "app".to_string(),
200                    value: "biz".to_string(),
201                },
202                Label {
203                    name: "idc".to_string(),
204                    value: "z002".to_string(),
205                },
206            ],
207            timeseries.labels
208        );
209
210        assert_eq!(
211            timeseries.samples,
212            vec![
213                Sample {
214                    value: 5.0,
215                    timestamp: 1000,
216                },
217                Sample {
218                    value: 6.0,
219                    timestamp: 2000,
220                },
221                Sample {
222                    value: 7.0,
223                    timestamp: 3000,
224                }
225            ]
226        );
227
228        // check physical table if provided
229        if let Some(physical_table) = physical_table {
230            let sql = format!("DESC TABLE {physical_table};");
231            instance.do_query(&sql, ctx).await[0].as_ref().unwrap();
232        }
233    }
234}