servers/otlp/metrics.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use ahash::{HashMap, HashSet};
16use api::v1::{RowInsertRequests, Value};
17use common_grpc::precision::Precision;
18use common_query::prelude::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
19use itertools::Itertools;
20use lazy_static::lazy_static;
21use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
22use otel_arrow_rust::proto::opentelemetry::common::v1::{any_value, AnyValue, KeyValue};
23use otel_arrow_rust::proto::opentelemetry::metrics::v1::{metric, number_data_point, *};
24use regex::Regex;
25use session::protocol_ctx::{MetricType, OtlpMetricCtx};
26
27use crate::error::Result;
28use crate::otlp::trace::{KEY_SERVICE_INSTANCE_ID, KEY_SERVICE_NAME};
29use crate::row_writer::{self, MultiTableData, TableData};
30
/// the default column count for table writer
const APPROXIMATE_COLUMN_COUNT: usize = 8;

// Table-name suffixes for the auxiliary tables derived from summary metrics.
const COUNT_TABLE_SUFFIX: &str = "_count";
const SUM_TABLE_SUFFIX: &str = "_sum";

// Prometheus-style tag names that the `service.name` / `service.instance.id`
// resource attributes are remapped to (see `process_resource_attrs`).
const JOB_KEY: &str = "job";
const INSTANCE_KEY: &str = "instance";

