1use std::sync::Arc;
16
17use greptime_proto::prometheus::remote::{Sample, TimeSeries};
18use greptime_proto::prometheus::*;
19use prometheus::proto::{LabelPair, MetricFamily, MetricType};
20use prometheus::{Encoder, TextEncoder};
21
22pub fn dump_metrics() -> Result<String, String> {
23 let mut buffer = Vec::new();
24 let encoder = TextEncoder::new();
25 let metric_families = prometheus::gather();
26 encoder
27 .encode(&metric_families, &mut buffer)
28 .map_err(|_| "Encode metrics failed".to_string())?;
29 String::from_utf8(buffer).map_err(|e| e.to_string())
30}
31
32#[derive(Clone)]
35pub struct MetricFilter {
36 inner: Arc<dyn Fn(&MetricFamily) -> bool + Send + Sync>,
37}
38
39impl MetricFilter {
40 pub fn new(inner: Arc<dyn Fn(&MetricFamily) -> bool + Send + Sync>) -> Self {
41 Self { inner }
42 }
43 pub fn filter(&self, mf: &MetricFamily) -> bool {
44 (self.inner)(mf)
45 }
46}
47
48pub fn convert_metric_to_write_request(
49 metric_families: Vec<MetricFamily>,
50 metric_filter: Option<&MetricFilter>,
51 default_timestamp: i64,
52) -> remote::WriteRequest {
53 let mut timeseries: Vec<TimeSeries> = Vec::with_capacity(metric_families.len());
54 for mf in metric_families {
55 if !metric_filter.map(|f| f.filter(&mf)).unwrap_or(true) {
56 continue;
57 }
58 let mf_type = mf.get_field_type();
59 let mf_name = mf.get_name();
60 for m in mf.get_metric() {
61 let timestamp = if m.get_timestamp_ms() == 0 {
62 default_timestamp
63 } else {
64 m.get_timestamp_ms()
65 };
66 match mf_type {
67 MetricType::COUNTER => timeseries.push(TimeSeries {
68 labels: convert_label(m.get_label(), mf_name, None),
69 samples: vec![Sample {
70 value: m.get_counter().get_value(),
71 timestamp,
72 }],
73 exemplars: vec![],
74 }),
75 MetricType::GAUGE => timeseries.push(TimeSeries {
76 labels: convert_label(m.get_label(), mf_name, None),
77 samples: vec![Sample {
78 value: m.get_gauge().get_value(),
79 timestamp,
80 }],
81 exemplars: vec![],
82 }),
83 MetricType::HISTOGRAM => {
84 let h = m.get_histogram();
85 let mut inf_seen = false;
86 let metric_name = format!("{}_bucket", mf_name);
87 for b in h.get_bucket() {
88 let upper_bound = b.get_upper_bound();
89 timeseries.push(TimeSeries {
90 labels: convert_label(
91 m.get_label(),
92 metric_name.as_str(),
93 Some(("le", upper_bound.to_string())),
94 ),
95 samples: vec![Sample {
96 value: b.get_cumulative_count() as f64,
97 timestamp,
98 }],
99 exemplars: vec![],
100 });
101 if upper_bound.is_sign_positive() && upper_bound.is_infinite() {
102 inf_seen = true;
103 }
104 }
105 if !inf_seen {
106 timeseries.push(TimeSeries {
107 labels: convert_label(
108 m.get_label(),
109 metric_name.as_str(),
110 Some(("le", "+Inf".to_string())),
111 ),
112 samples: vec![Sample {
113 value: h.get_sample_count() as f64,
114 timestamp,
115 }],
116 exemplars: vec![],
117 });
118 }
119 timeseries.push(TimeSeries {
120 labels: convert_label(
121 m.get_label(),
122 format!("{}_sum", mf_name).as_str(),
123 None,
124 ),
125 samples: vec![Sample {
126 value: h.get_sample_sum(),
127 timestamp,
128 }],
129 exemplars: vec![],
130 });
131 timeseries.push(TimeSeries {
132 labels: convert_label(
133 m.get_label(),
134 format!("{}_count", mf_name).as_str(),
135 None,
136 ),
137 samples: vec![Sample {
138 value: h.get_sample_count() as f64,
139 timestamp,
140 }],
141 exemplars: vec![],
142 });
143 }
144 MetricType::SUMMARY => {
145 let s = m.get_summary();
146 for q in s.get_quantile() {
147 timeseries.push(TimeSeries {
148 labels: convert_label(
149 m.get_label(),
150 mf_name,
151 Some(("quantile", q.get_quantile().to_string())),
152 ),
153 samples: vec![Sample {
154 value: q.get_value(),
155 timestamp,
156 }],
157 exemplars: vec![],
158 });
159 }
160 timeseries.push(TimeSeries {
161 labels: convert_label(
162 m.get_label(),
163 format!("{}_sum", mf_name).as_str(),
164 None,
165 ),
166 samples: vec![Sample {
167 value: s.get_sample_sum(),
168 timestamp,
169 }],
170 exemplars: vec![],
171 });
172 timeseries.push(TimeSeries {
173 labels: convert_label(
174 m.get_label(),
175 format!("{}_count", mf_name).as_str(),
176 None,
177 ),
178 samples: vec![Sample {
179 value: s.get_sample_count() as f64,
180 timestamp,
181 }],
182 exemplars: vec![],
183 });
184 }
185 MetricType::UNTYPED => {
186 }
189 };
190 }
191 }
192 remote::WriteRequest {
193 timeseries,
194 metadata: vec![],
195 }
196}
197
198fn convert_label(
199 pairs: &[LabelPair],
200 name: &str,
201 addon: Option<(&'static str, String)>,
202) -> Vec<remote::Label> {
203 let mut labels = Vec::with_capacity(pairs.len() + 1 + if addon.is_some() { 1 } else { 0 });
204 for label in pairs {
205 labels.push(remote::Label {
206 name: label.get_name().to_string(),
207 value: label.get_value().to_string(),
208 });
209 }
210 labels.push(remote::Label {
211 name: "__name__".to_string(),
212 value: name.to_string(),
213 });
214 if let Some(addon) = addon {
215 labels.push(remote::Label {
216 name: addon.0.to_string(),
217 value: addon.1,
218 });
219 }
220 labels.sort_unstable_by(|a, b| a.name.cmp(&b.name));
222 labels
223}
224
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use prometheus::core::Collector;
    use prometheus::proto::{LabelPair, MetricFamily, MetricType};
    use prometheus::{Counter, Gauge, Histogram, HistogramOpts, Opts};

    use super::convert_label;
    use crate::metric::{convert_metric_to_write_request, MetricFilter};

    /// Builds a `LabelPair` with the given name and value.
    fn label_pair(name: &str, value: &str) -> LabelPair {
        let mut pair = LabelPair::new();
        pair.set_name(name.to_string());
        pair.set_value(value.to_string());
        pair
    }

    /// Renders every item with `{:?}` and joins the renderings with newlines.
    fn debug_lines<T: std::fmt::Debug>(items: &[T]) -> String {
        items
            .iter()
            .map(|item| format!("{:?}", item))
            .collect::<Vec<_>>()
            .join("\n")
    }

    /// Labels come back sorted by name, with `__name__` and the addon merged in.
    #[test]
    fn test_convert_label() {
        let pairs = [label_pair("a", "b"), label_pair("e", "g")];

        let without_addon = convert_label(&pairs, "label1", None);
        assert_eq!(
            format!("{:?}", without_addon),
            r#"[Label { name: "__name__", value: "label1" }, Label { name: "a", value: "b" }, Label { name: "e", value: "g" }]"#
        );

        let with_addon = convert_label(&pairs, "label2", Some(("c", "c".to_string())));
        assert_eq!(
            format!("{:?}", with_addon),
            r#"[Label { name: "__name__", value: "label2" }, Label { name: "a", value: "b" }, Label { name: "c", value: "c" }, Label { name: "e", value: "g" }]"#
        );
    }

    /// Counters and gauges each become a single series with one sample.
    #[test]
    fn test_write_request_encoder() {
        let counter = Counter::with_opts(
            Opts::new("test_counter", "test help")
                .const_label("a", "1")
                .const_label("b", "2"),
        )
        .unwrap();
        counter.inc();

        let counter_request = convert_metric_to_write_request(counter.collect(), None, 0);
        assert_eq!(
            format!("{:?}", counter_request.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }]"#
        );

        let gauge = Gauge::with_opts(
            Opts::new("test_gauge", "test help")
                .const_label("a", "1")
                .const_label("b", "2"),
        )
        .unwrap();
        gauge.inc();
        gauge.set(42.0);

        let gauge_request = convert_metric_to_write_request(gauge.collect(), None, 0);
        assert_eq!(
            format!("{:?}", gauge_request.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_gauge" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 42.0, timestamp: 0 }], exemplars: [] }]"#
        );
    }

    /// A histogram expands into per-bucket series, a `+Inf` bucket, `_sum` and `_count`.
    #[test]
    fn test_write_request_histogram() {
        let histogram = Histogram::with_opts(
            HistogramOpts::new("test_histogram", "test help").const_label("a", "1"),
        )
        .unwrap();
        histogram.observe(0.25);

        let request = convert_metric_to_write_request(histogram.collect(), None, 0);
        let rendered = debug_lines(&request.timeseries);
        let expected = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.005" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.01" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.025" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.05" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.1" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.25" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "2.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "10" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "+Inf" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_sum" }, Label { name: "a", value: "1" }], samples: [Sample { value: 0.25, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_count" }, Label { name: "a", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }"#;
        assert_eq!(rendered, expected);
    }

    /// A summary expands into per-quantile series, `_sum` and `_count`, and picks
    /// up the default timestamp when the metric carries none.
    #[test]
    fn test_write_request_summary() {
        use prometheus::proto::{Metric, Quantile, Summary};

        let mut median = Quantile::default();
        median.set_quantile(50.0);
        median.set_value(3.0);

        let mut maximum = Quantile::default();
        maximum.set_quantile(100.0);
        maximum.set_value(5.0);

        let mut summary = Summary::default();
        summary.set_sample_count(5);
        summary.set_sample_sum(15.0);
        summary.set_quantile(vec![median, maximum].into());

        let mut metric = Metric::default();
        metric.set_summary(summary);

        let mut family = MetricFamily::default();
        family.set_name("test_summary".to_string());
        family.set_help("This is a test summary statistic".to_string());
        family.set_field_type(MetricType::SUMMARY);
        family.set_metric(vec![metric].into());

        let request = convert_metric_to_write_request(vec![family], None, 20);
        let rendered = debug_lines(&request.timeseries);
        let expected = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "50" }], samples: [Sample { value: 3.0, timestamp: 20 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "100" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_sum" }], samples: [Sample { value: 15.0, timestamp: 20 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_count" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [] }"#;
        assert_eq!(rendered, expected);
    }

    /// Families rejected by the filter are left out of the write request.
    #[test]
    fn test_metric_filter() {
        let filtered_counter = Counter::with_opts(
            Opts::new("filter_counter", "test help")
                .const_label("a", "1")
                .const_label("b", "2"),
        )
        .unwrap();
        filtered_counter.inc_by(1.0);

        let kept_counter = Counter::with_opts(
            Opts::new("test_counter", "test help")
                .const_label("a", "1")
                .const_label("b", "2"),
        )
        .unwrap();
        kept_counter.inc_by(2.0);

        let mut families = filtered_counter.collect();
        families.extend(kept_counter.collect());

        let filter = MetricFilter::new(Arc::new(|mf: &MetricFamily| {
            !mf.get_name().starts_with("filter")
        }));
        let unfiltered = convert_metric_to_write_request(families.clone(), None, 0);
        let filtered = convert_metric_to_write_request(families, Some(&filter), 0);
        assert_eq!(
            format!("{:?}", unfiltered.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "filter_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }, TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [] }]"#
        );
        assert_eq!(
            format!("{:?}", filtered.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [] }]"#
        );
    }
}