common_telemetry/
metric.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use greptime_proto::prometheus::remote::{Sample, TimeSeries};
18use greptime_proto::prometheus::*;
19use prometheus::proto::{LabelPair, MetricFamily, MetricType};
20use prometheus::{Encoder, TextEncoder};
21
22pub fn dump_metrics() -> Result<String, String> {
23    let mut buffer = Vec::new();
24    let encoder = TextEncoder::new();
25    let metric_families = prometheus::gather();
26    encoder
27        .encode(&metric_families, &mut buffer)
28        .map_err(|_| "Encode metrics failed".to_string())?;
29    String::from_utf8(buffer).map_err(|e| e.to_string())
30}
31
32/// `MetricFilter` used in `report_metric_task`.
33/// for metric user don't want collect, return a `false`, else return a `true`
34#[derive(Clone)]
35pub struct MetricFilter {
36    inner: Arc<dyn Fn(&MetricFamily) -> bool + Send + Sync>,
37}
38
39impl MetricFilter {
40    pub fn new(inner: Arc<dyn Fn(&MetricFamily) -> bool + Send + Sync>) -> Self {
41        Self { inner }
42    }
43    pub fn filter(&self, mf: &MetricFamily) -> bool {
44        (self.inner)(mf)
45    }
46}
47
48pub fn convert_metric_to_write_request(
49    metric_families: Vec<MetricFamily>,
50    metric_filter: Option<&MetricFilter>,
51    default_timestamp: i64,
52) -> remote::WriteRequest {
53    let mut timeseries: Vec<TimeSeries> = Vec::with_capacity(metric_families.len());
54    for mf in metric_families {
55        if !metric_filter.map(|f| f.filter(&mf)).unwrap_or(true) {
56            continue;
57        }
58        let mf_type = mf.get_field_type();
59        let mf_name = mf.get_name();
60        for m in mf.get_metric() {
61            let timestamp = if m.get_timestamp_ms() == 0 {
62                default_timestamp
63            } else {
64                m.get_timestamp_ms()
65            };
66            match mf_type {
67                MetricType::COUNTER => timeseries.push(TimeSeries {
68                    labels: convert_label(m.get_label(), mf_name, None),
69                    samples: vec![Sample {
70                        value: m.get_counter().get_value(),
71                        timestamp,
72                    }],
73                    exemplars: vec![],
74                }),
75                MetricType::GAUGE => timeseries.push(TimeSeries {
76                    labels: convert_label(m.get_label(), mf_name, None),
77                    samples: vec![Sample {
78                        value: m.get_gauge().get_value(),
79                        timestamp,
80                    }],
81                    exemplars: vec![],
82                }),
83                MetricType::HISTOGRAM => {
84                    let h = m.get_histogram();
85                    let mut inf_seen = false;
86                    let metric_name = format!("{}_bucket", mf_name);
87                    for b in h.get_bucket() {
88                        let upper_bound = b.get_upper_bound();
89                        timeseries.push(TimeSeries {
90                            labels: convert_label(
91                                m.get_label(),
92                                metric_name.as_str(),
93                                Some(("le", upper_bound.to_string())),
94                            ),
95                            samples: vec![Sample {
96                                value: b.get_cumulative_count() as f64,
97                                timestamp,
98                            }],
99                            exemplars: vec![],
100                        });
101                        if upper_bound.is_sign_positive() && upper_bound.is_infinite() {
102                            inf_seen = true;
103                        }
104                    }
105                    if !inf_seen {
106                        timeseries.push(TimeSeries {
107                            labels: convert_label(
108                                m.get_label(),
109                                metric_name.as_str(),
110                                Some(("le", "+Inf".to_string())),
111                            ),
112                            samples: vec![Sample {
113                                value: h.get_sample_count() as f64,
114                                timestamp,
115                            }],
116                            exemplars: vec![],
117                        });
118                    }
119                    timeseries.push(TimeSeries {
120                        labels: convert_label(
121                            m.get_label(),
122                            format!("{}_sum", mf_name).as_str(),
123                            None,
124                        ),
125                        samples: vec![Sample {
126                            value: h.get_sample_sum(),
127                            timestamp,
128                        }],
129                        exemplars: vec![],
130                    });
131                    timeseries.push(TimeSeries {
132                        labels: convert_label(
133                            m.get_label(),
134                            format!("{}_count", mf_name).as_str(),
135                            None,
136                        ),
137                        samples: vec![Sample {
138                            value: h.get_sample_count() as f64,
139                            timestamp,
140                        }],
141                        exemplars: vec![],
142                    });
143                }
144                MetricType::SUMMARY => {
145                    let s = m.get_summary();
146                    for q in s.get_quantile() {
147                        timeseries.push(TimeSeries {
148                            labels: convert_label(
149                                m.get_label(),
150                                mf_name,
151                                Some(("quantile", q.get_quantile().to_string())),
152                            ),
153                            samples: vec![Sample {
154                                value: q.get_value(),
155                                timestamp,
156                            }],
157                            exemplars: vec![],
158                        });
159                    }
160                    timeseries.push(TimeSeries {
161                        labels: convert_label(
162                            m.get_label(),
163                            format!("{}_sum", mf_name).as_str(),
164                            None,
165                        ),
166                        samples: vec![Sample {
167                            value: s.get_sample_sum(),
168                            timestamp,
169                        }],
170                        exemplars: vec![],
171                    });
172                    timeseries.push(TimeSeries {
173                        labels: convert_label(
174                            m.get_label(),
175                            format!("{}_count", mf_name).as_str(),
176                            None,
177                        ),
178                        samples: vec![Sample {
179                            value: s.get_sample_count() as f64,
180                            timestamp,
181                        }],
182                        exemplars: vec![],
183                    });
184                }
185                MetricType::UNTYPED => {
186                    // `TextEncoder` `MetricType::UNTYPED` unimplemented
187                    // To keep the implementation consistent and not cause unexpected panics, we do nothing here.
188                }
189            };
190        }
191    }
192    remote::WriteRequest {
193        timeseries,
194        metadata: vec![],
195    }
196}
197
198fn convert_label(
199    pairs: &[LabelPair],
200    name: &str,
201    addon: Option<(&'static str, String)>,
202) -> Vec<remote::Label> {
203    let mut labels = Vec::with_capacity(pairs.len() + 1 + if addon.is_some() { 1 } else { 0 });
204    for label in pairs {
205        labels.push(remote::Label {
206            name: label.get_name().to_string(),
207            value: label.get_value().to_string(),
208        });
209    }
210    labels.push(remote::Label {
211        name: "__name__".to_string(),
212        value: name.to_string(),
213    });
214    if let Some(addon) = addon {
215        labels.push(remote::Label {
216            name: addon.0.to_string(),
217            value: addon.1,
218        });
219    }
220    // Remote write protocol need label names sorted in lexicographical order.
221    labels.sort_unstable_by(|a, b| a.name.cmp(&b.name));
222    labels
223}
224
225#[cfg(test)]
226mod test {
227    use std::sync::Arc;
228
229    use prometheus::core::Collector;
230    use prometheus::proto::{LabelPair, MetricFamily, MetricType};
231    use prometheus::{Counter, Gauge, Histogram, HistogramOpts, Opts};
232
233    use super::convert_label;
234    use crate::metric::{convert_metric_to_write_request, MetricFilter};
235
236    #[test]
237    fn test_convert_label() {
238        let pairs = vec![
239            {
240                let mut pair = LabelPair::new();
241                pair.set_name(String::from("a"));
242                pair.set_value(String::from("b"));
243                pair
244            },
245            {
246                let mut pair = LabelPair::new();
247                pair.set_name(String::from("e"));
248                pair.set_value(String::from("g"));
249                pair
250            },
251        ];
252        let label1 = convert_label(&pairs, "label1", None);
253        assert_eq!(
254            format!("{:?}", label1),
255            r#"[Label { name: "__name__", value: "label1" }, Label { name: "a", value: "b" }, Label { name: "e", value: "g" }]"#
256        );
257        let label2 = convert_label(&pairs, "label2", Some(("c", "c".to_string())));
258        assert_eq!(
259            format!("{:?}", label2),
260            r#"[Label { name: "__name__", value: "label2" }, Label { name: "a", value: "b" }, Label { name: "c", value: "c" }, Label { name: "e", value: "g" }]"#
261        );
262    }
263
264    #[test]
265    fn test_write_request_encoder() {
266        let counter_opts = Opts::new("test_counter", "test help")
267            .const_label("a", "1")
268            .const_label("b", "2");
269        let counter = Counter::with_opts(counter_opts).unwrap();
270        counter.inc();
271
272        let mf = counter.collect();
273        let write_quest = convert_metric_to_write_request(mf, None, 0);
274
275        assert_eq!(
276            format!("{:?}", write_quest.timeseries),
277            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }]"#
278        );
279
280        let gauge_opts = Opts::new("test_gauge", "test help")
281            .const_label("a", "1")
282            .const_label("b", "2");
283        let gauge = Gauge::with_opts(gauge_opts).unwrap();
284        gauge.inc();
285        gauge.set(42.0);
286
287        let mf = gauge.collect();
288        let write_quest = convert_metric_to_write_request(mf, None, 0);
289        assert_eq!(
290            format!("{:?}", write_quest.timeseries),
291            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_gauge" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 42.0, timestamp: 0 }], exemplars: [] }]"#
292        );
293    }
294
295    #[test]
296    fn test_write_request_histogram() {
297        let opts = HistogramOpts::new("test_histogram", "test help").const_label("a", "1");
298        let histogram = Histogram::with_opts(opts).unwrap();
299        histogram.observe(0.25);
300
301        let mf = histogram.collect();
302        let write_quest = convert_metric_to_write_request(mf, None, 0);
303        let write_quest_str: Vec<_> = write_quest
304            .timeseries
305            .iter()
306            .map(|x| format!("{:?}", x))
307            .collect();
308        let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.005" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
309TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.01" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
310TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.025" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
311TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.05" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
312TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.1" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
313TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.25" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
314TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
315TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
316TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "2.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
317TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
318TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "10" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
319TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "+Inf" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
320TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_sum" }, Label { name: "a", value: "1" }], samples: [Sample { value: 0.25, timestamp: 0 }], exemplars: [] }
321TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_count" }, Label { name: "a", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }"#;
322        assert_eq!(write_quest_str.join("\n"), ans);
323    }
324
325    #[test]
326    fn test_write_request_summary() {
327        use prometheus::proto::{Metric, Quantile, Summary};
328
329        let mut metric_family = MetricFamily::default();
330        metric_family.set_name("test_summary".to_string());
331        metric_family.set_help("This is a test summary statistic".to_string());
332        metric_family.set_field_type(MetricType::SUMMARY);
333
334        let mut summary = Summary::default();
335        summary.set_sample_count(5.0 as u64);
336        summary.set_sample_sum(15.0);
337
338        let mut quantile1 = Quantile::default();
339        quantile1.set_quantile(50.0);
340        quantile1.set_value(3.0);
341
342        let mut quantile2 = Quantile::default();
343        quantile2.set_quantile(100.0);
344        quantile2.set_value(5.0);
345
346        summary.set_quantile(vec![quantile1, quantile2].into());
347
348        let mut metric = Metric::default();
349        metric.set_summary(summary);
350        metric_family.set_metric(vec![metric].into());
351
352        let write_quest = convert_metric_to_write_request(vec![metric_family], None, 20);
353        let write_quest_str: Vec<_> = write_quest
354            .timeseries
355            .iter()
356            .map(|x| format!("{:?}", x))
357            .collect();
358        let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "50" }], samples: [Sample { value: 3.0, timestamp: 20 }], exemplars: [] }
359TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "100" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [] }
360TimeSeries { labels: [Label { name: "__name__", value: "test_summary_sum" }], samples: [Sample { value: 15.0, timestamp: 20 }], exemplars: [] }
361TimeSeries { labels: [Label { name: "__name__", value: "test_summary_count" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [] }"#;
362        assert_eq!(write_quest_str.join("\n"), ans);
363    }
364
365    #[test]
366    fn test_metric_filter() {
367        let counter_opts = Opts::new("filter_counter", "test help")
368            .const_label("a", "1")
369            .const_label("b", "2");
370        let counter_1 = Counter::with_opts(counter_opts).unwrap();
371        counter_1.inc_by(1.0);
372        let counter_opts = Opts::new("test_counter", "test help")
373            .const_label("a", "1")
374            .const_label("b", "2");
375        let counter_2 = Counter::with_opts(counter_opts).unwrap();
376        counter_2.inc_by(2.0);
377
378        let mut mf = counter_1.collect();
379        mf.append(&mut counter_2.collect());
380
381        let filter = MetricFilter::new(Arc::new(|mf: &MetricFamily| {
382            !mf.get_name().starts_with("filter")
383        }));
384        let write_quest1 = convert_metric_to_write_request(mf.clone(), None, 0);
385        let write_quest2 = convert_metric_to_write_request(mf, Some(&filter), 0);
386        assert_eq!(
387            format!("{:?}", write_quest1.timeseries),
388            r#"[TimeSeries { labels: [Label { name: "__name__", value: "filter_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }, TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [] }]"#
389        );
390        assert_eq!(
391            format!("{:?}", write_quest2.timeseries),
392            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [] }]"#
393        );
394    }
395}