// servers/otlp/metrics.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::{RowInsertRequests, Value};
16use common_grpc::precision::Precision;
17use common_query::prelude::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
18use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
19use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue};
20use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point, *};
21
22use crate::error::Result;
23use crate::row_writer::{self, MultiTableData, TableData};
24
/// Approximate column count used to pre-size per-table writers
/// (tags + timestamp + value columns); an estimate only, tables may grow.
const APPROXIMATE_COLUMN_COUNT: usize = 8;
27
/// Normalize OTLP instrumentation, metric and attribute names.
///
/// <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument-name-syntax>
/// - names are case-insensitive per the spec, so they are lowercased for
///   better SQL usability
/// - `.` and `-` are rewritten to `_`
fn normalize_otlp_name(name: &str) -> String {
    let lowered = name.to_lowercase();
    lowered.replace('.', "_").replace('-', "_")
}
37
38/// Convert OpenTelemetry metrics to GreptimeDB insert requests
39///
40/// See
41/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto>
42/// for data structure of OTLP metrics.
43///
44/// Returns `InsertRequests` and total number of rows to ingest
45pub fn to_grpc_insert_requests(
46    request: ExportMetricsServiceRequest,
47) -> Result<(RowInsertRequests, usize)> {
48    let mut table_writer = MultiTableData::default();
49
50    for resource in &request.resource_metrics {
51        let resource_attrs = resource.resource.as_ref().map(|r| &r.attributes);
52        for scope in &resource.scope_metrics {
53            let scope_attrs = scope.scope.as_ref().map(|s| &s.attributes);
54            for metric in &scope.metrics {
55                encode_metrics(&mut table_writer, metric, resource_attrs, scope_attrs)?;
56            }
57        }
58    }
59
60    Ok(table_writer.into_row_insert_requests())
61}
62
63fn encode_metrics(
64    table_writer: &mut MultiTableData,
65    metric: &Metric,
66    resource_attrs: Option<&Vec<KeyValue>>,
67    scope_attrs: Option<&Vec<KeyValue>>,
68) -> Result<()> {
69    let name = &metric.name;
70    // note that we don't store description or unit, we might want to deal with
71    // these fields in the future.
72    if let Some(data) = &metric.data {
73        match data {
74            metric::Data::Gauge(gauge) => {
75                encode_gauge(table_writer, name, gauge, resource_attrs, scope_attrs)?;
76            }
77            metric::Data::Sum(sum) => {
78                encode_sum(table_writer, name, sum, resource_attrs, scope_attrs)?;
79            }
80            metric::Data::Summary(summary) => {
81                encode_summary(table_writer, name, summary, resource_attrs, scope_attrs)?;
82            }
83            metric::Data::Histogram(hist) => {
84                encode_histogram(table_writer, name, hist, resource_attrs, scope_attrs)?;
85            }
86            // TODO(sunng87) leave ExponentialHistogram for next release
87            metric::Data::ExponentialHistogram(_hist) => {}
88        }
89    }
90
91    Ok(())
92}
93
94fn write_attributes(
95    writer: &mut TableData,
96    row: &mut Vec<Value>,
97    attrs: Option<&Vec<KeyValue>>,
98) -> Result<()> {
99    if let Some(attrs) = attrs {
100        let table_tags = attrs.iter().filter_map(|attr| {
101            if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) {
102                let key = normalize_otlp_name(&attr.key);
103                match val {
104                    any_value::Value::StringValue(s) => Some((key, s.to_string())),
105                    any_value::Value::IntValue(v) => Some((key, v.to_string())),
106                    any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
107                    _ => None, // TODO(sunng87): allow different type of values
108                }
109            } else {
110                None
111            }
112        });
113
114        row_writer::write_tags(writer, table_tags, row)?;
115    }
116    Ok(())
117}
118
119fn write_timestamp(table: &mut TableData, row: &mut Vec<Value>, time_nano: i64) -> Result<()> {
120    row_writer::write_ts_to_nanos(
121        table,
122        GREPTIME_TIMESTAMP,
123        Some(time_nano),
124        Precision::Nanosecond,
125        row,
126    )
127}
128
129fn write_data_point_value(
130    table: &mut TableData,
131    row: &mut Vec<Value>,
132    field: &str,
133    value: &Option<number_data_point::Value>,
134) -> Result<()> {
135    match value {
136        Some(number_data_point::Value::AsInt(val)) => {
137            // we coerce all values to f64
138            row_writer::write_f64(table, field, *val as f64, row)?;
139        }
140        Some(number_data_point::Value::AsDouble(val)) => {
141            row_writer::write_f64(table, field, *val, row)?;
142        }
143        _ => {}
144    }
145    Ok(())
146}
147
148fn write_tags_and_timestamp(
149    table: &mut TableData,
150    row: &mut Vec<Value>,
151    resource_attrs: Option<&Vec<KeyValue>>,
152    scope_attrs: Option<&Vec<KeyValue>>,
153    data_point_attrs: Option<&Vec<KeyValue>>,
154    timestamp_nanos: i64,
155) -> Result<()> {
156    write_attributes(table, row, resource_attrs)?;
157    write_attributes(table, row, scope_attrs)?;
158    write_attributes(table, row, data_point_attrs)?;
159
160    write_timestamp(table, row, timestamp_nanos)?;
161
162    Ok(())
163}
164
165/// encode this gauge metric
166///
167/// note that there can be multiple data points in the request, it's going to be
168/// stored as multiple rows
169fn encode_gauge(
170    table_writer: &mut MultiTableData,
171    name: &str,
172    gauge: &Gauge,
173    resource_attrs: Option<&Vec<KeyValue>>,
174    scope_attrs: Option<&Vec<KeyValue>>,
175) -> Result<()> {
176    let table = table_writer.get_or_default_table_data(
177        normalize_otlp_name(name),
178        APPROXIMATE_COLUMN_COUNT,
179        gauge.data_points.len(),
180    );
181
182    for data_point in &gauge.data_points {
183        let mut row = table.alloc_one_row();
184        write_tags_and_timestamp(
185            table,
186            &mut row,
187            resource_attrs,
188            scope_attrs,
189            Some(data_point.attributes.as_ref()),
190            data_point.time_unix_nano as i64,
191        )?;
192
193        write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
194        table.add_row(row);
195    }
196
197    Ok(())
198}
199
200/// encode this sum metric
201///
202/// `aggregation_temporality` and `monotonic` are ignored for now
203fn encode_sum(
204    table_writer: &mut MultiTableData,
205    name: &str,
206    sum: &Sum,
207    resource_attrs: Option<&Vec<KeyValue>>,
208    scope_attrs: Option<&Vec<KeyValue>>,
209) -> Result<()> {
210    let table = table_writer.get_or_default_table_data(
211        normalize_otlp_name(name),
212        APPROXIMATE_COLUMN_COUNT,
213        sum.data_points.len(),
214    );
215
216    for data_point in &sum.data_points {
217        let mut row = table.alloc_one_row();
218        write_tags_and_timestamp(
219            table,
220            &mut row,
221            resource_attrs,
222            scope_attrs,
223            Some(data_point.attributes.as_ref()),
224            data_point.time_unix_nano as i64,
225        )?;
226        write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
227        table.add_row(row);
228    }
229
230    Ok(())
231}
232
/// Tag column holding the histogram bucket's upper bound, mirroring
/// Prometheus' `le` (less-than-or-equal) label.
const HISTOGRAM_LE_COLUMN: &str = "le";
234
/// Encode histogram data. This function produces insert data for 3 tables.
///
/// The implementation has been following Prometheus histogram table format:
///
/// - A `%metric%_bucket` table including an `le` tag that stores the bucket
///   upper limit, and `greptime_value` for the cumulative bucket count
/// - A `%metric%_sum` table storing sum of samples
/// - A `%metric%_count` table storing count of samples.
///
/// By its Prometheus compatibility, we hope to be able to use prometheus
/// quantile functions on this table.
fn encode_histogram(
    table_writer: &mut MultiTableData,
    name: &str,
    hist: &Histogram,
    resource_attrs: Option<&Vec<KeyValue>>,
    scope_attrs: Option<&Vec<KeyValue>>,
) -> Result<()> {
    let normalized_name = normalize_otlp_name(name);

    let bucket_table_name = format!("{}_bucket", normalized_name);
    let sum_table_name = format!("{}_sum", normalized_name);
    let count_table_name = format!("{}_count", normalized_name);

    let data_points_len = hist.data_points.len();
    // Note that the row and columns number here is approximate
    let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
    let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
    let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);

    for data_point in &hist.data_points {
        // OTLP bucket counts are per-bucket; Prometheus `le` buckets are
        // cumulative, so we keep a running total while emitting rows.
        let mut accumulated_count = 0;
        for (idx, count) in data_point.bucket_counts.iter().enumerate() {
            let mut bucket_row = bucket_table.alloc_one_row();
            write_tags_and_timestamp(
                &mut bucket_table,
                &mut bucket_row,
                resource_attrs,
                scope_attrs,
                Some(data_point.attributes.as_ref()),
                data_point.time_unix_nano as i64,
            )?;

            if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
                // bucket with an explicit upper bound
                row_writer::write_tag(
                    &mut bucket_table,
                    HISTOGRAM_LE_COLUMN,
                    upper_bounds,
                    &mut bucket_row,
                )?;
            } else if idx == data_point.explicit_bounds.len() {
                // The last bucket: one past the explicit bounds, i.e. the
                // overflow bucket with upper bound +Inf
                row_writer::write_tag(
                    &mut bucket_table,
                    HISTOGRAM_LE_COLUMN,
                    f64::INFINITY,
                    &mut bucket_row,
                )?;
            }
            // NOTE(review): if idx > explicit_bounds.len() (malformed input
            // with extra bucket_counts), the row is emitted without an `le`
            // tag — confirm this is the intended behavior.

            accumulated_count += count;
            row_writer::write_f64(
                &mut bucket_table,
                GREPTIME_VALUE,
                accumulated_count as f64,
                &mut bucket_row,
            )?;

            bucket_table.add_row(bucket_row);
        }

        // `sum` is optional in OTLP; only emit a row when present.
        if let Some(sum) = data_point.sum {
            let mut sum_row = sum_table.alloc_one_row();
            write_tags_and_timestamp(
                &mut sum_table,
                &mut sum_row,
                resource_attrs,
                scope_attrs,
                Some(data_point.attributes.as_ref()),
                data_point.time_unix_nano as i64,
            )?;

            row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?;
            sum_table.add_row(sum_row);
        }

        // sample count is always present, one row per data point
        let mut count_row = count_table.alloc_one_row();
        write_tags_and_timestamp(
            &mut count_table,
            &mut count_row,
            resource_attrs,
            scope_attrs,
            Some(data_point.attributes.as_ref()),
            data_point.time_unix_nano as i64,
        )?;

        row_writer::write_f64(
            &mut count_table,
            GREPTIME_VALUE,
            data_point.count as f64,
            &mut count_row,
        )?;
        count_table.add_row(count_row);
    }

    table_writer.add_table_data(bucket_table_name, bucket_table);
    table_writer.add_table_data(sum_table_name, sum_table);
    table_writer.add_table_data(count_table_name, count_table);

    Ok(())
}
346
/// Placeholder for exponential histogram encoding; data of this kind is
/// currently dropped in `encode_metrics`.
#[allow(dead_code)]
fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
    // TODO(sunng87): implement this using a prometheus compatible way
    Ok(())
}
352
353fn encode_summary(
354    table_writer: &mut MultiTableData,
355    name: &str,
356    summary: &Summary,
357    resource_attrs: Option<&Vec<KeyValue>>,
358    scope_attrs: Option<&Vec<KeyValue>>,
359) -> Result<()> {
360    let table = table_writer.get_or_default_table_data(
361        normalize_otlp_name(name),
362        APPROXIMATE_COLUMN_COUNT,
363        summary.data_points.len(),
364    );
365
366    for data_point in &summary.data_points {
367        let mut row = table.alloc_one_row();
368        write_tags_and_timestamp(
369            table,
370            &mut row,
371            resource_attrs,
372            scope_attrs,
373            Some(data_point.attributes.as_ref()),
374            data_point.time_unix_nano as i64,
375        )?;
376
377        for quantile in &data_point.quantile_values {
378            row_writer::write_f64(
379                table,
380                format!("greptime_p{:02}", quantile.quantile * 100f64),
381                quantile.value,
382                &mut row,
383            )?;
384        }
385
386        row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
387        table.add_row(row);
388    }
389
390    Ok(())
391}
392
#[cfg(test)]
mod tests {
    use opentelemetry_proto::tonic::common::v1::any_value::Value as Val;
    use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
    use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value;
    use opentelemetry_proto::tonic::metrics::v1::summary_data_point::ValueAtQuantile;
    use opentelemetry_proto::tonic::metrics::v1::{HistogramDataPoint, NumberDataPoint};

