Skip to main content

servers/otlp/
metrics.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use ahash::HashSet;
16use api::v1::{RowInsertRequests, Value};
17use common_grpc::precision::Precision;
18use common_query::prelude::{GREPTIME_COUNT, greptime_timestamp, greptime_value};
19use lazy_static::lazy_static;
20use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
21use otel_arrow_rust::proto::opentelemetry::common::v1::{AnyValue, KeyValue, any_value};
22use otel_arrow_rust::proto::opentelemetry::metrics::v1::{metric, number_data_point, *};
23use session::protocol_ctx::{MetricType, OtlpMetricCtx};
24use table::requests::{
25    METADATA_QUALITY_DECLARED, SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_METRIC_ORIGINAL_NAME,
26    SEMANTIC_METRIC_TEMPORALITY, SEMANTIC_METRIC_TYPE, SEMANTIC_METRIC_UNIT,
27};
28
29use crate::error::Result;
30use crate::otlp::trace::{KEY_SERVICE_INSTANCE_ID, KEY_SERVICE_NAME};
31use crate::row_writer::{self, MultiTableData, TableData};
32
33mod semantic;
34mod translator;
35
36pub use semantic::SemanticIndex;
37pub use translator::legacy_normalize_otlp_name;
38use translator::{translate_label_name, translate_metric_name};
39
/// the default column count for table writer
///
/// Passed as the initial column-capacity hint whenever this module creates a
/// table; the real column count depends on which attributes are present.
const APPROXIMATE_COLUMN_COUNT: usize = 8;

// Suffixes of the companion tables that histograms (and non-legacy summaries)
// fan out into, e.g. `%metric%_count`.
const COUNT_TABLE_SUFFIX: &str = "_count";
const SUM_TABLE_SUFFIX: &str = "_sum";
const BUCKET_TABLE_SUFFIX: &str = "_bucket";

// `greptime.semantic.metric.type` values stamped per emitted table. Must stay
// within the domain accepted by `validate_semantic_option`; the drift-guard test
// asserts this.
const METRIC_TYPE_COUNTER: &str = "counter";
const METRIC_TYPE_UPDOWN_COUNTER: &str = "updown_counter";
const METRIC_TYPE_GAUGE: &str = "gauge";
const METRIC_TYPE_HISTOGRAM: &str = "histogram";
const METRIC_TYPE_SUMMARY: &str = "summary";