// Building blocks for normalized metric/label names.
const UNDERSCORE: &str = "_";
const DOUBLE_UNDERSCORE: &str = "__";
// Prometheus naming suffixes: `_total` for monotonic sums, `_ratio` for
// dimensionless ("1") gauges (see `normalize_metric_name`).
const TOTAL: &str = "total";
const RATIO: &str = "ratio";
44
// Resource attributes promoted to tags by default (non-legacy mode) when no
// explicit include-list is configured.
// see: https://prometheus.io/docs/guides/opentelemetry/#promoting-resource-attributes
const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [
    "service.instance.id",
    "service.name",
    "service.namespace",
    "service.version",
    "cloud.availability_zone",
    "cloud.region",
    "container.name",
    "deployment.environment",
    "deployment.environment.name",
    "k8s.cluster.name",
    "k8s.container.name",
    "k8s.cronjob.name",
    "k8s.daemonset.name",
    "k8s.deployment.name",
    "k8s.job.name",
    "k8s.namespace.name",
    "k8s.pod.name",
    "k8s.replicaset.name",
    "k8s.statefulset.name",
];
67
lazy_static! {
    // Set form of `DEFAULT_PROMOTE_ATTRS` for O(1) membership checks in
    // `process_resource_attrs`.
    static ref DEFAULT_PROMOTE_ATTRS_SET: HashSet<String> =
        HashSet::from_iter(DEFAULT_PROMOTE_ATTRS.iter().map(|s| s.to_string()));
    // Matches every character that is not `[a-zA-Z0-9]`; used to tokenize
    // metric names and sanitize label names.
    static ref NON_ALPHA_NUM_CHAR: Regex = Regex::new(r"[^a-zA-Z0-9]").unwrap();
    // UCUM unit symbol -> Prometheus-style unit suffix (main unit, i.e. the
    // part before `/`). An empty mapped value means "drop the unit".
    static ref UNIT_MAP: HashMap<String, String> = [
        // Time
        ("d", "days"),
        ("h", "hours"),
        ("min", "minutes"),
        ("s", "seconds"),
        ("ms", "milliseconds"),
        ("us", "microseconds"),
        ("ns", "nanoseconds"),
        // Bytes
        ("By", "bytes"),
        ("KiBy", "kibibytes"),
        ("MiBy", "mebibytes"),
        ("GiBy", "gibibytes"),
        ("TiBy", "tibibytes"),
        ("KBy", "kilobytes"),
        ("MBy", "megabytes"),
        ("GBy", "gigabytes"),
        ("TBy", "terabytes"),
        // SI
        ("m", "meters"),
        ("V", "volts"),
        ("A", "amperes"),
        ("J", "joules"),
        ("W", "watts"),
        ("g", "grams"),
        // Misc
        ("Cel", "celsius"),
        ("Hz", "hertz"),
        ("1", ""),
        ("%", "percent"),
    ].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
    // Denominator unit (the part after `/`) -> singular suffix, producing
    // names like `..._per_second`.
    static ref PER_UNIT_MAP: HashMap<String, String> = [
        ("s", "second"),
        ("m", "minute"),
        ("h", "hour"),
        ("d", "day"),
        ("w", "week"),
        ("mo", "month"),
        ("y", "year"),
    ].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
}
114
// Scope metadata persisted as extra attributes when scope-attribute
// promotion is enabled (see `process_scope_attrs`).
const OTEL_SCOPE_NAME: &str = "name";
const OTEL_SCOPE_VERSION: &str = "version";
const OTEL_SCOPE_SCHEMA_URL: &str = "schema_url";
118
119/// Convert OpenTelemetry metrics to GreptimeDB insert requests
120///
121/// See
122/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto>
123/// for data structure of OTLP metrics.
124///
125/// Returns `InsertRequests` and total number of rows to ingest
126pub fn to_grpc_insert_requests(
127    request: ExportMetricsServiceRequest,
128    metric_ctx: &mut OtlpMetricCtx,
129) -> Result<(RowInsertRequests, usize)> {
130    let mut table_writer = MultiTableData::default();
131
132    for resource in &request.resource_metrics {
133        let resource_attrs = resource.resource.as_ref().map(|r| {
134            let mut attrs = r.attributes.clone();
135            process_resource_attrs(&mut attrs, metric_ctx);
136            attrs
137        });
138
139        for scope in &resource.scope_metrics {
140            let scope_attrs = process_scope_attrs(scope, metric_ctx);
141
142            for metric in &scope.metrics {
143                if metric.data.is_none() {
144                    continue;
145                }
146                if let Some(t) = metric.data.as_ref().map(from_metric_type) {
147                    metric_ctx.set_metric_type(t);
148                }
149
150                encode_metrics(
151                    &mut table_writer,
152                    metric,
153                    resource_attrs.as_ref(),
154                    scope_attrs.as_ref(),
155                    metric_ctx,
156                )?;
157            }
158        }
159    }
160
161    Ok(table_writer.into_row_insert_requests())
162}
163
164fn from_metric_type(data: &metric::Data) -> MetricType {
165    match data {
166        metric::Data::Gauge(_) => MetricType::Gauge,
167        metric::Data::Sum(s) => {
168            if s.is_monotonic {
169                MetricType::MonotonicSum
170            } else {
171                MetricType::NonMonotonicSum
172            }
173        }
174        metric::Data::Histogram(_) => MetricType::Histogram,
175        metric::Data::ExponentialHistogram(_) => MetricType::ExponentialHistogram,
176        metric::Data::Summary(_) => MetricType::Summary,
177    }
178}
179
180fn process_resource_attrs(attrs: &mut Vec<KeyValue>, metric_ctx: &OtlpMetricCtx) {
181    if metric_ctx.is_legacy {
182        return;
183    }
184
185    // remap service.name and service.instance.id to job and instance
186    let mut tmp = Vec::with_capacity(2);
187    for kv in attrs.iter() {
188        match &kv.key as &str {
189            KEY_SERVICE_NAME => {
190                tmp.push(KeyValue {
191                    key: JOB_KEY.to_string(),
192                    value: kv.value.clone(),
193                });
194            }
195            KEY_SERVICE_INSTANCE_ID => {
196                tmp.push(KeyValue {
197                    key: INSTANCE_KEY.to_string(),
198                    value: kv.value.clone(),
199                });
200            }
201            _ => {}
202        }
203    }
204
205    // if promote all, then exclude the list, else, include the list
206    if metric_ctx.promote_all_resource_attrs {
207        attrs.retain(|kv| !metric_ctx.resource_attrs.contains(&kv.key));
208    } else {
209        attrs.retain(|kv| {
210            metric_ctx.resource_attrs.contains(&kv.key)
211                || DEFAULT_PROMOTE_ATTRS_SET.contains(&kv.key)
212        });
213    }
214
215    attrs.extend(tmp);
216}
217
218fn process_scope_attrs(scope: &ScopeMetrics, metric_ctx: &OtlpMetricCtx) -> Option<Vec<KeyValue>> {
219    if metric_ctx.is_legacy {
220        return scope.scope.as_ref().map(|s| s.attributes.clone());
221    };
222
223    if !metric_ctx.promote_scope_attrs {
224        return None;
225    }
226
227    // persist scope attrs with name, version and schema_url
228    scope.scope.as_ref().map(|s| {
229        let mut attrs = s.attributes.clone();
230        attrs.push(KeyValue {
231            key: OTEL_SCOPE_NAME.to_string(),
232            value: Some(AnyValue {
233                value: Some(any_value::Value::StringValue(s.name.clone())),
234            }),
235        });
236        attrs.push(KeyValue {
237            key: OTEL_SCOPE_VERSION.to_string(),
238            value: Some(AnyValue {
239                value: Some(any_value::Value::StringValue(s.version.clone())),
240            }),
241        });
242        attrs.push(KeyValue {
243            key: OTEL_SCOPE_SCHEMA_URL.to_string(),
244            value: Some(AnyValue {
245                value: Some(any_value::Value::StringValue(scope.schema_url.clone())),
246            }),
247        });
248        attrs
249    })
250}
251
252// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_name.go#L55
253pub fn normalize_metric_name(metric: &Metric, metric_type: &MetricType) -> String {
254    let mut name_tokens = NON_ALPHA_NUM_CHAR
255        .split(&metric.name)
256        .map(|s| s.to_string())
257        .collect_vec();
258    if !metric.unit.is_empty() {
259        let (main, per) = build_unit_suffix(&metric.unit);
260        if let Some(main) = main
261            && !name_tokens.contains(&main)
262        {
263            name_tokens.push(main);
264        }
265        if let Some(per) = per
266            && !name_tokens.contains(&per)
267        {
268            name_tokens.push("per".to_string());
269            name_tokens.push(per);
270        }
271    }
272
273    if matches!(metric_type, MetricType::MonotonicSum) {
274        name_tokens.retain(|t| t != TOTAL);
275        name_tokens.push(TOTAL.to_string());
276    }
277    if metric.unit == "1" && matches!(metric_type, MetricType::Gauge) {
278        name_tokens.retain(|t| t != RATIO);
279        name_tokens.push(RATIO.to_string());
280    }
281
282    let name = name_tokens.join(UNDERSCORE);
283
284    if let Some((_, first)) = name.char_indices().next()
285        && first >= '0'
286        && first <= '9'
287    {
288        format!("_{}", name)
289    } else {
290        name
291    }
292}
293
294fn build_unit_suffix(unit: &str) -> (Option<String>, Option<String>) {
295    let (main, per) = unit.split_once('/').unwrap_or((unit, ""));
296    (check_unit(main, &UNIT_MAP), check_unit(per, &PER_UNIT_MAP))
297}
298
299fn check_unit(unit_str: &str, unit_map: &HashMap<String, String>) -> Option<String> {
300    let u = unit_str.trim();
301    if !u.is_empty() && !u.contains("{}") {
302        let u = unit_map.get(u).map(|s| s.as_ref()).unwrap_or(u);
303        let u = clean_unit_name(u);
304        if !u.is_empty() {
305            return Some(u);
306        }
307    }
308    None
309}
310
311fn clean_unit_name(name: &str) -> String {
312    NON_ALPHA_NUM_CHAR.split(name).join(UNDERSCORE)
313}
314
315// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_label.go#L27
316pub fn normalize_label_name(name: &str) -> String {
317    if name.is_empty() {
318        return name.to_string();
319    }
320
321    let n = NON_ALPHA_NUM_CHAR.replace_all(name, UNDERSCORE);
322    if let Some((_, first)) = n.char_indices().next()
323        && first >= '0'
324        && first <= '9'
325    {
326        return format!("key_{}", n);
327    }
328    if n.starts_with(UNDERSCORE) && !n.starts_with(DOUBLE_UNDERSCORE) {
329        return format!("key{}", n);
330    }
331    n.to_string()
332}
333
/// Normalize otlp instrumentation, metric and attribute names
///
/// <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument-name-syntax>
/// - since the name are case-insensitive, we transform them to lowercase for
///   better sql usability
/// - replace `.` and `-` with `_`
pub fn legacy_normalize_otlp_name(name: &str) -> String {
    // `.` and `-` are unaffected by lowercasing, so the two steps commute.
    name.replace(['.', '-'], "_").to_lowercase()
}
343
344fn encode_metrics(
345    table_writer: &mut MultiTableData,
346    metric: &Metric,
347    resource_attrs: Option<&Vec<KeyValue>>,
348    scope_attrs: Option<&Vec<KeyValue>>,
349    metric_ctx: &OtlpMetricCtx,
350) -> Result<()> {
351    let name = if metric_ctx.is_legacy {
352        legacy_normalize_otlp_name(&metric.name)
353    } else {
354        normalize_metric_name(metric, &metric_ctx.metric_type)
355    };
356
357    // note that we don't store description or unit, we might want to deal with
358    // these fields in the future.
359    if let Some(data) = &metric.data {
360        match data {
361            metric::Data::Gauge(gauge) => {
362                encode_gauge(
363                    table_writer,
364                    &name,
365                    gauge,
366                    resource_attrs,
367                    scope_attrs,
368                    metric_ctx,
369                )?;
370            }
371            metric::Data::Sum(sum) => {
372                encode_sum(
373                    table_writer,
374                    &name,
375                    sum,
376                    resource_attrs,
377                    scope_attrs,
378                    metric_ctx,
379                )?;
380            }
381            metric::Data::Summary(summary) => {
382                encode_summary(
383                    table_writer,
384                    &name,
385                    summary,
386                    resource_attrs,
387                    scope_attrs,
388                    metric_ctx,
389                )?;
390            }
391            metric::Data::Histogram(hist) => {
392                encode_histogram(
393                    table_writer,
394                    &name,
395                    hist,
396                    resource_attrs,
397                    scope_attrs,
398                    metric_ctx,
399                )?;
400            }
401            // TODO(sunng87) leave ExponentialHistogram for next release
402            metric::Data::ExponentialHistogram(_hist) => {}
403        }
404    }
405
406    Ok(())
407}
408
/// Origin of an attribute set, which decides how tag keys are normalized in
/// `write_attributes`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AttributeType {
    // Resource-level attributes; keys pass through `normalize_label_name`.
    Resource,
    // Scope-level attributes; keys are prefixed with `otel_scope_`.
    Scope,
    // Data-point attributes; keys pass through `normalize_label_name`.
    DataPoint,
    // Legacy pipeline; keys pass through `legacy_normalize_otlp_name`.
    Legacy,
}
416
417fn write_attributes(
418    writer: &mut TableData,
419    row: &mut Vec<Value>,
420    attrs: Option<&Vec<KeyValue>>,
421    attribute_type: AttributeType,
422) -> Result<()> {
423    let Some(attrs) = attrs else {
424        return Ok(());
425    };
426
427    let tags = attrs.iter().filter_map(|attr| {
428        attr.value
429            .as_ref()
430            .and_then(|v| v.value.as_ref())
431            .and_then(|val| {
432                let key = match attribute_type {
433                    AttributeType::Resource | AttributeType::DataPoint => {
434                        normalize_label_name(&attr.key)
435                    }
436                    AttributeType::Scope => {
437                        format!("otel_scope_{}", normalize_label_name(&attr.key))
438                    }
439                    AttributeType::Legacy => legacy_normalize_otlp_name(&attr.key),
440                };
441                match val {
442                    any_value::Value::StringValue(s) => Some((key, s.clone())),
443                    any_value::Value::IntValue(v) => Some((key, v.to_string())),
444                    any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
445                    _ => None, // TODO(sunng87): allow different type of values
446                }
447            })
448    });
449    row_writer::write_tags(writer, tags, row)?;
450
451    Ok(())
452}
453
454fn write_timestamp(
455    table: &mut TableData,
456    row: &mut Vec<Value>,
457    time_nano: i64,
458    legacy_mode: bool,
459) -> Result<()> {
460    if legacy_mode {
461        row_writer::write_ts_to_nanos(
462            table,
463            GREPTIME_TIMESTAMP,
464            Some(time_nano),
465            Precision::Nanosecond,
466            row,
467        )
468    } else {
469        row_writer::write_ts_to_millis(
470            table,
471            GREPTIME_TIMESTAMP,
472            Some(time_nano / 1000000),
473            Precision::Millisecond,
474            row,
475        )
476    }
477}
478
479fn write_data_point_value(
480    table: &mut TableData,
481    row: &mut Vec<Value>,
482    field: &str,
483    value: &Option<number_data_point::Value>,
484) -> Result<()> {
485    match value {
486        Some(number_data_point::Value::AsInt(val)) => {
487            // we coerce all values to f64
488            row_writer::write_f64(table, field, *val as f64, row)?;
489        }
490        Some(number_data_point::Value::AsDouble(val)) => {
491            row_writer::write_f64(table, field, *val, row)?;
492        }
493        _ => {}
494    }
495    Ok(())
496}
497
498fn write_tags_and_timestamp(
499    table: &mut TableData,
500    row: &mut Vec<Value>,
501    resource_attrs: Option<&Vec<KeyValue>>,
502    scope_attrs: Option<&Vec<KeyValue>>,
503    data_point_attrs: Option<&Vec<KeyValue>>,
504    timestamp_nanos: i64,
505    metric_ctx: &OtlpMetricCtx,
506) -> Result<()> {
507    if metric_ctx.is_legacy {
508        write_attributes(table, row, resource_attrs, AttributeType::Legacy)?;
509        write_attributes(table, row, scope_attrs, AttributeType::Legacy)?;
510        write_attributes(table, row, data_point_attrs, AttributeType::Legacy)?;
511    } else {
512        // TODO(shuiyisong): check `__type__` and `__unit__` tags in prometheus
513        write_attributes(table, row, resource_attrs, AttributeType::Resource)?;
514        write_attributes(table, row, scope_attrs, AttributeType::Scope)?;
515        write_attributes(table, row, data_point_attrs, AttributeType::DataPoint)?;
516    }
517
518    write_timestamp(table, row, timestamp_nanos, metric_ctx.is_legacy)?;
519
520    Ok(())
521}
522
523/// encode this gauge metric
524///
525/// note that there can be multiple data points in the request, it's going to be
526/// stored as multiple rows
527fn encode_gauge(
528    table_writer: &mut MultiTableData,
529    name: &str,
530    gauge: &Gauge,
531    resource_attrs: Option<&Vec<KeyValue>>,
532    scope_attrs: Option<&Vec<KeyValue>>,
533    metric_ctx: &OtlpMetricCtx,
534) -> Result<()> {
535    let table = table_writer.get_or_default_table_data(
536        name,
537        APPROXIMATE_COLUMN_COUNT,
538        gauge.data_points.len(),
539    );
540
541    for data_point in &gauge.data_points {
542        let mut row = table.alloc_one_row();
543        write_tags_and_timestamp(
544            table,
545            &mut row,
546            resource_attrs,
547            scope_attrs,
548            Some(data_point.attributes.as_ref()),
549            data_point.time_unix_nano as i64,
550            metric_ctx,
551        )?;
552
553        write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
554        table.add_row(row);
555    }
556
557    Ok(())
558}
559
560/// encode this sum metric
561///
562/// `aggregation_temporality` and `monotonic` are ignored for now
563fn encode_sum(
564    table_writer: &mut MultiTableData,
565    name: &str,
566    sum: &Sum,
567    resource_attrs: Option<&Vec<KeyValue>>,
568    scope_attrs: Option<&Vec<KeyValue>>,
569    metric_ctx: &OtlpMetricCtx,
570) -> Result<()> {
571    let table = table_writer.get_or_default_table_data(
572        name,
573        APPROXIMATE_COLUMN_COUNT,
574        sum.data_points.len(),
575    );
576
577    for data_point in &sum.data_points {
578        let mut row = table.alloc_one_row();
579        write_tags_and_timestamp(
580            table,
581            &mut row,
582            resource_attrs,
583            scope_attrs,
584            Some(data_point.attributes.as_ref()),
585            data_point.time_unix_nano as i64,
586            metric_ctx,
587        )?;
588        write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
589        table.add_row(row);
590    }
591
592    Ok(())
593}
594
// Tag column holding the bucket upper bound, matching Prometheus' `le` label.
const HISTOGRAM_LE_COLUMN: &str = "le";