    use super::*;

    // Names are lowercased and `.`/`-` become `_`; already-normalized
    // names pass through unchanged.
    #[test]
    fn test_normalize_otlp_name() {
        assert_eq!(normalize_otlp_name("jvm.memory.free"), "jvm_memory_free");
        assert_eq!(normalize_otlp_name("jvm-memory-free"), "jvm_memory_free");
        assert_eq!(normalize_otlp_name("jvm_memory_free"), "jvm_memory_free");
        assert_eq!(normalize_otlp_name("JVM_MEMORY_FREE"), "jvm_memory_free");
        assert_eq!(normalize_otlp_name("JVM_memory_FREE"), "jvm_memory_free");
    }

    /// Helper to build a string-valued OTLP attribute.
    fn keyvalue(key: &str, value: &str) -> KeyValue {
        KeyValue {
            key: key.into(),
            value: Some(AnyValue {
                value: Some(Val::StringValue(value.into())),
            }),
        }
    }

    // Two gauge data points become two rows in one table; columns are
    // resource/scope/point tags, then timestamp, then value.
    #[test]
    fn test_encode_gauge() {
        let mut tables = MultiTableData::default();

        let data_points = vec![
            NumberDataPoint {
                attributes: vec![keyvalue("host", "testsevrer")],
                time_unix_nano: 100,
                value: Some(Value::AsInt(100)),
                ..Default::default()
            },
            NumberDataPoint {
                attributes: vec![keyvalue("host", "testserver")],
                time_unix_nano: 105,
                value: Some(Value::AsInt(105)),
                ..Default::default()
            },
        ];
        let gauge = Gauge { data_points };
        encode_gauge(
            &mut tables,
            "datamon",
            &gauge,
            Some(&vec![keyvalue("resource", "app")]),
            Some(&vec![keyvalue("scope", "otel")]),
        )
        .unwrap();

        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 5);
        assert_eq!(
            table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            vec![
                "resource",
                "scope",
                "host",
                "greptime_timestamp",
                "greptime_value"
            ]
        );
    }

    // Sum metrics share the same single-table layout as gauges.
    #[test]
    fn test_encode_sum() {
        let mut tables = MultiTableData::default();

        let data_points = vec![
            NumberDataPoint {
                attributes: vec![keyvalue("host", "testserver")],
                time_unix_nano: 100,
                value: Some(Value::AsInt(100)),
                ..Default::default()
            },
            NumberDataPoint {
                attributes: vec![keyvalue("host", "testserver")],
                time_unix_nano: 105,
                value: Some(Value::AsInt(0)),
                ..Default::default()
            },
        ];
        let sum = Sum {
            data_points,
            ..Default::default()
        };
        encode_sum(
            &mut tables,
            "datamon",
            &sum,
            Some(&vec![keyvalue("resource", "app")]),
            Some(&vec![keyvalue("scope", "otel")]),
        )
        .unwrap();

        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 5);
        assert_eq!(
            table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            vec![
                "resource",
                "scope",
                "host",
                "greptime_timestamp",
                "greptime_value"
            ]
        );
    }

    // Each quantile becomes its own column (p90, p95), plus a count column.
    #[test]
    fn test_encode_summary() {
        let mut tables = MultiTableData::default();

        let data_points = vec![SummaryDataPoint {
            attributes: vec![keyvalue("host", "testserver")],
            time_unix_nano: 100,
            count: 25,
            sum: 5400.0,
            quantile_values: vec![
                ValueAtQuantile {
                    quantile: 0.90,
                    value: 1000.0,
                },
                ValueAtQuantile {
                    quantile: 0.95,
                    value: 3030.0,
                },
            ],
            ..Default::default()
        }];
        let summary = Summary { data_points };
        encode_summary(
            &mut tables,
            "datamon",
            &summary,
            Some(&vec![keyvalue("resource", "app")]),
            Some(&vec![keyvalue("scope", "otel")]),
        )
        .unwrap();

        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 1);
        assert_eq!(table.num_columns(), 7);
        assert_eq!(
            table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            vec![
                "resource",
                "scope",
                "host",
                "greptime_timestamp",
                "greptime_p90",
                "greptime_p95",
                "greptime_count"
            ]
        );
    }

    // One histogram data point expands into three tables: 5 bucket rows
    // (4 explicit bounds + the +Inf overflow bucket), 1 sum row, 1 count row.
    #[test]
    fn test_encode_histogram() {
        let mut tables = MultiTableData::default();

        let data_points = vec![HistogramDataPoint {
            attributes: vec![keyvalue("host", "testserver")],
            time_unix_nano: 100,
            start_time_unix_nano: 23,
            count: 25,
            sum: Some(100.),
            max: Some(200.),
            min: Some(0.03),
            bucket_counts: vec![2, 4, 6, 9, 4],
            explicit_bounds: vec![0.1, 1., 10., 100.],
            ..Default::default()
        }];

        let histogram = Histogram {
            data_points,
            aggregation_temporality: AggregationTemporality::Delta.into(),
        };
        encode_histogram(
            &mut tables,
            "histo",
            &histogram,
            Some(&vec![keyvalue("resource", "app")]),
            Some(&vec![keyvalue("scope", "otel")]),
        )
        .unwrap();

        assert_eq!(3, tables.num_tables());

        // bucket table
        let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
        assert_eq!(bucket_table.num_rows(), 5);
        assert_eq!(bucket_table.num_columns(), 6);
        assert_eq!(
            bucket_table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            vec![
                "resource",
                "scope",
                "host",
                "greptime_timestamp",
                "le",
                "greptime_value",
            ]
        );

        let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0);
        assert_eq!(sum_table.num_rows(), 1);
        assert_eq!(sum_table.num_columns(), 5);
        assert_eq!(
            sum_table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            vec![
                "resource",
                "scope",
                "host",
                "greptime_timestamp",
                "greptime_value",
            ]
        );

        let count_table = tables.get_or_default_table_data("histo_count", 0, 0);
        assert_eq!(count_table.num_rows(), 1);
        assert_eq!(count_table.num_columns(), 5);
        assert_eq!(
            count_table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            vec![
                "resource",
                "scope",
                "host",
                "greptime_timestamp",
                "greptime_value",
            ]
        );
    }
}