// Label names that the `service.name` / `service.instance.id` resource
// attributes are copied to in non-legacy mode (see `process_resource_attrs`).
const JOB_KEY: &str = "job";
const INSTANCE_KEY: &str = "instance";
58
59// see: https://prometheus.io/docs/guides/opentelemetry/#promoting-resource-attributes
60const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [
61    "service.instance.id",
62    "service.name",
63    "service.namespace",
64    "service.version",
65    "cloud.availability_zone",
66    "cloud.region",
67    "container.name",
68    "deployment.environment",
69    "deployment.environment.name",
70    "k8s.cluster.name",
71    "k8s.container.name",
72    "k8s.cronjob.name",
73    "k8s.daemonset.name",
74    "k8s.deployment.name",
75    "k8s.job.name",
76    "k8s.namespace.name",
77    "k8s.pod.name",
78    "k8s.replicaset.name",
79    "k8s.statefulset.name",
80];
81
82lazy_static! {
83    static ref DEFAULT_PROMOTE_ATTRS_SET: HashSet<String> =
84        HashSet::from_iter(DEFAULT_PROMOTE_ATTRS.iter().map(|s| s.to_string()));
85}
86
// Keys under which the instrumentation scope's name, version and schema URL are
// appended to the scope attributes (see `process_scope_attrs`); like all scope
// attributes they are written as `otel_scope_`-prefixed columns.
const OTEL_SCOPE_NAME: &str = "name";
const OTEL_SCOPE_VERSION: &str = "version";
const OTEL_SCOPE_SCHEMA_URL: &str = "schema_url";
90
91/// Convert OpenTelemetry metrics to GreptimeDB insert requests
92///
93/// See
94/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto>
95/// for data structure of OTLP metrics.
96///
97/// Returns `InsertRequests`, total number of rows to ingest, and the per-table
98/// semantic index for the auto-create path to stamp as table options.
99pub fn to_grpc_insert_requests(
100    request: ExportMetricsServiceRequest,
101    metric_ctx: &mut OtlpMetricCtx,
102) -> Result<(RowInsertRequests, usize, SemanticIndex)> {
103    let mut table_writer = MultiTableData::default();
104    let mut semantic_index = SemanticIndex::default();
105
106    for resource in &request.resource_metrics {
107        let resource_attrs = resource.resource.as_ref().map(|r| {
108            let mut attrs = r.attributes.clone();
109            process_resource_attrs(&mut attrs, metric_ctx);
110            attrs
111        });
112
113        for scope in &resource.scope_metrics {
114            let scope_attrs = process_scope_attrs(scope, metric_ctx);
115
116            for metric in &scope.metrics {
117                if metric.data.is_none() {
118                    continue;
119                }
120                if let Some(t) = metric.data.as_ref().map(from_metric_type) {
121                    metric_ctx.set_metric_type(t);
122                }
123
124                encode_metrics(
125                    &mut table_writer,
126                    metric,
127                    resource_attrs.as_ref(),
128                    scope_attrs.as_ref(),
129                    metric_ctx,
130                    &mut semantic_index,
131                )?;
132            }
133        }
134    }
135
136    let (requests, rows) = table_writer.into_row_insert_requests();
137    Ok((requests, rows, semantic_index))
138}
139
140/// The tables a metric emits and their per-table `metric.type`. Histogram fans
141/// out into `_bucket` (the histogram) plus `_sum`/`_count` counters; summary
142/// fans out into the quantile table plus `_count`/`_sum` counters (legacy
143/// summary stays a single table).
144fn emitted_semantic_tables(
145    metric_type: &MetricType,
146    is_legacy: bool,
147    base: &str,
148) -> Vec<(String, &'static str)> {
149    match metric_type {
150        MetricType::Gauge => vec![(base.to_string(), METRIC_TYPE_GAUGE)],
151        MetricType::MonotonicSum => vec![(base.to_string(), METRIC_TYPE_COUNTER)],
152        MetricType::NonMonotonicSum => vec![(base.to_string(), METRIC_TYPE_UPDOWN_COUNTER)],
153        MetricType::Histogram => vec![
154            (
155                format!("{base}{BUCKET_TABLE_SUFFIX}"),
156                METRIC_TYPE_HISTOGRAM,
157            ),
158            (format!("{base}{SUM_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER),
159            (format!("{base}{COUNT_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER),
160        ],
161        MetricType::Summary if is_legacy => vec![(base.to_string(), METRIC_TYPE_SUMMARY)],
162        MetricType::Summary => vec![
163            (base.to_string(), METRIC_TYPE_SUMMARY),
164            (format!("{base}{COUNT_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER),
165            (format!("{base}{SUM_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER),
166        ],
167        // ExponentialHistogram is a no-op today; Init never reaches encoding.
168        MetricType::ExponentialHistogram | MetricType::Init => vec![],
169    }
170}
171
172/// Maps OTLP `aggregation_temporality` to the semantic value, or `None` when the
173/// instrument has no temporality (gauge/summary) or it is unspecified.
174fn temporality_value(data: &metric::Data) -> Option<&'static str> {
175    let raw = match data {
176        metric::Data::Sum(sum) => sum.aggregation_temporality,
177        metric::Data::Histogram(hist) => hist.aggregation_temporality,
178        _ => return None,
179    };
180    match AggregationTemporality::try_from(raw) {
181        Ok(AggregationTemporality::Delta) => Some("delta"),
182        Ok(AggregationTemporality::Cumulative) => Some("cumulative"),
183        _ => None,
184    }
185}
186
187/// Records the declared metric-level semantic keys for every table this metric
188/// emits.
189fn record_metric_semantics(
190    index: &mut SemanticIndex,
191    metric: &Metric,
192    name: &str,
193    metric_ctx: &OtlpMetricCtx,
194) {
195    let emitted = emitted_semantic_tables(&metric_ctx.metric_type, metric_ctx.is_legacy, name);
196    if emitted.is_empty() {
197        return;
198    }
199
200    let temporality = metric.data.as_ref().and_then(temporality_value);
201    let unit = metric.unit.trim();
202    // `original_name` is meaningful only when translation renamed the metric.
203    let original_name = (name != metric.name.as_str()).then_some(metric.name.as_str());
204
205    for (table, metric_type) in &emitted {
206        index.record_scalar(table, SEMANTIC_METRIC_TYPE, metric_type);
207        index.record_scalar(
208            table,
209            SEMANTIC_METRIC_METADATA_QUALITY,
210            METADATA_QUALITY_DECLARED,
211        );
212        if let Some(temporality) = temporality {
213            index.record_scalar(table, SEMANTIC_METRIC_TEMPORALITY, temporality);
214        }
215        if !unit.is_empty() {
216            index.record_scalar(table, SEMANTIC_METRIC_UNIT, unit);
217        }
218        if let Some(original_name) = original_name {
219            index.record_scalar(table, SEMANTIC_METRIC_ORIGINAL_NAME, original_name);
220        }
221    }
222}
223
224fn from_metric_type(data: &metric::Data) -> MetricType {
225    match data {
226        metric::Data::Gauge(_) => MetricType::Gauge,
227        metric::Data::Sum(s) => {
228            if s.is_monotonic {
229                MetricType::MonotonicSum
230            } else {
231                MetricType::NonMonotonicSum
232            }
233        }
234        metric::Data::Histogram(_) => MetricType::Histogram,
235        metric::Data::ExponentialHistogram(_) => MetricType::ExponentialHistogram,
236        metric::Data::Summary(_) => MetricType::Summary,
237    }
238}
239
240fn process_resource_attrs(attrs: &mut Vec<KeyValue>, metric_ctx: &OtlpMetricCtx) {
241    if metric_ctx.is_legacy {
242        return;
243    }
244
245    // remap service.name and service.instance.id to job and instance
246    let mut tmp = Vec::with_capacity(2);
247    for kv in attrs.iter() {
248        match &kv.key as &str {
249            KEY_SERVICE_NAME => {
250                tmp.push(KeyValue {
251                    key: JOB_KEY.to_string(),
252                    value: kv.value.clone(),
253                });
254            }
255            KEY_SERVICE_INSTANCE_ID => {
256                tmp.push(KeyValue {
257                    key: INSTANCE_KEY.to_string(),
258                    value: kv.value.clone(),
259                });
260            }
261            _ => {}
262        }
263    }
264
265    // if promote all, then exclude the list, else, include the list
266    if metric_ctx.promote_all_resource_attrs {
267        attrs.retain(|kv| !metric_ctx.resource_attrs.contains(&kv.key));
268    } else {
269        attrs.retain(|kv| {
270            metric_ctx.resource_attrs.contains(&kv.key)
271                || DEFAULT_PROMOTE_ATTRS_SET.contains(&kv.key)
272        });
273    }
274
275    attrs.extend(tmp);
276}
277
278fn process_scope_attrs(scope: &ScopeMetrics, metric_ctx: &OtlpMetricCtx) -> Option<Vec<KeyValue>> {
279    if metric_ctx.is_legacy {
280        return scope.scope.as_ref().map(|s| s.attributes.clone());
281    };
282
283    if !metric_ctx.promote_scope_attrs {
284        return None;
285    }
286
287    // persist scope attrs with name, version and schema_url
288    scope.scope.as_ref().map(|s| {
289        let mut attrs = s.attributes.clone();
290        attrs.push(KeyValue {
291            key: OTEL_SCOPE_NAME.to_string(),
292            value: Some(AnyValue {
293                value: Some(any_value::Value::StringValue(s.name.clone())),
294            }),
295        });
296        attrs.push(KeyValue {
297            key: OTEL_SCOPE_VERSION.to_string(),
298            value: Some(AnyValue {
299                value: Some(any_value::Value::StringValue(s.version.clone())),
300            }),
301        });
302        attrs.push(KeyValue {
303            key: OTEL_SCOPE_SCHEMA_URL.to_string(),
304            value: Some(AnyValue {
305                value: Some(any_value::Value::StringValue(scope.schema_url.clone())),
306            }),
307        });
308        attrs
309    })
310}
311
312fn encode_metrics(
313    table_writer: &mut MultiTableData,
314    metric: &Metric,
315    resource_attrs: Option<&Vec<KeyValue>>,
316    scope_attrs: Option<&Vec<KeyValue>>,
317    metric_ctx: &OtlpMetricCtx,
318    semantic_index: &mut SemanticIndex,
319) -> Result<()> {
320    let name = if metric_ctx.is_legacy {
321        legacy_normalize_otlp_name(&metric.name)
322    } else {
323        translate_metric_name(
324            metric,
325            &metric_ctx.metric_type,
326            metric_ctx.metric_translation_strategy,
327        )
328    };
329
330    // Stamp semantic metadata against the same table name(s) the data is written
331    // to below. `unit` is captured here (it is otherwise discarded by the row
332    // encoders) along with the declared type/temporality.
333    record_metric_semantics(semantic_index, metric, &name, metric_ctx);
334
335    if let Some(data) = &metric.data {
336        match data {
337            metric::Data::Gauge(gauge) => {
338                encode_gauge(
339                    table_writer,
340                    &name,
341                    gauge,
342                    resource_attrs,
343                    scope_attrs,
344                    metric_ctx,
345                )?;
346            }
347            metric::Data::Sum(sum) => {
348                encode_sum(
349                    table_writer,
350                    &name,
351                    sum,
352                    resource_attrs,
353                    scope_attrs,
354                    metric_ctx,
355                )?;
356            }
357            metric::Data::Summary(summary) => {
358                encode_summary(
359                    table_writer,
360                    &name,
361                    summary,
362                    resource_attrs,
363                    scope_attrs,
364                    metric_ctx,
365                )?;
366            }
367            metric::Data::Histogram(hist) => {
368                encode_histogram(
369                    table_writer,
370                    &name,
371                    hist,
372                    resource_attrs,
373                    scope_attrs,
374                    metric_ctx,
375                )?;
376            }
377            // TODO(sunng87) leave ExponentialHistogram for next release
378            metric::Data::ExponentialHistogram(_hist) => {}
379        }
380    }
381
382    Ok(())
383}
384
/// Origin of a set of attributes. It decides how `write_attributes` turns
/// attribute keys into column names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AttributeType {
    /// Resource attributes; keys go through `translate_label_name`.
    Resource,
    /// Instrumentation-scope attributes; translated keys get an `otel_scope_`
    /// prefix.
    Scope,
    /// Data-point attributes; keys go through `translate_label_name`.
    DataPoint,
    /// Used for every attribute source in legacy mode; keys go through
    /// `legacy_normalize_otlp_name`.
    Legacy,
}
392
393fn write_attributes(
394    writer: &mut TableData,
395    row: &mut Vec<Value>,
396    attrs: Option<&Vec<KeyValue>>,
397    attribute_type: AttributeType,
398    metric_ctx: &OtlpMetricCtx,
399) -> Result<()> {
400    let Some(attrs) = attrs else {
401        return Ok(());
402    };
403
404    let tags = attrs.iter().filter_map(|attr| {
405        attr.value
406            .as_ref()
407            .and_then(|v| v.value.as_ref())
408            .and_then(|val| {
409                let key = match attribute_type {
410                    AttributeType::Resource | AttributeType::DataPoint => {
411                        translate_label_name(&attr.key, metric_ctx.metric_translation_strategy)
412                    }
413                    AttributeType::Scope => {
414                        format!(
415                            "otel_scope_{}",
416                            translate_label_name(&attr.key, metric_ctx.metric_translation_strategy)
417                        )
418                    }
419                    AttributeType::Legacy => legacy_normalize_otlp_name(&attr.key),
420                };
421                match val {
422                    any_value::Value::StringValue(s) => Some((key, s.clone())),
423                    any_value::Value::IntValue(v) => Some((key, v.to_string())),
424                    any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
425                    _ => None, // TODO(sunng87): allow different type of values
426                }
427            })
428    });
429    row_writer::write_tags(writer, tags, row)?;
430
431    Ok(())
432}
433
434fn write_timestamp(
435    table: &mut TableData,
436    row: &mut Vec<Value>,
437    time_nano: i64,
438    legacy_mode: bool,
439) -> Result<()> {
440    if legacy_mode {
441        row_writer::write_ts_to_nanos(
442            table,
443            greptime_timestamp(),
444            Some(time_nano),
445            Precision::Nanosecond,
446            row,
447        )
448    } else {
449        row_writer::write_ts_to_millis(
450            table,
451            greptime_timestamp(),
452            Some(time_nano / 1000000),
453            Precision::Millisecond,
454            row,
455        )
456    }
457}
458
459fn write_data_point_value(
460    table: &mut TableData,
461    row: &mut Vec<Value>,
462    field: &str,
463    value: &Option<number_data_point::Value>,
464) -> Result<()> {
465    match value {
466        Some(number_data_point::Value::AsInt(val)) => {
467            // we coerce all values to f64
468            row_writer::write_f64(table, field, *val as f64, row)?;
469        }
470        Some(number_data_point::Value::AsDouble(val)) => {
471            row_writer::write_f64(table, field, *val, row)?;
472        }
473        _ => {}
474    }
475    Ok(())
476}
477
478fn write_tags_and_timestamp(
479    table: &mut TableData,
480    row: &mut Vec<Value>,
481    resource_attrs: Option<&Vec<KeyValue>>,
482    scope_attrs: Option<&Vec<KeyValue>>,
483    data_point_attrs: Option<&Vec<KeyValue>>,
484    timestamp_nanos: i64,
485    metric_ctx: &OtlpMetricCtx,
486) -> Result<()> {
487    if metric_ctx.is_legacy {
488        write_attributes(
489            table,
490            row,
491            resource_attrs,
492            AttributeType::Legacy,
493            metric_ctx,
494        )?;
495        write_attributes(table, row, scope_attrs, AttributeType::Legacy, metric_ctx)?;
496        write_attributes(
497            table,
498            row,
499            data_point_attrs,
500            AttributeType::Legacy,
501            metric_ctx,
502        )?;
503    } else {
504        // TODO(shuiyisong): check `__type__` and `__unit__` tags in prometheus
505        write_attributes(
506            table,
507            row,
508            resource_attrs,
509            AttributeType::Resource,
510            metric_ctx,
511        )?;
512        write_attributes(table, row, scope_attrs, AttributeType::Scope, metric_ctx)?;
513        write_attributes(
514            table,
515            row,
516            data_point_attrs,
517            AttributeType::DataPoint,
518            metric_ctx,
519        )?;
520    }
521
522    write_timestamp(table, row, timestamp_nanos, metric_ctx.is_legacy)?;
523
524    Ok(())
525}
526
527/// encode this gauge metric
528///
529/// note that there can be multiple data points in the request, it's going to be
530/// stored as multiple rows
531fn encode_gauge(
532    table_writer: &mut MultiTableData,
533    name: &str,
534    gauge: &Gauge,
535    resource_attrs: Option<&Vec<KeyValue>>,
536    scope_attrs: Option<&Vec<KeyValue>>,
537    metric_ctx: &OtlpMetricCtx,
538) -> Result<()> {
539    let table = table_writer.get_or_default_table_data(
540        name,
541        APPROXIMATE_COLUMN_COUNT,
542        gauge.data_points.len(),
543    );
544
545    for data_point in &gauge.data_points {
546        let mut row = table.alloc_one_row();
547        write_tags_and_timestamp(
548            table,
549            &mut row,
550            resource_attrs,
551            scope_attrs,
552            Some(data_point.attributes.as_ref()),
553            data_point.time_unix_nano as i64,
554            metric_ctx,
555        )?;
556
557        write_data_point_value(table, &mut row, greptime_value(), &data_point.value)?;
558        table.add_row(row);
559    }
560
561    Ok(())
562}
563
564/// encode this sum metric
565///
566/// `aggregation_temporality` and `monotonic` are ignored for now
567fn encode_sum(
568    table_writer: &mut MultiTableData,
569    name: &str,
570    sum: &Sum,
571    resource_attrs: Option<&Vec<KeyValue>>,
572    scope_attrs: Option<&Vec<KeyValue>>,
573    metric_ctx: &OtlpMetricCtx,
574) -> Result<()> {
575    let table = table_writer.get_or_default_table_data(
576        name,
577        APPROXIMATE_COLUMN_COUNT,
578        sum.data_points.len(),
579    );
580
581    for data_point in &sum.data_points {
582        let mut row = table.alloc_one_row();
583        write_tags_and_timestamp(
584            table,
585            &mut row,
586            resource_attrs,
587            scope_attrs,
588            Some(data_point.attributes.as_ref()),
589            data_point.time_unix_nano as i64,
590            metric_ctx,
591        )?;
592        write_data_point_value(table, &mut row, greptime_value(), &data_point.value)?;
593        table.add_row(row);
594    }
595
596    Ok(())
597}
598
/// Tag column holding a bucket's upper bound in `%metric%_bucket` tables.
const HISTOGRAM_LE_COLUMN: &str = "le";
600
601/// Encode histogram data. This function returns 3 insert requests for 3 tables.
602///
603/// The implementation has been following Prometheus histogram table format:
604///
605/// - A `%metric%_bucket` table including `greptime_le` tag that stores bucket upper
606///   limit, and `greptime_value` for bucket count
607/// - A `%metric%_sum` table storing sum of samples
608/// -  A `%metric%_count` table storing count of samples.
609///
610/// By its Prometheus compatibility, we hope to be able to use prometheus
611/// quantile functions on this table.
612fn encode_histogram(
613    table_writer: &mut MultiTableData,
614    name: &str,
615    hist: &Histogram,
616    resource_attrs: Option<&Vec<KeyValue>>,
617    scope_attrs: Option<&Vec<KeyValue>>,
618    metric_ctx: &OtlpMetricCtx,
619) -> Result<()> {
620    let normalized_name = name;
621
622    let bucket_table_name = format!("{}{}", normalized_name, BUCKET_TABLE_SUFFIX);
623    let sum_table_name = format!("{}{}", normalized_name, SUM_TABLE_SUFFIX);
624    let count_table_name = format!("{}{}", normalized_name, COUNT_TABLE_SUFFIX);
625
626    let data_points_len = hist.data_points.len();
627    // Note that the row and columns number here is approximate
628    let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
629    let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
630    let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
631
632    for data_point in &hist.data_points {
633        let mut accumulated_count = 0;
634        for (idx, count) in data_point.bucket_counts.iter().enumerate() {
635            let mut bucket_row = bucket_table.alloc_one_row();
636            write_tags_and_timestamp(
637                &mut bucket_table,
638                &mut bucket_row,
639                resource_attrs,
640                scope_attrs,
641                Some(data_point.attributes.as_ref()),
642                data_point.time_unix_nano as i64,
643                metric_ctx,
644            )?;
645
646            if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
647                row_writer::write_tag(
648                    &mut bucket_table,
649                    HISTOGRAM_LE_COLUMN,
650                    upper_bounds,
651                    &mut bucket_row,
652                )?;
653            } else if idx == data_point.explicit_bounds.len() {
654                // The last bucket
655                row_writer::write_tag(
656                    &mut bucket_table,
657                    HISTOGRAM_LE_COLUMN,
658                    f64::INFINITY,
659                    &mut bucket_row,
660                )?;
661            }
662
663            accumulated_count += count;
664            row_writer::write_f64(
665                &mut bucket_table,
666                greptime_value(),
667                accumulated_count as f64,
668                &mut bucket_row,
669            )?;
670
671            bucket_table.add_row(bucket_row);
672        }
673
674        if let Some(sum) = data_point.sum {
675            let mut sum_row = sum_table.alloc_one_row();
676            write_tags_and_timestamp(
677                &mut sum_table,
678                &mut sum_row,
679                resource_attrs,
680                scope_attrs,
681                Some(data_point.attributes.as_ref()),
682                data_point.time_unix_nano as i64,
683                metric_ctx,
684            )?;
685
686            row_writer::write_f64(&mut sum_table, greptime_value(), sum, &mut sum_row)?;
687            sum_table.add_row(sum_row);
688        }
689
690        let mut count_row = count_table.alloc_one_row();
691        write_tags_and_timestamp(
692            &mut count_table,
693            &mut count_row,
694            resource_attrs,
695            scope_attrs,
696            Some(data_point.attributes.as_ref()),
697            data_point.time_unix_nano as i64,
698            metric_ctx,
699        )?;
700
701        row_writer::write_f64(
702            &mut count_table,
703            greptime_value(),
704            data_point.count as f64,
705            &mut count_row,
706        )?;
707        count_table.add_row(count_row);
708    }
709
710    table_writer.add_table_data(bucket_table_name, bucket_table);
711    table_writer.add_table_data(sum_table_name, sum_table);
712    table_writer.add_table_data(count_table_name, count_table);
713
714    Ok(())
715}
716
/// Placeholder for exponential-histogram encoding; currently a no-op that
/// always returns `Ok(())`.
///
/// `dead_code` is allowed because the `ExponentialHistogram` arm of
/// `encode_metrics` is still empty and does not call this yet.
#[allow(dead_code)]
fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
    // TODO(sunng87): implement this using a prometheus compatible way
    Ok(())
}
722
723fn encode_summary(
724    table_writer: &mut MultiTableData,
725    name: &str,
726    summary: &Summary,
727    resource_attrs: Option<&Vec<KeyValue>>,
728    scope_attrs: Option<&Vec<KeyValue>>,
729    metric_ctx: &OtlpMetricCtx,
730) -> Result<()> {
731    if metric_ctx.is_legacy {
732        let table = table_writer.get_or_default_table_data(
733            name,
734            APPROXIMATE_COLUMN_COUNT,
735            summary.data_points.len(),
736        );
737
738        for data_point in &summary.data_points {
739            let mut row = table.alloc_one_row();
740            write_tags_and_timestamp(
741                table,
742                &mut row,
743                resource_attrs,
744                scope_attrs,
745                Some(data_point.attributes.as_ref()),
746                data_point.time_unix_nano as i64,
747                metric_ctx,
748            )?;
749
750            for quantile in &data_point.quantile_values {
751                row_writer::write_f64(
752                    table,
753                    format!("greptime_p{:02}", quantile.quantile * 100f64),
754                    quantile.value,
755                    &mut row,
756                )?;
757            }
758
759            row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
760            table.add_row(row);
761        }
762    } else {
763        // 1. quantile table
764        // 2. count table
765        // 3. sum table
766
767        let metric_name = name;
768        let count_name = format!("{}{}", metric_name, COUNT_TABLE_SUFFIX);
769        let sum_name = format!("{}{}", metric_name, SUM_TABLE_SUFFIX);
770
771        for data_point in &summary.data_points {
772            {
773                let quantile_table = table_writer.get_or_default_table_data(
774                    metric_name,
775                    APPROXIMATE_COLUMN_COUNT,
776                    summary.data_points.len(),
777                );
778
779                for quantile in &data_point.quantile_values {
780                    let mut row = quantile_table.alloc_one_row();
781                    write_tags_and_timestamp(
782                        quantile_table,
783                        &mut row,
784                        resource_attrs,
785                        scope_attrs,
786                        Some(data_point.attributes.as_ref()),
787                        data_point.time_unix_nano as i64,
788                        metric_ctx,
789                    )?;
790                    row_writer::write_tag(quantile_table, "quantile", quantile.quantile, &mut row)?;
791                    row_writer::write_f64(
792                        quantile_table,
793                        greptime_value(),
794                        quantile.value,
795                        &mut row,
796                    )?;
797                    quantile_table.add_row(row);
798                }
799            }
800            {
801                let count_table = table_writer.get_or_default_table_data(
802                    &count_name,
803                    APPROXIMATE_COLUMN_COUNT,
804                    summary.data_points.len(),
805                );
806                let mut row = count_table.alloc_one_row();
807                write_tags_and_timestamp(
808                    count_table,
809                    &mut row,
810                    resource_attrs,
811                    scope_attrs,
812                    Some(data_point.attributes.as_ref()),
813                    data_point.time_unix_nano as i64,
814                    metric_ctx,
815                )?;
816
817                row_writer::write_f64(
818                    count_table,
819                    greptime_value(),
820                    data_point.count as f64,
821                    &mut row,
822                )?;
823
824                count_table.add_row(row);
825            }
826            {
827                let sum_table = table_writer.get_or_default_table_data(
828                    &sum_name,
829                    APPROXIMATE_COLUMN_COUNT,
830                    summary.data_points.len(),
831                );
832
833                let mut row = sum_table.alloc_one_row();
834                write_tags_and_timestamp(
835                    sum_table,
836                    &mut row,
837                    resource_attrs,
838                    scope_attrs,
839                    Some(data_point.attributes.as_ref()),
840                    data_point.time_unix_nano as i64,
841                    metric_ctx,
842                )?;
843
844                row_writer::write_f64(sum_table, greptime_value(), data_point.sum, &mut row)?;
845
846                sum_table.add_row(row);
847            }
848        }
849    }
850
851    Ok(())
852}
853
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use otel_arrow_rust::proto::opentelemetry::common::v1::AnyValue;
    use otel_arrow_rust::proto::opentelemetry::common::v1::any_value::Value as Val;
    use otel_arrow_rust::proto::opentelemetry::metrics::v1::number_data_point::Value;
    use otel_arrow_rust::proto::opentelemetry::metrics::v1::summary_data_point::ValueAtQuantile;
    use otel_arrow_rust::proto::opentelemetry::metrics::v1::{
        AggregationTemporality, HistogramDataPoint, NumberDataPoint, SummaryDataPoint,
    };
    use table::requests::validate_semantic_option;

    use super::*;

    /// Builds a string-valued OTLP attribute `key = value`.
    fn keyvalue(key: &str, value: &str) -> KeyValue {
        KeyValue {
            key: key.into(),
            value: Some(AnyValue {
                value: Some(Val::StringValue(value.into())),
            }),
        }
    }

    /// Returns the column names of `table` in the order `columns()` yields them.
    fn column_names(table: &TableData) -> Vec<&String> {
        table.columns().iter().map(|c| &c.column_name).collect()
    }

    /// Decodes the JSON string produced by `SemanticIndex::encode` into a
    /// nested map keyed first by table name, then by semantic option key.
    ///
    /// Panics if the index encodes to nothing or to invalid JSON.
    fn decode(index: &SemanticIndex) -> BTreeMap<String, BTreeMap<String, String>> {
        let encoded = index.encode().expect("non-empty index");
        serde_json::from_str(&encoded).expect("semantic index should encode to valid JSON")
    }

    /// Runs `record_metric_semantics` for `metric` (whose translated table name
    /// is `name`) against a fresh index, with `metric_type` set on the context.
    fn record(metric: &Metric, metric_type: MetricType, name: &str) -> SemanticIndex {
        let ctx = OtlpMetricCtx {
            metric_type,
            ..Default::default()
        };
        let mut index = SemanticIndex::default();
        record_metric_semantics(&mut index, metric, name, &ctx);
        index
    }

    #[test]
    fn test_encode_gauge() {
        let mut tables = MultiTableData::default();

        let data_points = vec![
            NumberDataPoint {
                attributes: vec![keyvalue("host", "testserver")],
                time_unix_nano: 100,
                value: Some(Value::AsInt(100)),
                ..Default::default()
            },
            NumberDataPoint {
                attributes: vec![keyvalue("host", "testserver")],
                time_unix_nano: 105,
                value: Some(Value::AsInt(105)),
                ..Default::default()
            },
        ];
        let gauge = Gauge { data_points };
        encode_gauge(
            &mut tables,
            "datamon",
            &gauge,
            Some(&vec![]),
            Some(&vec![keyvalue("scope", "otel")]),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 4);
        assert_eq!(
            column_names(table),
            vec![
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                greptime_value()
            ]
        );
    }

    #[test]
    fn test_encode_sum() {
        let mut tables = MultiTableData::default();

        let data_points = vec![
            NumberDataPoint {
                attributes: vec![keyvalue("host", "testserver")],
                time_unix_nano: 100,
                value: Some(Value::AsInt(100)),
                ..Default::default()
            },
            NumberDataPoint {
                attributes: vec![keyvalue("host", "testserver")],
                time_unix_nano: 105,
                value: Some(Value::AsInt(0)),
                ..Default::default()
            },
        ];
        let sum = Sum {
            data_points,
            ..Default::default()
        };
        encode_sum(
            &mut tables,
            "datamon",
            &sum,
            Some(&vec![]),
            Some(&vec![keyvalue("scope", "otel")]),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 4);
        assert_eq!(
            column_names(table),
            vec![
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                greptime_value()
            ]
        );
    }

    #[test]
    fn test_encode_summary() {
        let mut tables = MultiTableData::default();

        let data_points = vec![SummaryDataPoint {
            attributes: vec![keyvalue("host", "testserver")],
            time_unix_nano: 100,
            count: 25,
            sum: 5400.0,
            quantile_values: vec![
                ValueAtQuantile {
                    quantile: 0.90,
                    value: 1000.0,
                },
                ValueAtQuantile {
                    quantile: 0.95,
                    value: 3030.0,
                },
            ],
            ..Default::default()
        }];
        let summary = Summary { data_points };
        encode_summary(
            &mut tables,
            "datamon",
            &summary,
            Some(&vec![]),
            Some(&vec![keyvalue("scope", "otel")]),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        // Quantile table: one row per quantile value.
        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 5);
        assert_eq!(
            column_names(table),
            vec![
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                "quantile",
                greptime_value()
            ]
        );

        let table = tables.get_or_default_table_data("datamon_count", 0, 0);
        assert_eq!(table.num_rows(), 1);
        assert_eq!(table.num_columns(), 4);
        assert_eq!(
            column_names(table),
            vec![
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                greptime_value()
            ]
        );

        let table = tables.get_or_default_table_data("datamon_sum", 0, 0);
        assert_eq!(table.num_rows(), 1);
        assert_eq!(table.num_columns(), 4);
        assert_eq!(
            column_names(table),
            vec![
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                greptime_value()
            ]
        );
    }

    #[test]
    fn test_encode_histogram() {
        let mut tables = MultiTableData::default();

        let data_points = vec![HistogramDataPoint {
            attributes: vec![keyvalue("host", "testserver")],
            time_unix_nano: 100,
            start_time_unix_nano: 23,
            count: 25,
            sum: Some(100.),
            max: Some(200.),
            min: Some(0.03),
            bucket_counts: vec![2, 4, 6, 9, 4],
            explicit_bounds: vec![0.1, 1., 10., 100.],
            ..Default::default()
        }];

        let histogram = Histogram {
            data_points,
            aggregation_temporality: AggregationTemporality::Delta.into(),
        };
        encode_histogram(
            &mut tables,
            "histo",
            &histogram,
            Some(&vec![]),
            Some(&vec![keyvalue("scope", "otel")]),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        // Checked before any lookup: `get_or_default_table_data` would
        // otherwise create missing tables.
        assert_eq!(3, tables.num_tables());

        // Bucket table: one row per entry in `bucket_counts`.
        let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
        assert_eq!(bucket_table.num_rows(), 5);
        assert_eq!(bucket_table.num_columns(), 5);
        assert_eq!(
            column_names(bucket_table),
            vec![
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                "le",
                greptime_value(),
            ]
        );

        let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0);
        assert_eq!(sum_table.num_rows(), 1);
        assert_eq!(sum_table.num_columns(), 4);
        assert_eq!(
            column_names(sum_table),
            vec![
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                greptime_value()
            ]
        );

        let count_table = tables.get_or_default_table_data("histo_count", 0, 0);
        assert_eq!(count_table.num_rows(), 1);
        assert_eq!(count_table.num_columns(), 4);
        assert_eq!(
            column_names(count_table),
            vec![
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                greptime_value()
            ]
        );
    }

    #[test]
    fn test_metric_type_constants_validate() {
        for value in [
            METRIC_TYPE_COUNTER,
            METRIC_TYPE_UPDOWN_COUNTER,
            METRIC_TYPE_GAUGE,
            METRIC_TYPE_HISTOGRAM,
            METRIC_TYPE_SUMMARY,
        ] {
            assert!(
                validate_semantic_option(SEMANTIC_METRIC_TYPE, value),
                "metric.type value `{value}` must be in the vocabulary domain"
            );
        }
        for value in ["delta", "cumulative"] {
            assert!(validate_semantic_option(SEMANTIC_METRIC_TEMPORALITY, value));
        }
    }

    #[test]
    fn test_record_monotonic_sum() {
        let metric = Metric {
            name: "claude_code.cost.usage".to_string(),
            unit: "USD".to_string(),
            data: Some(metric::Data::Sum(Sum {
                aggregation_temporality: AggregationTemporality::Delta as i32,
                is_monotonic: true,
                ..Default::default()
            })),
            ..Default::default()
        };
        let index = record(
            &metric,
            MetricType::MonotonicSum,
            "claude_code_cost_usage_USD_total",
        );
        let decoded = decode(&index);
        let t = &decoded["claude_code_cost_usage_USD_total"];

        assert_eq!(
            t.get(SEMANTIC_METRIC_TYPE).map(String::as_str),
            Some("counter")
        );
        assert_eq!(
            t.get(SEMANTIC_METRIC_TEMPORALITY).map(String::as_str),
            Some("delta")
        );
        assert_eq!(t.get(SEMANTIC_METRIC_UNIT).map(String::as_str), Some("USD"));
        assert_eq!(
            t.get(SEMANTIC_METRIC_ORIGINAL_NAME).map(String::as_str),
            Some("claude_code.cost.usage")
        );
        assert_eq!(
            t.get(SEMANTIC_METRIC_METADATA_QUALITY).map(String::as_str),
            Some("declared")
        );
    }

    #[test]
    fn test_record_non_monotonic_sum() {
        let metric = Metric {
            name: "queue_size".to_string(),
            data: Some(metric::Data::Sum(Sum {
                aggregation_temporality: AggregationTemporality::Cumulative as i32,
                is_monotonic: false,
                ..Default::default()
            })),
            ..Default::default()
        };
        let index = record(&metric, MetricType::NonMonotonicSum, "queue_size");
        let decoded = decode(&index);
        let t = &decoded["queue_size"];
        assert_eq!(
            t.get(SEMANTIC_METRIC_TYPE).map(String::as_str),
            Some("updown_counter")
        );
        assert_eq!(
            t.get(SEMANTIC_METRIC_TEMPORALITY).map(String::as_str),
            Some("cumulative")
        );
        // Name unchanged by translation -> no original_name.
        assert_eq!(t.get(SEMANTIC_METRIC_ORIGINAL_NAME), None);
    }

    #[test]
    fn test_record_gauge_has_no_temporality() {
        let metric = Metric {
            name: "temperature".to_string(),
            data: Some(metric::Data::Gauge(Gauge::default())),
            ..Default::default()
        };
        let index = record(&metric, MetricType::Gauge, "temperature");
        let decoded = decode(&index);
        let t = &decoded["temperature"];
        assert_eq!(
            t.get(SEMANTIC_METRIC_TYPE).map(String::as_str),
            Some("gauge")
        );
        assert_eq!(t.get(SEMANTIC_METRIC_TEMPORALITY), None);
    }

    #[test]
    fn test_record_histogram_fans_out_with_distinct_types() {
        let metric = Metric {
            name: "request.duration".to_string(),
            unit: "s".to_string(),
            data: Some(metric::Data::Histogram(Histogram {
                aggregation_temporality: AggregationTemporality::Cumulative as i32,
                ..Default::default()
            })),
            ..Default::default()
        };
        let index = record(&metric, MetricType::Histogram, "request_duration");
        let decoded = decode(&index);

        let bucket = &decoded["request_duration_bucket"];
        assert_eq!(
            bucket.get(SEMANTIC_METRIC_TYPE).map(String::as_str),
            Some("histogram")
        );
        assert_eq!(
            bucket.get(SEMANTIC_METRIC_UNIT).map(String::as_str),
            Some("s")
        );

        for companion in ["request_duration_sum", "request_duration_count"] {
            let t = &decoded[companion];
            assert_eq!(
                t.get(SEMANTIC_METRIC_TYPE).map(String::as_str),
                Some("counter")
            );
            assert_eq!(
                t.get(SEMANTIC_METRIC_TEMPORALITY).map(String::as_str),
                Some("cumulative")
            );
        }
    }

    #[test]
    fn test_record_summary_fans_out() {
        let metric = Metric {
            name: "rpc.latency".to_string(),
            data: Some(metric::Data::Summary(Summary::default())),
            ..Default::default()
        };
        let index = record(&metric, MetricType::Summary, "rpc_latency");
        let decoded = decode(&index);

        assert_eq!(
            decoded["rpc_latency"]
                .get(SEMANTIC_METRIC_TYPE)
                .map(String::as_str),
            Some("summary")
        );
        // Summary has no temporality.
        assert_eq!(
            decoded["rpc_latency"].get(SEMANTIC_METRIC_TEMPORALITY),
            None
        );
        for companion in ["rpc_latency_count", "rpc_latency_sum"] {
            assert_eq!(
                decoded[companion]
                    .get(SEMANTIC_METRIC_TYPE)
                    .map(String::as_str),
                Some("counter")
            );
        }
    }
}