// tests_integration/prom_store.rs
#[cfg(test)]
16mod tests {
17 use std::sync::Arc;
18
19 use api::prom_store::remote::label_matcher::Type as MatcherType;
20 use api::prom_store::remote::{
21 Label, LabelMatcher, Query, ReadRequest, ReadResponse, Sample, WriteRequest,
22 };
23 use common_catalog::consts::DEFAULT_CATALOG_NAME;
24 use frontend::instance::Instance;
25 use prost::Message;
26 use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
27 use servers::prom_store;
28 use servers::prom_store::to_grpc_row_insert_requests;
29 use servers::query_handler::PromStoreProtocolHandler;
30 use servers::query_handler::sql::SqlQueryHandler;
31 use session::context::QueryContext;
32
33 use crate::standalone::GreptimeDbStandaloneBuilder;
34 use crate::tests;
35
36 #[tokio::test(flavor = "multi_thread")]
37 async fn test_standalone_prom_store_remote_rw_default_physical_table() {
38 common_telemetry::init_default_ut_logging();
39 let standalone = GreptimeDbStandaloneBuilder::new(
40 "test_standalone_prom_store_remote_rw_default_physical_table",
41 )
42 .build()
43 .await;
44 let instance = standalone.fe_instance();
45
46 test_prom_store_remote_rw(instance, None).await;
47 }
48
49 #[tokio::test(flavor = "multi_thread")]
50 async fn test_distributed_prom_store_remote_rw_default_physical_table() {
51 common_telemetry::init_default_ut_logging();
52 let distributed = tests::create_distributed_instance(
53 "test_distributed_prom_store_remote_rw_default_physical_table",
54 )
55 .await;
56 test_prom_store_remote_rw(&distributed.frontend(), None).await;
57 }
58
59 #[tokio::test(flavor = "multi_thread")]
60 async fn test_standalone_prom_store_remote_rw_custom_physical_table() {
61 common_telemetry::init_default_ut_logging();
62 let standalone = GreptimeDbStandaloneBuilder::new(
63 "test_standalone_prom_store_remote_rw_custom_physical_table",
64 )
65 .build()
66 .await;
67 let instance = standalone.fe_instance();
68
69 test_prom_store_remote_rw(instance, Some("my_custom_physical_table".to_string())).await;
70 }
71
72 #[tokio::test(flavor = "multi_thread")]
73 async fn test_distributed_prom_store_remote_rw_custom_physical_table() {
74 common_telemetry::init_default_ut_logging();
75 let distributed = tests::create_distributed_instance(
76 "test_distributed_prom_store_remote_rw_custom_physical_table",
77 )
78 .await;
79 test_prom_store_remote_rw(
80 &distributed.frontend(),
81 Some("my_custom_physical_table".to_string()),
82 )
83 .await;
84 }
85
86 async fn test_prom_store_remote_rw(instance: &Arc<Instance>, physical_table: Option<String>) {
87 let write_request = WriteRequest {
88 timeseries: prom_store::mock_timeseries(),
89 ..Default::default()
90 };
91
92 let db = "prometheus";
93 let mut ctx = Arc::into_inner(QueryContext::with(DEFAULT_CATALOG_NAME, db).into()).unwrap();
94
95 if let Some(physical_table) = &physical_table {
97 ctx.set_extension(PHYSICAL_TABLE_PARAM.to_string(), physical_table.clone());
98 }
99 let ctx = Arc::new(ctx);
100
101 assert!(
102 SqlQueryHandler::do_query(
103 instance.as_ref(),
104 "CREATE DATABASE IF NOT EXISTS prometheus",
105 ctx.clone(),
106 )
107 .await
108 .first()
109 .unwrap()
110 .is_ok()
111 );
112
113 let (row_inserts, _) = to_grpc_row_insert_requests(&write_request).unwrap();
114 instance
115 .write(row_inserts, ctx.clone(), true)
116 .await
117 .unwrap();
118
119 let read_request = ReadRequest {
120 queries: vec![
121 Query {
122 start_timestamp_ms: 1000,
123 end_timestamp_ms: 2000,
124 matchers: vec![LabelMatcher {
125 name: prom_store::METRIC_NAME_LABEL.to_string(),
126 value: "metric1".to_string(),
127 r#type: 0,
128 }],
129 ..Default::default()
130 },
131 Query {
132 start_timestamp_ms: 1000,
133 end_timestamp_ms: 3000,
134 matchers: vec![
135 LabelMatcher {
136 name: prom_store::METRIC_NAME_LABEL.to_string(),
137 value: "metric3".to_string(),
138 r#type: 0,
139 },
140 LabelMatcher {
141 name: "app".to_string(),
142 value: "biz".to_string(),
143 r#type: MatcherType::Eq as i32,
144 },
145 ],
146 ..Default::default()
147 },
148 ],
149 ..Default::default()
150 };
151
152 let resp = instance.read(read_request, ctx.clone()).await.unwrap();
153 assert_eq!(resp.content_type, "application/x-protobuf");
154 assert_eq!(resp.content_encoding, "snappy");
155 let body = prom_store::snappy_decompress(&resp.body).unwrap();
156 let read_response = ReadResponse::decode(&body[..]).unwrap();
157 let query_results = read_response.results;
158 assert_eq!(2, query_results.len());
159
160 assert_eq!(1, query_results[0].timeseries.len());
161 let timeseries = &query_results[0].timeseries[0];
162
163 assert_eq!(
164 vec![
165 Label {
166 name: prom_store::METRIC_NAME_LABEL.to_string(),
167 value: "metric1".to_string(),
168 },
169 Label {
170 name: "job".to_string(),
171 value: "spark".to_string(),
172 },
173 ],
174 timeseries.labels
175 );
176
177 assert_eq!(
178 timeseries.samples,
179 vec![
180 Sample {
181 value: 1.0,
182 timestamp: 1000,
183 },
184 Sample {
185 value: 2.0,
186 timestamp: 2000,
187 }
188 ]
189 );
190
191 assert_eq!(1, query_results[1].timeseries.len());
192 let timeseries = &query_results[1].timeseries[0];
193
194 assert_eq!(
195 vec![
196 Label {
197 name: prom_store::METRIC_NAME_LABEL.to_string(),
198 value: "metric3".to_string(),
199 },
200 Label {
201 name: "app".to_string(),
202 value: "biz".to_string(),
203 },
204 Label {
205 name: "idc".to_string(),
206 value: "z002".to_string(),
207 },
208 ],
209 timeseries.labels
210 );
211
212 assert_eq!(
213 timeseries.samples,
214 vec![
215 Sample {
216 value: 5.0,
217 timestamp: 1000,
218 },
219 Sample {
220 value: 6.0,
221 timestamp: 2000,
222 },
223 Sample {
224 value: 7.0,
225 timestamp: 3000,
226 }
227 ]
228 );
229
230 if let Some(physical_table) = physical_table {
232 let sql = format!("DESC TABLE {physical_table};");
233 instance.do_query(&sql, ctx).await[0].as_ref().unwrap();
234 }
235 }
236}