tests_integration/
prom_store.rs1#[cfg(test)]
16mod tests {
17 use std::sync::Arc;
18
19 use api::prom_store::remote::label_matcher::Type as MatcherType;
20 use api::prom_store::remote::{
21 Label, LabelMatcher, Query, ReadRequest, ReadResponse, Sample, WriteRequest,
22 };
23 use common_catalog::consts::DEFAULT_CATALOG_NAME;
24 use frontend::instance::Instance;
25 use prost::Message;
26 use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
27 use servers::prom_store;
28 use servers::prom_store::to_grpc_row_insert_requests;
29 use servers::query_handler::sql::SqlQueryHandler;
30 use servers::query_handler::PromStoreProtocolHandler;
31 use session::context::QueryContext;
32
33 use crate::standalone::GreptimeDbStandaloneBuilder;
34 use crate::tests;
35
36 #[tokio::test(flavor = "multi_thread")]
37 async fn test_standalone_prom_store_remote_rw_default_physical_table() {
38 common_telemetry::init_default_ut_logging();
39 let standalone = GreptimeDbStandaloneBuilder::new(
40 "test_standalone_prom_store_remote_rw_default_physical_table",
41 )
42 .build()
43 .await;
44 let instance = standalone.fe_instance();
45
46 test_prom_store_remote_rw(instance, None).await;
47 }
48
49 #[tokio::test(flavor = "multi_thread")]
50 async fn test_distributed_prom_store_remote_rw_default_physical_table() {
51 common_telemetry::init_default_ut_logging();
52 let distributed = tests::create_distributed_instance(
53 "test_distributed_prom_store_remote_rw_default_physical_table",
54 )
55 .await;
56 test_prom_store_remote_rw(&distributed.frontend(), None).await;
57 }
58
59 #[tokio::test(flavor = "multi_thread")]
60 async fn test_standalone_prom_store_remote_rw_custom_physical_table() {
61 common_telemetry::init_default_ut_logging();
62 let standalone = GreptimeDbStandaloneBuilder::new(
63 "test_standalone_prom_store_remote_rw_custom_physical_table",
64 )
65 .build()
66 .await;
67 let instance = standalone.fe_instance();
68
69 test_prom_store_remote_rw(instance, Some("my_custom_physical_table".to_string())).await;
70 }
71
72 #[tokio::test(flavor = "multi_thread")]
73 async fn test_distributed_prom_store_remote_rw_custom_physical_table() {
74 common_telemetry::init_default_ut_logging();
75 let distributed = tests::create_distributed_instance(
76 "test_distributed_prom_store_remote_rw_custom_physical_table",
77 )
78 .await;
79 test_prom_store_remote_rw(
80 &distributed.frontend(),
81 Some("my_custom_physical_table".to_string()),
82 )
83 .await;
84 }
85
86 async fn test_prom_store_remote_rw(instance: &Arc<Instance>, physical_table: Option<String>) {
87 let write_request = WriteRequest {
88 timeseries: prom_store::mock_timeseries(),
89 ..Default::default()
90 };
91
92 let db = "prometheus";
93 let mut ctx = Arc::into_inner(QueryContext::with(DEFAULT_CATALOG_NAME, db).into()).unwrap();
94
95 if let Some(physical_table) = &physical_table {
97 ctx.set_extension(PHYSICAL_TABLE_PARAM.to_string(), physical_table.clone());
98 }
99 let ctx = Arc::new(ctx);
100
101 assert!(SqlQueryHandler::do_query(
102 instance.as_ref(),
103 "CREATE DATABASE IF NOT EXISTS prometheus",
104 ctx.clone(),
105 )
106 .await
107 .first()
108 .unwrap()
109 .is_ok());
110
111 let (row_inserts, _) = to_grpc_row_insert_requests(&write_request).unwrap();
112 instance
113 .write(row_inserts, ctx.clone(), true)
114 .await
115 .unwrap();
116
117 let read_request = ReadRequest {
118 queries: vec![
119 Query {
120 start_timestamp_ms: 1000,
121 end_timestamp_ms: 2000,
122 matchers: vec![LabelMatcher {
123 name: prom_store::METRIC_NAME_LABEL.to_string(),
124 value: "metric1".to_string(),
125 r#type: 0,
126 }],
127 ..Default::default()
128 },
129 Query {
130 start_timestamp_ms: 1000,
131 end_timestamp_ms: 3000,
132 matchers: vec![
133 LabelMatcher {
134 name: prom_store::METRIC_NAME_LABEL.to_string(),
135 value: "metric3".to_string(),
136 r#type: 0,
137 },
138 LabelMatcher {
139 name: "app".to_string(),
140 value: "biz".to_string(),
141 r#type: MatcherType::Eq as i32,
142 },
143 ],
144 ..Default::default()
145 },
146 ],
147 ..Default::default()
148 };
149
150 let resp = instance.read(read_request, ctx.clone()).await.unwrap();
151 assert_eq!(resp.content_type, "application/x-protobuf");
152 assert_eq!(resp.content_encoding, "snappy");
153 let body = prom_store::snappy_decompress(&resp.body).unwrap();
154 let read_response = ReadResponse::decode(&body[..]).unwrap();
155 let query_results = read_response.results;
156 assert_eq!(2, query_results.len());
157
158 assert_eq!(1, query_results[0].timeseries.len());
159 let timeseries = &query_results[0].timeseries[0];
160
161 assert_eq!(
162 vec![
163 Label {
164 name: prom_store::METRIC_NAME_LABEL.to_string(),
165 value: "metric1".to_string(),
166 },
167 Label {
168 name: "job".to_string(),
169 value: "spark".to_string(),
170 },
171 ],
172 timeseries.labels
173 );
174
175 assert_eq!(
176 timeseries.samples,
177 vec![
178 Sample {
179 value: 1.0,
180 timestamp: 1000,
181 },
182 Sample {
183 value: 2.0,
184 timestamp: 2000,
185 }
186 ]
187 );
188
189 assert_eq!(1, query_results[1].timeseries.len());
190 let timeseries = &query_results[1].timeseries[0];
191
192 assert_eq!(
193 vec![
194 Label {
195 name: prom_store::METRIC_NAME_LABEL.to_string(),
196 value: "metric3".to_string(),
197 },
198 Label {
199 name: "app".to_string(),
200 value: "biz".to_string(),
201 },
202 Label {
203 name: "idc".to_string(),
204 value: "z002".to_string(),
205 },
206 ],
207 timeseries.labels
208 );
209
210 assert_eq!(
211 timeseries.samples,
212 vec![
213 Sample {
214 value: 5.0,
215 timestamp: 1000,
216 },
217 Sample {
218 value: 6.0,
219 timestamp: 2000,
220 },
221 Sample {
222 value: 7.0,
223 timestamp: 3000,
224 }
225 ]
226 );
227
228 if let Some(physical_table) = physical_table {
230 let sql = format!("DESC TABLE {physical_table};");
231 instance.do_query(&sql, ctx).await[0].as_ref().unwrap();
232 }
233 }
234}