common_telemetry/
metric.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use greptime_proto::prometheus::remote::{Sample, TimeSeries};
18use greptime_proto::prometheus::*;
19use prometheus::proto::{LabelPair, MetricFamily, MetricType};
20use prometheus::{Encoder, TextEncoder};
21
22pub fn dump_metrics() -> Result<String, String> {
23    let mut buffer = Vec::new();
24    let encoder = TextEncoder::new();
25    let metric_families = prometheus::gather();
26    encoder
27        .encode(&metric_families, &mut buffer)
28        .map_err(|_| "Encode metrics failed".to_string())?;
29    String::from_utf8(buffer).map_err(|e| e.to_string())
30}
31
32/// `MetricFilter` used in `report_metric_task`.
33/// for metric user don't want collect, return a `false`, else return a `true`
34#[derive(Clone)]
35pub struct MetricFilter {
36    inner: Arc<dyn Fn(&MetricFamily) -> bool + Send + Sync>,
37}
38
39impl MetricFilter {
40    pub fn new(inner: Arc<dyn Fn(&MetricFamily) -> bool + Send + Sync>) -> Self {
41        Self { inner }
42    }
43    pub fn filter(&self, mf: &MetricFamily) -> bool {
44        (self.inner)(mf)
45    }
46}
47
48pub fn convert_metric_to_write_request(
49    metric_families: Vec<MetricFamily>,
50    metric_filter: Option<&MetricFilter>,
51    default_timestamp: i64,
52) -> remote::WriteRequest {
53    let mut timeseries: Vec<TimeSeries> = Vec::with_capacity(metric_families.len());
54    for mf in metric_families {
55        if !metric_filter.map(|f| f.filter(&mf)).unwrap_or(true) {
56            continue;
57        }
58        let mf_type = mf.get_field_type();
59        let mf_name = mf.get_name();
60        for m in mf.get_metric() {
61            let timestamp = if m.get_timestamp_ms() == 0 {
62                default_timestamp
63            } else {
64                m.get_timestamp_ms()
65            };
66            match mf_type {
67                MetricType::COUNTER => timeseries.push(TimeSeries {
68                    labels: convert_label(m.get_label(), mf_name, None),
69                    samples: vec![Sample {
70                        value: m.get_counter().get_value(),
71                        timestamp,
72                    }],
73                    exemplars: vec![],
74                    histograms: vec![],
75                }),
76                MetricType::GAUGE => timeseries.push(TimeSeries {
77                    labels: convert_label(m.get_label(), mf_name, None),
78                    samples: vec![Sample {
79                        value: m.get_gauge().get_value(),
80                        timestamp,
81                    }],
82                    exemplars: vec![],
83                    histograms: vec![],
84                }),
85                MetricType::HISTOGRAM => {
86                    let h = m.get_histogram();
87                    let mut inf_seen = false;
88                    let metric_name = format!("{}_bucket", mf_name);
89                    for b in h.get_bucket() {
90                        let upper_bound = b.get_upper_bound();
91                        timeseries.push(TimeSeries {
92                            labels: convert_label(
93                                m.get_label(),
94                                metric_name.as_str(),
95                                Some(("le", upper_bound.to_string())),
96                            ),
97                            samples: vec![Sample {
98                                value: b.get_cumulative_count() as f64,
99                                timestamp,
100                            }],
101                            exemplars: vec![],
102                            histograms: vec![],
103                        });
104                        if upper_bound.is_sign_positive() && upper_bound.is_infinite() {
105                            inf_seen = true;
106                        }
107                    }
108                    if !inf_seen {
109                        timeseries.push(TimeSeries {
110                            labels: convert_label(
111                                m.get_label(),
112                                metric_name.as_str(),
113                                Some(("le", "+Inf".to_string())),
114                            ),
115                            samples: vec![Sample {
116                                value: h.get_sample_count() as f64,
117                                timestamp,
118                            }],
119                            exemplars: vec![],
120                            histograms: vec![],
121                        });
122                    }
123                    timeseries.push(TimeSeries {
124                        labels: convert_label(
125                            m.get_label(),
126                            format!("{}_sum", mf_name).as_str(),
127                            None,
128                        ),
129                        samples: vec![Sample {
130                            value: h.get_sample_sum(),
131                            timestamp,
132                        }],
133                        exemplars: vec![],
134                        histograms: vec![],
135                    });
136                    timeseries.push(TimeSeries {
137                        labels: convert_label(
138                            m.get_label(),
139                            format!("{}_count", mf_name).as_str(),
140                            None,
141                        ),
142                        samples: vec![Sample {
143                            value: h.get_sample_count() as f64,
144                            timestamp,
145                        }],
146                        exemplars: vec![],
147                        histograms: vec![],
148                    });
149                }
150                MetricType::SUMMARY => {
151                    let s = m.get_summary();
152                    for q in s.get_quantile() {
153                        timeseries.push(TimeSeries {
154                            labels: convert_label(
155                                m.get_label(),
156                                mf_name,
157                                Some(("quantile", q.get_quantile().to_string())),
158                            ),
159                            samples: vec![Sample {
160                                value: q.get_value(),
161                                timestamp,
162                            }],
163                            exemplars: vec![],
164                            histograms: vec![],
165                        });
166                    }
167                    timeseries.push(TimeSeries {
168                        labels: convert_label(
169                            m.get_label(),
170                            format!("{}_sum", mf_name).as_str(),
171                            None,
172                        ),
173                        samples: vec![Sample {
174                            value: s.get_sample_sum(),
175                            timestamp,
176                        }],
177                        exemplars: vec![],
178                        histograms: vec![],
179                    });
180                    timeseries.push(TimeSeries {
181                        labels: convert_label(
182                            m.get_label(),
183                            format!("{}_count", mf_name).as_str(),
184                            None,
185                        ),
186                        samples: vec![Sample {
187                            value: s.get_sample_count() as f64,
188                            timestamp,
189                        }],
190                        exemplars: vec![],
191                        histograms: vec![],
192                    });
193                }
194                MetricType::UNTYPED => {
195                    // `TextEncoder` `MetricType::UNTYPED` unimplemented
196                    // To keep the implementation consistent and not cause unexpected panics, we do nothing here.
197                }
198            };
199        }
200    }
201    remote::WriteRequest {
202        timeseries,
203        metadata: vec![],
204    }
205}
206
207fn convert_label(
208    pairs: &[LabelPair],
209    name: &str,
210    addon: Option<(&'static str, String)>,
211) -> Vec<remote::Label> {
212    let mut labels = Vec::with_capacity(pairs.len() + 1 + if addon.is_some() { 1 } else { 0 });
213    for label in pairs {
214        labels.push(remote::Label {
215            name: label.get_name().to_string(),
216            value: label.get_value().to_string(),
217        });
218    }
219    labels.push(remote::Label {
220        name: "__name__".to_string(),
221        value: name.to_string(),
222    });
223    if let Some(addon) = addon {
224        labels.push(remote::Label {
225            name: addon.0.to_string(),
226            value: addon.1,
227        });
228    }
229    // Remote write protocol need label names sorted in lexicographical order.
230    labels.sort_unstable_by(|a, b| a.name.cmp(&b.name));
231    labels
232}
233
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use prometheus::core::Collector;
    use prometheus::proto::{LabelPair, MetricFamily, MetricType};
    use prometheus::{Counter, Gauge, Histogram, HistogramOpts, Opts};

    use super::convert_label;
    use crate::metric::{MetricFilter, convert_metric_to_write_request};

    /// Builds a `LabelPair` with the given name and value.
    fn label_pair(name: &str, value: &str) -> LabelPair {
        let mut pair = LabelPair::new();
        pair.set_name(String::from(name));
        pair.set_value(String::from(value));
        pair
    }

    /// Renders every item with `{:?}` and joins the renderings with newlines.
    fn debug_lines<T: std::fmt::Debug>(items: &[T]) -> String {
        items
            .iter()
            .map(|item| format!("{:?}", item))
            .collect::<Vec<_>>()
            .join("\n")
    }

    // Labels come back sorted by name, with `__name__` and the add-on merged in.
    #[test]
    fn test_convert_label() {
        let pairs = vec![label_pair("a", "b"), label_pair("e", "g")];

        let without_addon = convert_label(&pairs, "label1", None);
        assert_eq!(
            format!("{:?}", without_addon),
            r#"[Label { name: "__name__", value: "label1" }, Label { name: "a", value: "b" }, Label { name: "e", value: "g" }]"#
        );

        let with_addon = convert_label(&pairs, "label2", Some(("c", "c".to_string())));
        assert_eq!(
            format!("{:?}", with_addon),
            r#"[Label { name: "__name__", value: "label2" }, Label { name: "a", value: "b" }, Label { name: "c", value: "c" }, Label { name: "e", value: "g" }]"#
        );
    }

    // Counters and gauges each become a single one-sample series.
    #[test]
    fn test_write_request_encoder() {
        let counter = Counter::with_opts(
            Opts::new("test_counter", "test help")
                .const_label("a", "1")
                .const_label("b", "2"),
        )
        .unwrap();
        counter.inc();

        let write_request = convert_metric_to_write_request(counter.collect(), None, 0);
        assert_eq!(
            format!("{:?}", write_request.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
        );

        let gauge = Gauge::with_opts(
            Opts::new("test_gauge", "test help")
                .const_label("a", "1")
                .const_label("b", "2"),
        )
        .unwrap();
        gauge.inc();
        gauge.set(42.0);

        let write_request = convert_metric_to_write_request(gauge.collect(), None, 0);
        assert_eq!(
            format!("{:?}", write_request.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_gauge" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 42.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
        );
    }

    // A histogram expands into per-bucket series, a `+Inf` bucket, `_sum` and `_count`.
    #[test]
    fn test_write_request_histogram() {
        let histogram = Histogram::with_opts(
            HistogramOpts::new("test_histogram", "test help").const_label("a", "1"),
        )
        .unwrap();
        histogram.observe(0.25);

        let write_request = convert_metric_to_write_request(histogram.collect(), None, 0);
        let expected = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.005" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.01" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.025" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.05" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.1" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.25" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "2.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "10" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "+Inf" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_sum" }, Label { name: "a", value: "1" }], samples: [Sample { value: 0.25, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_count" }, Label { name: "a", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }"#;
        assert_eq!(debug_lines(&write_request.timeseries), expected);
    }

    // A hand-built summary expands into per-quantile series, `_sum` and `_count`,
    // all stamped with the default timestamp.
    #[test]
    fn test_write_request_summary() {
        use prometheus::proto::{Metric, Quantile, Summary};

        let quantiles: Vec<Quantile> = [(50.0, 3.0), (100.0, 5.0)]
            .into_iter()
            .map(|(rank, value)| {
                let mut quantile = Quantile::default();
                quantile.set_quantile(rank);
                quantile.set_value(value);
                quantile
            })
            .collect();

        let mut summary = Summary::default();
        summary.set_sample_count(5);
        summary.set_sample_sum(15.0);
        summary.set_quantile(quantiles.into());

        let mut metric = Metric::default();
        metric.set_summary(summary);

        let mut metric_family = MetricFamily::default();
        metric_family.set_name("test_summary".to_string());
        metric_family.set_help("This is a test summary statistic".to_string());
        metric_family.set_field_type(MetricType::SUMMARY);
        metric_family.set_metric(vec![metric].into());

        let write_request = convert_metric_to_write_request(vec![metric_family], None, 20);
        let expected = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "50" }], samples: [Sample { value: 3.0, timestamp: 20 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "100" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_sum" }], samples: [Sample { value: 15.0, timestamp: 20 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_count" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [], histograms: [] }"#;
        assert_eq!(debug_lines(&write_request.timeseries), expected);
    }

    // Families for which the filter returns `false` are left out of the request.
    #[test]
    fn test_metric_filter() {
        let filtered_counter = Counter::with_opts(
            Opts::new("filter_counter", "test help")
                .const_label("a", "1")
                .const_label("b", "2"),
        )
        .unwrap();
        filtered_counter.inc_by(1.0);

        let kept_counter = Counter::with_opts(
            Opts::new("test_counter", "test help")
                .const_label("a", "1")
                .const_label("b", "2"),
        )
        .unwrap();
        kept_counter.inc_by(2.0);

        let mut families = filtered_counter.collect();
        families.extend(kept_counter.collect());

        let filter = MetricFilter::new(Arc::new(|mf: &MetricFamily| {
            !mf.get_name().starts_with("filter")
        }));
        let unfiltered = convert_metric_to_write_request(families.clone(), None, 0);
        let filtered = convert_metric_to_write_request(families, Some(&filter), 0);
        assert_eq!(
            format!("{:?}", unfiltered.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "filter_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }, TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
        );
        assert_eq!(
            format!("{:?}", filtered.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
        );
    }
}