/// Encode histogram data. This function returns 3 insert requests for 3 tables.
///
/// The implementation has been following Prometheus histogram table format:
///
/// - A `%metric%_bucket` table including `greptime_le` tag that stores bucket upper
///   limit, and `greptime_value` for bucket count
/// - A `%metric%_sum` table storing sum of samples
/// -  A `%metric%_count` table storing count of samples.
///
/// By its Prometheus compatibility, we hope to be able to use prometheus
/// quantile functions on this table.
fn encode_histogram(
    table_writer: &mut MultiTableData,
    name: &str,
    hist: &Histogram,
    resource_attrs: Option<&Vec<KeyValue>>,
    scope_attrs: Option<&Vec<KeyValue>>,
    metric_ctx: &OtlpMetricCtx,
) -> Result<()> {
    let normalized_name = name;

    let bucket_table_name = format!("{}_bucket", normalized_name);
    let sum_table_name = format!("{}_sum", normalized_name);
    let count_table_name = format!("{}_count", normalized_name);

    let data_points_len = hist.data_points.len();
    // Note that the row and columns number here is approximate
    let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
    let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
    let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);

    for data_point in &hist.data_points {
        // Prometheus buckets are cumulative while OTLP buckets hold
        // per-bucket counts, so accumulate as we walk the buckets.
        let mut accumulated_count = 0;
        for (idx, count) in data_point.bucket_counts.iter().enumerate() {
            let mut bucket_row = bucket_table.alloc_one_row();
            write_tags_and_timestamp(
                &mut bucket_table,
                &mut bucket_row,
                resource_attrs,
                scope_attrs,
                Some(data_point.attributes.as_ref()),
                data_point.time_unix_nano as i64,
                metric_ctx,
            )?;

            // `explicit_bounds` is expected to have one entry fewer than
            // `bucket_counts`; the trailing bucket is the overflow bucket
            // tagged `le = +Inf`.
            if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
                row_writer::write_tag(
                    &mut bucket_table,
                    HISTOGRAM_LE_COLUMN,
                    upper_bounds,
                    &mut bucket_row,
                )?;
            } else if idx == data_point.explicit_bounds.len() {
                // The last bucket
                row_writer::write_tag(
                    &mut bucket_table,
                    HISTOGRAM_LE_COLUMN,
                    f64::INFINITY,
                    &mut bucket_row,
                )?;
            }

            accumulated_count += count;
            row_writer::write_f64(
                &mut bucket_table,
                GREPTIME_VALUE,
                accumulated_count as f64,
                &mut bucket_row,
            )?;

            bucket_table.add_row(bucket_row);
        }

        // `sum` is optional in OTLP; only emit a `_sum` row when present.
        if let Some(sum) = data_point.sum {
            let mut sum_row = sum_table.alloc_one_row();
            write_tags_and_timestamp(
                &mut sum_table,
                &mut sum_row,
                resource_attrs,
                scope_attrs,
                Some(data_point.attributes.as_ref()),
                data_point.time_unix_nano as i64,
                metric_ctx,
            )?;

            row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?;
            sum_table.add_row(sum_row);
        }

        // `count` is always present; every data point produces a `_count` row.
        let mut count_row = count_table.alloc_one_row();
        write_tags_and_timestamp(
            &mut count_table,
            &mut count_row,
            resource_attrs,
            scope_attrs,
            Some(data_point.attributes.as_ref()),
            data_point.time_unix_nano as i64,
            metric_ctx,
        )?;

        row_writer::write_f64(
            &mut count_table,
            GREPTIME_VALUE,
            data_point.count as f64,
            &mut count_row,
        )?;
        count_table.add_row(count_row);
    }

    table_writer.add_table_data(bucket_table_name, bucket_table);
    table_writer.add_table_data(sum_table_name, sum_table);
    table_writer.add_table_data(count_table_name, count_table);

    Ok(())
}
712
/// Placeholder for exponential histogram encoding; currently a no-op.
#[allow(dead_code)]
fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
    // TODO(sunng87): implement this using a prometheus compatible way
    Ok(())
}
718
/// Encode a summary metric.
///
/// Legacy mode writes one wide table with `greptime_pNN` quantile columns and
/// a `greptime_count` column. Non-legacy mode follows Prometheus summary
/// layout: a quantile table (tagged by `quantile`), plus `_count` and `_sum`
/// tables.
fn encode_summary(
    table_writer: &mut MultiTableData,
    name: &str,
    summary: &Summary,
    resource_attrs: Option<&Vec<KeyValue>>,
    scope_attrs: Option<&Vec<KeyValue>>,
    metric_ctx: &OtlpMetricCtx,
) -> Result<()> {
    if metric_ctx.is_legacy {
        let table = table_writer.get_or_default_table_data(
            name,
            APPROXIMATE_COLUMN_COUNT,
            summary.data_points.len(),
        );

        for data_point in &summary.data_points {
            let mut row = table.alloc_one_row();
            write_tags_and_timestamp(
                table,
                &mut row,
                resource_attrs,
                scope_attrs,
                Some(data_point.attributes.as_ref()),
                data_point.time_unix_nano as i64,
                metric_ctx,
            )?;

            // Each quantile becomes its own field column, e.g. 0.9 -> "greptime_p90".
            for quantile in &data_point.quantile_values {
                row_writer::write_f64(
                    table,
                    format!("greptime_p{:02}", quantile.quantile * 100f64),
                    quantile.value,
                    &mut row,
                )?;
            }

            row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
            table.add_row(row);
        }
    } else {
        // 1. quantile table
        // 2. count table
        // 3. sum table

        let metric_name = name;
        let count_name = format!("{}{}", metric_name, COUNT_TABLE_SUFFIX);
        let sum_name = format!("{}{}", metric_name, SUM_TABLE_SUFFIX);

        for data_point in &summary.data_points {
            // Quantile table: one row per quantile, tagged with the quantile value.
            {
                let quantile_table = table_writer.get_or_default_table_data(
                    metric_name,
                    APPROXIMATE_COLUMN_COUNT,
                    summary.data_points.len(),
                );

                for quantile in &data_point.quantile_values {
                    let mut row = quantile_table.alloc_one_row();
                    write_tags_and_timestamp(
                        quantile_table,
                        &mut row,
                        resource_attrs,
                        scope_attrs,
                        Some(data_point.attributes.as_ref()),
                        data_point.time_unix_nano as i64,
                        metric_ctx,
                    )?;
                    row_writer::write_tag(quantile_table, "quantile", quantile.quantile, &mut row)?;
                    row_writer::write_f64(
                        quantile_table,
                        GREPTIME_VALUE,
                        quantile.value,
                        &mut row,
                    )?;
                    quantile_table.add_row(row);
                }
            }
            // `_count` table: one row per data point.
            {
                let count_table = table_writer.get_or_default_table_data(
                    &count_name,
                    APPROXIMATE_COLUMN_COUNT,
                    summary.data_points.len(),
                );
                let mut row = count_table.alloc_one_row();
                write_tags_and_timestamp(
                    count_table,
                    &mut row,
                    resource_attrs,
                    scope_attrs,
                    Some(data_point.attributes.as_ref()),
                    data_point.time_unix_nano as i64,
                    metric_ctx,
                )?;

                row_writer::write_f64(
                    count_table,
                    GREPTIME_VALUE,
                    data_point.count as f64,
                    &mut row,
                )?;

                count_table.add_row(row);
            }
            // `_sum` table: one row per data point.
            {
                let sum_table = table_writer.get_or_default_table_data(
                    &sum_name,
                    APPROXIMATE_COLUMN_COUNT,
                    summary.data_points.len(),
                );

                let mut row = sum_table.alloc_one_row();
                write_tags_and_timestamp(
                    sum_table,
                    &mut row,
                    resource_attrs,
                    scope_attrs,
                    Some(data_point.attributes.as_ref()),
                    data_point.time_unix_nano as i64,
                    metric_ctx,
                )?;

                row_writer::write_f64(sum_table, GREPTIME_VALUE, data_point.sum, &mut row)?;

                sum_table.add_row(row);
            }
        }
    }

    Ok(())
}
849
850#[cfg(test)]
851mod tests {
852    use otel_arrow_rust::proto::opentelemetry::common::v1::any_value::Value as Val;
853    use otel_arrow_rust::proto::opentelemetry::common::v1::AnyValue;
854    use otel_arrow_rust::proto::opentelemetry::metrics::v1::number_data_point::Value;
855    use otel_arrow_rust::proto::opentelemetry::metrics::v1::summary_data_point::ValueAtQuantile;
856    use otel_arrow_rust::proto::opentelemetry::metrics::v1::{
857        AggregationTemporality, HistogramDataPoint, NumberDataPoint, SummaryDataPoint,
858    };
859
860    use super::*;
861
862    #[test]
863    fn test_legacy_normalize_otlp_name() {
864        assert_eq!(
865            legacy_normalize_otlp_name("jvm.memory.free"),
866            "jvm_memory_free"
867        );
868        assert_eq!(
869            legacy_normalize_otlp_name("jvm-memory-free"),
870            "jvm_memory_free"
871        );
872        assert_eq!(
873            legacy_normalize_otlp_name("jvm_memory_free"),
874            "jvm_memory_free"
875        );
876        assert_eq!(
877            legacy_normalize_otlp_name("JVM_MEMORY_FREE"),
878            "jvm_memory_free"
879        );
880        assert_eq!(
881            legacy_normalize_otlp_name("JVM_memory_FREE"),
882            "jvm_memory_free"
883        );
884    }
885
886    #[test]
887    fn test_normalize_metric_name() {
888        let test_cases = vec![
889            // Default case
890            (Metric::default(), MetricType::Init, ""),
891            // Basic metric with just name
892            (
893                Metric {
894                    name: "foo".to_string(),
895                    ..Default::default()
896                },
897                MetricType::Init,
898                "foo",
899            ),
900            // Metric with unit "s" should append "seconds"
901            (
902                Metric {
903                    name: "foo".to_string(),
904                    unit: "s".to_string(),
905                    ..Default::default()
906                },
907                MetricType::Init,
908                "foo_seconds",
909            ),
910            // Metric already ending with unit suffix should not duplicate
911            (
912                Metric {
913                    name: "foo_seconds".to_string(),
914                    unit: "s".to_string(),
915                    ..Default::default()
916                },
917                MetricType::Init,
918                "foo_seconds",
919            ),
920            // Monotonic sum should append "total"
921            (
922                Metric {
923                    name: "foo".to_string(),
924                    ..Default::default()
925                },
926                MetricType::MonotonicSum,
927                "foo_total",
928            ),
929            // Metric already ending with "total" should not duplicate
930            (
931                Metric {
932                    name: "foo_total".to_string(),
933                    ..Default::default()
934                },
935                MetricType::MonotonicSum,
936                "foo_total",
937            ),
938            // Monotonic sum with unit should append both unit and "total"
939            (
940                Metric {
941                    name: "foo".to_string(),
942                    unit: "s".to_string(),
943                    ..Default::default()
944                },
945                MetricType::MonotonicSum,
946                "foo_seconds_total",
947            ),
948            // Metric with unit suffix and monotonic sum
949            (
950                Metric {
951                    name: "foo_seconds".to_string(),
952                    unit: "s".to_string(),
953                    ..Default::default()
954                },
955                MetricType::MonotonicSum,
956                "foo_seconds_total",
957            ),
958            // Metric already ending with "total" and has unit
959            (
960                Metric {
961                    name: "foo_total".to_string(),
962                    unit: "s".to_string(),
963                    ..Default::default()
964                },
965                MetricType::MonotonicSum,
966                "foo_seconds_total",
967            ),
968            // Metric already ending with both unit and "total"
969            (
970                Metric {
971                    name: "foo_seconds_total".to_string(),
972                    unit: "s".to_string(),
973                    ..Default::default()
974                },
975                MetricType::MonotonicSum,
976                "foo_seconds_total",
977            ),
978            // Metric with unusual order (total_seconds) should be normalized
979            (
980                Metric {
981                    name: "foo_total_seconds".to_string(),
982                    unit: "s".to_string(),
983                    ..Default::default()
984                },
985                MetricType::MonotonicSum,
986                "foo_seconds_total",
987            ),
988            // Gauge with unit "1" should append "ratio"
989            (
990                Metric {
991                    name: "foo".to_string(),
992                    unit: "1".to_string(),
993                    ..Default::default()
994                },
995                MetricType::Gauge,
996                "foo_ratio",
997            ),
998            // Complex unit like "m/s" should be converted to "meters_per_second"
999            (
1000                Metric {
1001                    name: "foo".to_string(),
1002                    unit: "m/s".to_string(),
1003                    ..Default::default()
1004                },
1005                MetricType::Init,
1006                "foo_meters_per_second",
1007            ),
1008            // Metric with partial unit match
1009            (
1010                Metric {
1011                    name: "foo_second".to_string(),
1012                    unit: "m/s".to_string(),
1013                    ..Default::default()
1014                },
1015                MetricType::Init,
1016                "foo_second_meters",
1017            ),
1018            // Metric already containing the main unit
1019            (
1020                Metric {
1021                    name: "foo_meters".to_string(),
1022                    unit: "m/s".to_string(),
1023                    ..Default::default()
1024                },
1025                MetricType::Init,
1026                "foo_meters_per_second",
1027            ),
1028        ];
1029
1030        for (metric, metric_type, expected) in test_cases {
1031            let result = normalize_metric_name(&metric, &metric_type);
1032            assert_eq!(
1033                result, expected,
1034                "Failed for metric name: '{}', unit: '{}', type: {:?}",
1035                metric.name, metric.unit, metric_type
1036            );
1037        }
1038    }
1039
1040    #[test]
1041    fn test_normalize_label_name() {
1042        let test_cases = vec![
1043            ("", ""),
1044            ("foo", "foo"),
1045            ("foo_bar/baz:abc", "foo_bar_baz_abc"),
1046            ("1foo", "key_1foo"),
1047            ("_foo", "key_foo"),
1048            ("__bar", "__bar"),
1049        ];
1050
1051        for (input, expected) in test_cases {
1052            let result = normalize_label_name(input);
1053            assert_eq!(
1054                result, expected,
1055                "unexpected result for input '{}'; got '{}'; want '{}'",
1056                input, result, expected
1057            );
1058        }
1059    }
1060
1061    fn keyvalue(key: &str, value: &str) -> KeyValue {
1062        KeyValue {
1063            key: key.into(),
1064            value: Some(AnyValue {
1065                value: Some(Val::StringValue(value.into())),
1066            }),
1067        }
1068    }
1069
1070    #[test]
1071    fn test_encode_gauge() {
1072        let mut tables = MultiTableData::default();
1073
1074        let data_points = vec![
1075            NumberDataPoint {
1076                attributes: vec![keyvalue("host", "testsevrer")],
1077                time_unix_nano: 100,
1078                value: Some(Value::AsInt(100)),
1079                ..Default::default()
1080            },
1081            NumberDataPoint {
1082                attributes: vec![keyvalue("host", "testserver")],
1083                time_unix_nano: 105,
1084                value: Some(Value::AsInt(105)),
1085                ..Default::default()
1086            },
1087        ];
1088        let gauge = Gauge { data_points };
1089        encode_gauge(
1090            &mut tables,
1091            "datamon",
1092            &gauge,
1093            Some(&vec![]),
1094            Some(&vec![keyvalue("scope", "otel")]),
1095            &OtlpMetricCtx::default(),
1096        )
1097        .unwrap();
1098
1099        let table = tables.get_or_default_table_data("datamon", 0, 0);
1100        assert_eq!(table.num_rows(), 2);
1101        assert_eq!(table.num_columns(), 4);
1102        assert_eq!(
1103            table
1104                .columns()
1105                .iter()
1106                .map(|c| &c.column_name)
1107                .collect::<Vec<&String>>(),
1108            vec![
1109                "otel_scope_scope",
1110                "host",
1111                "greptime_timestamp",
1112                "greptime_value"
1113            ]
1114        );
1115    }
1116
1117    #[test]
1118    fn test_encode_sum() {
1119        let mut tables = MultiTableData::default();
1120
1121        let data_points = vec![
1122            NumberDataPoint {
1123                attributes: vec![keyvalue("host", "testserver")],
1124                time_unix_nano: 100,
1125                value: Some(Value::AsInt(100)),
1126                ..Default::default()
1127            },
1128            NumberDataPoint {
1129                attributes: vec![keyvalue("host", "testserver")],
1130                time_unix_nano: 105,
1131                value: Some(Value::AsInt(0)),
1132                ..Default::default()
1133            },
1134        ];
1135        let sum = Sum {
1136            data_points,
1137            ..Default::default()
1138        };
1139        encode_sum(
1140            &mut tables,
1141            "datamon",
1142            &sum,
1143            Some(&vec![]),
1144            Some(&vec![keyvalue("scope", "otel")]),
1145            &OtlpMetricCtx::default(),
1146        )
1147        .unwrap();
1148
1149        let table = tables.get_or_default_table_data("datamon", 0, 0);
1150        assert_eq!(table.num_rows(), 2);
1151        assert_eq!(table.num_columns(), 4);
1152        assert_eq!(
1153            table
1154                .columns()
1155                .iter()
1156                .map(|c| &c.column_name)
1157                .collect::<Vec<&String>>(),
1158            vec![
1159                "otel_scope_scope",
1160                "host",
1161                "greptime_timestamp",
1162                "greptime_value"
1163            ]
1164        );
1165    }
1166
1167    #[test]
1168    fn test_encode_summary() {
1169        let mut tables = MultiTableData::default();
1170
1171        let data_points = vec![SummaryDataPoint {
1172            attributes: vec![keyvalue("host", "testserver")],
1173            time_unix_nano: 100,
1174            count: 25,
1175            sum: 5400.0,
1176            quantile_values: vec![
1177                ValueAtQuantile {
1178                    quantile: 0.90,
1179                    value: 1000.0,
1180                },
1181                ValueAtQuantile {
1182                    quantile: 0.95,
1183                    value: 3030.0,
1184                },
1185            ],
1186            ..Default::default()
1187        }];
1188        let summary = Summary { data_points };
1189        encode_summary(
1190            &mut tables,
1191            "datamon",
1192            &summary,
1193            Some(&vec![]),
1194            Some(&vec![keyvalue("scope", "otel")]),
1195            &OtlpMetricCtx::default(),
1196        )
1197        .unwrap();
1198
1199        let table = tables.get_or_default_table_data("datamon", 0, 0);
1200        assert_eq!(table.num_rows(), 2);
1201        assert_eq!(table.num_columns(), 5);
1202        assert_eq!(
1203            table
1204                .columns()
1205                .iter()
1206                .map(|c| &c.column_name)
1207                .collect::<Vec<&String>>(),
1208            vec![
1209                "otel_scope_scope",
1210                "host",
1211                "greptime_timestamp",
1212                "quantile",
1213                "greptime_value"
1214            ]
1215        );
1216
1217        let table = tables.get_or_default_table_data("datamon_count", 0, 0);
1218        assert_eq!(table.num_rows(), 1);
1219        assert_eq!(table.num_columns(), 4);
1220        assert_eq!(
1221            table
1222                .columns()
1223                .iter()
1224                .map(|c| &c.column_name)
1225                .collect::<Vec<&String>>(),
1226            vec![
1227                "otel_scope_scope",
1228                "host",
1229                "greptime_timestamp",
1230                "greptime_value"
1231            ]
1232        );
1233
1234        let table = tables.get_or_default_table_data("datamon_sum", 0, 0);
1235        assert_eq!(table.num_rows(), 1);
1236        assert_eq!(table.num_columns(), 4);
1237        assert_eq!(
1238            table
1239                .columns()
1240                .iter()
1241                .map(|c| &c.column_name)
1242                .collect::<Vec<&String>>(),
1243            vec![
1244                "otel_scope_scope",
1245                "host",
1246                "greptime_timestamp",
1247                "greptime_value"
1248            ]
1249        );
1250    }
1251
1252    #[test]
1253    fn test_encode_histogram() {
1254        let mut tables = MultiTableData::default();
1255
1256        let data_points = vec![HistogramDataPoint {
1257            attributes: vec![keyvalue("host", "testserver")],
1258            time_unix_nano: 100,
1259            start_time_unix_nano: 23,
1260            count: 25,
1261            sum: Some(100.),
1262            max: Some(200.),
1263            min: Some(0.03),
1264            bucket_counts: vec![2, 4, 6, 9, 4],
1265            explicit_bounds: vec![0.1, 1., 10., 100.],
1266            ..Default::default()
1267        }];
1268
1269        let histogram = Histogram {
1270            data_points,
1271            aggregation_temporality: AggregationTemporality::Delta.into(),
1272        };
1273        encode_histogram(
1274            &mut tables,
1275            "histo",
1276            &histogram,
1277            Some(&vec![]),
1278            Some(&vec![keyvalue("scope", "otel")]),
1279            &OtlpMetricCtx::default(),
1280        )
1281        .unwrap();
1282
1283        assert_eq!(3, tables.num_tables());
1284
1285        // bucket table
1286        let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
1287        assert_eq!(bucket_table.num_rows(), 5);
1288        assert_eq!(bucket_table.num_columns(), 5);
1289        assert_eq!(
1290            bucket_table
1291                .columns()
1292                .iter()
1293                .map(|c| &c.column_name)
1294                .collect::<Vec<&String>>(),
1295            vec![
1296                "otel_scope_scope",
1297                "host",
1298                "greptime_timestamp",
1299                "le",
1300                "greptime_value",
1301            ]
1302        );
1303
1304        let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0);
1305        assert_eq!(sum_table.num_rows(), 1);
1306        assert_eq!(sum_table.num_columns(), 4);
1307        assert_eq!(
1308            sum_table
1309                .columns()
1310                .iter()
1311                .map(|c| &c.column_name)
1312                .collect::<Vec<&String>>(),
1313            vec![
1314                "otel_scope_scope",
1315                "host",
1316                "greptime_timestamp",
1317                "greptime_value"
1318            ]
1319        );
1320
1321        let count_table = tables.get_or_default_table_data("histo_count", 0, 0);
1322        assert_eq!(count_table.num_rows(), 1);
1323        assert_eq!(count_table.num_columns(), 4);
1324        assert_eq!(
1325            count_table
1326                .columns()
1327                .iter()
1328                .map(|c| &c.column_name)
1329                .collect::<Vec<&String>>(),
1330            vec![
1331                "otel_scope_scope",
1332                "host",
1333                "greptime_timestamp",
1334                "greptime_value"
1335            ]
1336        );
1337    }
1338}