common_telemetry/
metric.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use greptime_proto::prometheus::remote::{Sample, TimeSeries};
18use greptime_proto::prometheus::*;
19use prometheus::proto::{LabelPair, MetricFamily, MetricType};
20use prometheus::{Encoder, TextEncoder};
21
22pub fn dump_metrics() -> Result<String, String> {
23    let mut buffer = Vec::new();
24    let encoder = TextEncoder::new();
25    let metric_families = prometheus::gather();
26    encoder
27        .encode(&metric_families, &mut buffer)
28        .map_err(|_| "Encode metrics failed".to_string())?;
29    String::from_utf8(buffer).map_err(|e| e.to_string())
30}
31
32/// `MetricFilter` used in `report_metric_task`.
33/// for metric user don't want collect, return a `false`, else return a `true`
34#[derive(Clone)]
35pub struct MetricFilter {
36    inner: Arc<dyn Fn(&MetricFamily) -> bool + Send + Sync>,
37}
38
39impl MetricFilter {
40    pub fn new(inner: Arc<dyn Fn(&MetricFamily) -> bool + Send + Sync>) -> Self {
41        Self { inner }
42    }
43    pub fn filter(&self, mf: &MetricFamily) -> bool {
44        (self.inner)(mf)
45    }
46}
47
48pub fn convert_metric_to_write_request(
49    metric_families: Vec<MetricFamily>,
50    metric_filter: Option<&MetricFilter>,
51    default_timestamp: i64,
52) -> remote::WriteRequest {
53    let mut timeseries: Vec<TimeSeries> = Vec::with_capacity(metric_families.len());
54    for mf in metric_families {
55        if !metric_filter.map(|f| f.filter(&mf)).unwrap_or(true) {
56            continue;
57        }
58        let mf_type = mf.type_();
59        let mf_name = mf.name();
60        for m in mf.get_metric() {
61            let timestamp = if m.timestamp_ms() == 0 {
62                default_timestamp
63            } else {
64                m.timestamp_ms()
65            };
66            match mf_type {
67                MetricType::COUNTER => timeseries.push(TimeSeries {
68                    labels: convert_label(&m.label, mf_name, None),
69                    samples: vec![Sample {
70                        value: m.counter.value(),
71                        timestamp,
72                    }],
73                    exemplars: vec![],
74                    histograms: vec![],
75                }),
76                MetricType::GAUGE => timeseries.push(TimeSeries {
77                    labels: convert_label(&m.label, mf_name, None),
78                    samples: vec![Sample {
79                        value: m.gauge.value(),
80                        timestamp,
81                    }],
82                    exemplars: vec![],
83                    histograms: vec![],
84                }),
85                MetricType::HISTOGRAM => {
86                    let h = &m.histogram;
87                    let mut inf_seen = false;
88                    let metric_name = format!("{}_bucket", mf_name);
89                    for b in &h.bucket {
90                        let upper_bound = b.upper_bound();
91                        timeseries.push(TimeSeries {
92                            labels: convert_label(
93                                &m.label,
94                                metric_name.as_str(),
95                                Some(("le", upper_bound.to_string())),
96                            ),
97                            samples: vec![Sample {
98                                value: b.cumulative_count() as f64,
99                                timestamp,
100                            }],
101                            exemplars: vec![],
102                            histograms: vec![],
103                        });
104                        if upper_bound.is_sign_positive() && upper_bound.is_infinite() {
105                            inf_seen = true;
106                        }
107                    }
108                    if !inf_seen {
109                        timeseries.push(TimeSeries {
110                            labels: convert_label(
111                                &m.label,
112                                metric_name.as_str(),
113                                Some(("le", "+Inf".to_string())),
114                            ),
115                            samples: vec![Sample {
116                                value: h.sample_count() as f64,
117                                timestamp,
118                            }],
119                            exemplars: vec![],
120                            histograms: vec![],
121                        });
122                    }
123                    timeseries.push(TimeSeries {
124                        labels: convert_label(&m.label, format!("{}_sum", mf_name).as_str(), None),
125                        samples: vec![Sample {
126                            value: h.sample_sum(),
127                            timestamp,
128                        }],
129                        exemplars: vec![],
130                        histograms: vec![],
131                    });
132                    timeseries.push(TimeSeries {
133                        labels: convert_label(
134                            &m.label,
135                            format!("{}_count", mf_name).as_str(),
136                            None,
137                        ),
138                        samples: vec![Sample {
139                            value: h.sample_count() as f64,
140                            timestamp,
141                        }],
142                        exemplars: vec![],
143                        histograms: vec![],
144                    });
145                }
146                MetricType::SUMMARY => {
147                    let s = &m.summary;
148                    for q in &s.quantile {
149                        timeseries.push(TimeSeries {
150                            labels: convert_label(
151                                &m.label,
152                                mf_name,
153                                Some(("quantile", q.quantile().to_string())),
154                            ),
155                            samples: vec![Sample {
156                                value: q.value(),
157                                timestamp,
158                            }],
159                            exemplars: vec![],
160                            histograms: vec![],
161                        });
162                    }
163                    timeseries.push(TimeSeries {
164                        labels: convert_label(&m.label, format!("{}_sum", mf_name).as_str(), None),
165                        samples: vec![Sample {
166                            value: s.sample_sum(),
167                            timestamp,
168                        }],
169                        exemplars: vec![],
170                        histograms: vec![],
171                    });
172                    timeseries.push(TimeSeries {
173                        labels: convert_label(
174                            &m.label,
175                            format!("{}_count", mf_name).as_str(),
176                            None,
177                        ),
178                        samples: vec![Sample {
179                            value: s.sample_count() as f64,
180                            timestamp,
181                        }],
182                        exemplars: vec![],
183                        histograms: vec![],
184                    });
185                }
186                MetricType::UNTYPED => {
187                    // `TextEncoder` `MetricType::UNTYPED` unimplemented
188                    // To keep the implementation consistent and not cause unexpected panics, we do nothing here.
189                }
190            };
191        }
192    }
193    remote::WriteRequest {
194        timeseries,
195        metadata: vec![],
196    }
197}
198
199fn convert_label(
200    pairs: &[LabelPair],
201    name: &str,
202    addon: Option<(&'static str, String)>,
203) -> Vec<remote::Label> {
204    let mut labels = Vec::with_capacity(pairs.len() + 1 + if addon.is_some() { 1 } else { 0 });
205    for label in pairs {
206        labels.push(remote::Label {
207            name: label.name().to_string(),
208            value: label.value().to_string(),
209        });
210    }
211    labels.push(remote::Label {
212        name: "__name__".to_string(),
213        value: name.to_string(),
214    });
215    if let Some(addon) = addon {
216        labels.push(remote::Label {
217            name: addon.0.to_string(),
218            value: addon.1,
219        });
220    }
221    // Remote write protocol need label names sorted in lexicographical order.
222    labels.sort_unstable_by(|a, b| a.name.cmp(&b.name));
223    labels
224}
225
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use prometheus::core::Collector;
    use prometheus::proto::{LabelPair, MetricFamily, MetricType};
    use prometheus::{Counter, Gauge, Histogram, HistogramOpts, Opts};

    use super::convert_label;
    use crate::metric::{MetricFilter, convert_metric_to_write_request};

    /// Builds a `LabelPair` with the given name and value.
    fn label_pair(name: &str, value: &str) -> LabelPair {
        let mut pair = LabelPair::new();
        pair.set_name(name.to_string());
        pair.set_value(value.to_string());
        pair
    }

    /// Formats every item with `{:?}` and joins the results with newlines.
    fn debug_lines<T: std::fmt::Debug>(items: &[T]) -> String {
        items
            .iter()
            .map(|item| format!("{:?}", item))
            .collect::<Vec<_>>()
            .join("\n")
    }

    /// Labels come back sorted by name, with `__name__` and the optional addon included.
    #[test]
    fn test_convert_label() {
        let pairs = [label_pair("a", "b"), label_pair("e", "g")];

        let without_addon = convert_label(&pairs, "label1", None);
        assert_eq!(
            format!("{:?}", without_addon),
            r#"[Label { name: "__name__", value: "label1" }, Label { name: "a", value: "b" }, Label { name: "e", value: "g" }]"#
        );

        let with_addon = convert_label(&pairs, "label2", Some(("c", "c".to_string())));
        assert_eq!(
            format!("{:?}", with_addon),
            r#"[Label { name: "__name__", value: "label2" }, Label { name: "a", value: "b" }, Label { name: "c", value: "c" }, Label { name: "e", value: "g" }]"#
        );
    }

    /// A counter and a gauge each convert to a single one-sample series.
    #[test]
    fn test_write_request_encoder() {
        let counter = Counter::with_opts(
            Opts::new("test_counter", "test help")
                .const_label("a", "1")
                .const_label("b", "2"),
        )
        .unwrap();
        counter.inc();

        let write_request = convert_metric_to_write_request(counter.collect(), None, 0);
        assert_eq!(
            format!("{:?}", write_request.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
        );

        let gauge = Gauge::with_opts(
            Opts::new("test_gauge", "test help")
                .const_label("a", "1")
                .const_label("b", "2"),
        )
        .unwrap();
        gauge.inc();
        gauge.set(42.0);

        let write_request = convert_metric_to_write_request(gauge.collect(), None, 0);
        assert_eq!(
            format!("{:?}", write_request.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_gauge" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 42.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
        );
    }

    /// A histogram expands into its buckets, a `+Inf` bucket, `_sum` and `_count`.
    #[test]
    fn test_write_request_histogram() {
        let histogram = Histogram::with_opts(
            HistogramOpts::new("test_histogram", "test help").const_label("a", "1"),
        )
        .unwrap();
        histogram.observe(0.25);

        let write_request = convert_metric_to_write_request(histogram.collect(), None, 0);
        let expected = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.005" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.01" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.025" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.05" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.1" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.25" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "2.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "10" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "+Inf" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_sum" }, Label { name: "a", value: "1" }], samples: [Sample { value: 0.25, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_count" }, Label { name: "a", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }"#;
        assert_eq!(debug_lines(&write_request.timeseries), expected);
    }

    /// A hand-built summary expands into its quantiles, `_sum` and `_count`, and picks up the
    /// default timestamp because the metric carries none.
    #[test]
    fn test_write_request_summary() {
        use prometheus::proto::{Metric, Quantile, Summary};

        let quantile = |rank: f64, value: f64| {
            let mut quantile = Quantile::default();
            quantile.set_quantile(rank);
            quantile.set_value(value);
            quantile
        };

        let mut summary = Summary::default();
        summary.set_sample_count(5);
        summary.set_sample_sum(15.0);
        summary.set_quantile(vec![quantile(50.0, 3.0), quantile(100.0, 5.0)]);

        let mut metric = Metric::default();
        metric.set_summary(summary);

        let mut metric_family = MetricFamily::default();
        metric_family.set_name("test_summary".to_string());
        metric_family.set_help("This is a test summary statistic".to_string());
        metric_family.set_field_type(MetricType::SUMMARY);
        metric_family.set_metric(vec![metric]);

        let write_request = convert_metric_to_write_request(vec![metric_family], None, 20);
        let expected = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "50" }], samples: [Sample { value: 3.0, timestamp: 20 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "100" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_sum" }], samples: [Sample { value: 15.0, timestamp: 20 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_count" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [], histograms: [] }"#;
        assert_eq!(debug_lines(&write_request.timeseries), expected);
    }

    /// Families rejected by the filter are dropped; without a filter all are kept.
    #[test]
    fn test_metric_filter() {
        let filtered_counter = Counter::with_opts(
            Opts::new("filter_counter", "test help")
                .const_label("a", "1")
                .const_label("b", "2"),
        )
        .unwrap();
        filtered_counter.inc_by(1.0);

        let kept_counter = Counter::with_opts(
            Opts::new("test_counter", "test help")
                .const_label("a", "1")
                .const_label("b", "2"),
        )
        .unwrap();
        kept_counter.inc_by(2.0);

        let mut families = filtered_counter.collect();
        families.extend(kept_counter.collect());

        let filter = MetricFilter::new(Arc::new(|mf: &MetricFamily| {
            !mf.name().starts_with("filter")
        }));
        let unfiltered = convert_metric_to_write_request(families.clone(), None, 0);
        let filtered = convert_metric_to_write_request(families, Some(&filter), 0);
        assert_eq!(
            format!("{:?}", unfiltered.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "filter_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }, TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
        );
        assert_eq!(
            format!("{:?}", filtered.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
        );
    }
}