servers/otlp/
metrics.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use ahash::{HashMap, HashSet};
16use api::v1::{RowInsertRequests, Value};
17use common_grpc::precision::Precision;
18use common_query::prelude::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
19use lazy_static::lazy_static;
20use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
21use otel_arrow_rust::proto::opentelemetry::common::v1::{AnyValue, KeyValue, any_value};
22use otel_arrow_rust::proto::opentelemetry::metrics::v1::{metric, number_data_point, *};
23use regex::Regex;
24use session::protocol_ctx::{MetricType, OtlpMetricCtx};
25
26use crate::error::Result;
27use crate::otlp::trace::{KEY_SERVICE_INSTANCE_ID, KEY_SERVICE_NAME};
28use crate::row_writer::{self, MultiTableData, TableData};
29
/// Approximate number of columns per row, used as a capacity hint when a
/// metric table is created by the table writer.
const APPROXIMATE_COLUMN_COUNT: usize = 8;

/// Suffix appended to a summary metric name to form its sample-count table.
const COUNT_TABLE_SUFFIX: &str = "_count";
/// Suffix appended to a summary metric name to form its sample-sum table.
const SUM_TABLE_SUFFIX: &str = "_sum";

/// Label that the `service.name` resource attribute is copied to (non-legacy mode).
const JOB_KEY: &str = "job";
/// Label that the `service.instance.id` resource attribute is copied to (non-legacy mode).
const INSTANCE_KEY: &str = "instance";

/// Separator used to join name tokens; also replaces illegal characters in label names.
const UNDERSCORE: &str = "_";
/// Labels already starting with this prefix are not given the `key` prefix.
const DOUBLE_UNDERSCORE: &str = "__";
/// Token appended to the normalized name of monotonic sums.
const TOTAL: &str = "total";
/// Token appended to the normalized name of gauges whose unit is `1`.
const RATIO: &str = "ratio";
43
// Resource attributes that are always kept as labels when only a subset of
// the resource attributes is promoted (i.e. "promote all" is disabled).
// see: https://prometheus.io/docs/guides/opentelemetry/#promoting-resource-attributes
const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [
    "service.instance.id",
    "service.name",
    "service.namespace",
    "service.version",
    "cloud.availability_zone",
    "cloud.region",
    "container.name",
    "deployment.environment",
    "deployment.environment.name",
    "k8s.cluster.name",
    "k8s.container.name",
    "k8s.cronjob.name",
    "k8s.daemonset.name",
    "k8s.deployment.name",
    "k8s.job.name",
    "k8s.namespace.name",
    "k8s.pod.name",
    "k8s.replicaset.name",
    "k8s.statefulset.name",
];
66
67lazy_static! {
68    static ref DEFAULT_PROMOTE_ATTRS_SET: HashSet<String> =
69        HashSet::from_iter(DEFAULT_PROMOTE_ATTRS.iter().map(|s| s.to_string()));
70    static ref NON_ALPHA_NUM_CHAR: Regex = Regex::new(r"[^a-zA-Z0-9]").unwrap();
71    static ref UNIT_MAP: HashMap<String, String> = [
72        // Time
73        ("d", "days"),
74        ("h", "hours"),
75        ("min", "minutes"),
76        ("s", "seconds"),
77        ("ms", "milliseconds"),
78        ("us", "microseconds"),
79        ("ns", "nanoseconds"),
80        // Bytes
81        ("By", "bytes"),
82        ("KiBy", "kibibytes"),
83        ("MiBy", "mebibytes"),
84        ("GiBy", "gibibytes"),
85        ("TiBy", "tibibytes"),
86        ("KBy", "kilobytes"),
87        ("MBy", "megabytes"),
88        ("GBy", "gigabytes"),
89        ("TBy", "terabytes"),
90        // SI
91        ("m", "meters"),
92        ("V", "volts"),
93        ("A", "amperes"),
94        ("J", "joules"),
95        ("W", "watts"),
96        ("g", "grams"),
97        // Misc
98        ("Cel", "celsius"),
99        ("Hz", "hertz"),
100        ("1", ""),
101        ("%", "percent"),
102    ].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
103    static ref PER_UNIT_MAP: HashMap<String, String> = [
104        ("s", "second"),
105        ("m", "minute"),
106        ("h", "hour"),
107        ("d", "day"),
108        ("w", "week"),
109        ("mo", "month"),
110        ("y", "year"),
111    ].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
112}
113
// Keys of the synthetic scope attributes added when scope attributes are
// promoted; they carry the scope's name, version and schema URL.
const OTEL_SCOPE_NAME: &str = "name";
const OTEL_SCOPE_VERSION: &str = "version";
const OTEL_SCOPE_SCHEMA_URL: &str = "schema_url";
117
118/// Convert OpenTelemetry metrics to GreptimeDB insert requests
119///
120/// See
121/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto>
122/// for data structure of OTLP metrics.
123///
124/// Returns `InsertRequests` and total number of rows to ingest
125pub fn to_grpc_insert_requests(
126    request: ExportMetricsServiceRequest,
127    metric_ctx: &mut OtlpMetricCtx,
128) -> Result<(RowInsertRequests, usize)> {
129    let mut table_writer = MultiTableData::default();
130
131    for resource in &request.resource_metrics {
132        let resource_attrs = resource.resource.as_ref().map(|r| {
133            let mut attrs = r.attributes.clone();
134            process_resource_attrs(&mut attrs, metric_ctx);
135            attrs
136        });
137
138        for scope in &resource.scope_metrics {
139            let scope_attrs = process_scope_attrs(scope, metric_ctx);
140
141            for metric in &scope.metrics {
142                if metric.data.is_none() {
143                    continue;
144                }
145                if let Some(t) = metric.data.as_ref().map(from_metric_type) {
146                    metric_ctx.set_metric_type(t);
147                }
148
149                encode_metrics(
150                    &mut table_writer,
151                    metric,
152                    resource_attrs.as_ref(),
153                    scope_attrs.as_ref(),
154                    metric_ctx,
155                )?;
156            }
157        }
158    }
159
160    Ok(table_writer.into_row_insert_requests())
161}
162
163fn from_metric_type(data: &metric::Data) -> MetricType {
164    match data {
165        metric::Data::Gauge(_) => MetricType::Gauge,
166        metric::Data::Sum(s) => {
167            if s.is_monotonic {
168                MetricType::MonotonicSum
169            } else {
170                MetricType::NonMonotonicSum
171            }
172        }
173        metric::Data::Histogram(_) => MetricType::Histogram,
174        metric::Data::ExponentialHistogram(_) => MetricType::ExponentialHistogram,
175        metric::Data::Summary(_) => MetricType::Summary,
176    }
177}
178
179fn process_resource_attrs(attrs: &mut Vec<KeyValue>, metric_ctx: &OtlpMetricCtx) {
180    if metric_ctx.is_legacy {
181        return;
182    }
183
184    // remap service.name and service.instance.id to job and instance
185    let mut tmp = Vec::with_capacity(2);
186    for kv in attrs.iter() {
187        match &kv.key as &str {
188            KEY_SERVICE_NAME => {
189                tmp.push(KeyValue {
190                    key: JOB_KEY.to_string(),
191                    value: kv.value.clone(),
192                });
193            }
194            KEY_SERVICE_INSTANCE_ID => {
195                tmp.push(KeyValue {
196                    key: INSTANCE_KEY.to_string(),
197                    value: kv.value.clone(),
198                });
199            }
200            _ => {}
201        }
202    }
203
204    // if promote all, then exclude the list, else, include the list
205    if metric_ctx.promote_all_resource_attrs {
206        attrs.retain(|kv| !metric_ctx.resource_attrs.contains(&kv.key));
207    } else {
208        attrs.retain(|kv| {
209            metric_ctx.resource_attrs.contains(&kv.key)
210                || DEFAULT_PROMOTE_ATTRS_SET.contains(&kv.key)
211        });
212    }
213
214    attrs.extend(tmp);
215}
216
217fn process_scope_attrs(scope: &ScopeMetrics, metric_ctx: &OtlpMetricCtx) -> Option<Vec<KeyValue>> {
218    if metric_ctx.is_legacy {
219        return scope.scope.as_ref().map(|s| s.attributes.clone());
220    };
221
222    if !metric_ctx.promote_scope_attrs {
223        return None;
224    }
225
226    // persist scope attrs with name, version and schema_url
227    scope.scope.as_ref().map(|s| {
228        let mut attrs = s.attributes.clone();
229        attrs.push(KeyValue {
230            key: OTEL_SCOPE_NAME.to_string(),
231            value: Some(AnyValue {
232                value: Some(any_value::Value::StringValue(s.name.clone())),
233            }),
234        });
235        attrs.push(KeyValue {
236            key: OTEL_SCOPE_VERSION.to_string(),
237            value: Some(AnyValue {
238                value: Some(any_value::Value::StringValue(s.version.clone())),
239            }),
240        });
241        attrs.push(KeyValue {
242            key: OTEL_SCOPE_SCHEMA_URL.to_string(),
243            value: Some(AnyValue {
244                value: Some(any_value::Value::StringValue(scope.schema_url.clone())),
245            }),
246        });
247        attrs
248    })
249}
250
251// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_name.go#L55
252pub fn normalize_metric_name(metric: &Metric, metric_type: &MetricType) -> String {
253    // Split metric name in "tokens" (remove all non-alphanumeric), filtering out empty strings
254    let mut name_tokens: Vec<String> = NON_ALPHA_NUM_CHAR
255        .split(&metric.name)
256        .filter_map(|s| {
257            let trimmed = s.trim();
258            if trimmed.is_empty() {
259                None
260            } else {
261                Some(trimmed.to_string())
262            }
263        })
264        .collect();
265
266    // Append unit if it exists
267    if !metric.unit.is_empty() {
268        let (main, per) = build_unit_suffix(&metric.unit);
269        if let Some(main) = main
270            && !name_tokens.contains(&main)
271        {
272            name_tokens.push(main);
273        }
274        if let Some(per) = per
275            && !name_tokens.contains(&per)
276        {
277            name_tokens.push("per".to_string());
278            name_tokens.push(per);
279        }
280    }
281
282    // Append _total for Counters (monotonic sums)
283    if matches!(metric_type, MetricType::MonotonicSum) {
284        // Remove existing "total" tokens first, then append
285        name_tokens.retain(|t| t != TOTAL);
286        name_tokens.push(TOTAL.to_string());
287    }
288
289    // Append _ratio for metrics with unit "1" (gauges only)
290    if metric.unit == "1" && matches!(metric_type, MetricType::Gauge) {
291        // Remove existing "ratio" tokens first, then append
292        name_tokens.retain(|t| t != RATIO);
293        name_tokens.push(RATIO.to_string());
294    }
295
296    // Build the string from the tokens, separated with underscores
297    let name = name_tokens.join(UNDERSCORE);
298
299    // Metric name cannot start with a digit, so prefix it with "_" in this case
300    if let Some((_, first)) = name.char_indices().next()
301        && first.is_ascii_digit()
302    {
303        format!("_{}", name)
304    } else {
305        name
306    }
307}
308
309fn build_unit_suffix(unit: &str) -> (Option<String>, Option<String>) {
310    let (main, per) = unit.split_once('/').unwrap_or((unit, ""));
311    (check_unit(main, &UNIT_MAP), check_unit(per, &PER_UNIT_MAP))
312}
313
314fn check_unit(unit_str: &str, unit_map: &HashMap<String, String>) -> Option<String> {
315    let u = unit_str.trim();
316    // Skip units that are empty, contain "{" or "}" characters
317    if !u.is_empty() && !u.contains('{') && !u.contains('}') {
318        let u = unit_map.get(u).map(|s| s.as_ref()).unwrap_or(u);
319        let u = clean_unit_name(u);
320        if !u.is_empty() {
321            return Some(u);
322        }
323    }
324    None
325}
326
327fn clean_unit_name(name: &str) -> String {
328    // Split on non-alphanumeric characters, filter out empty strings, then join with underscores
329    // This matches the Go implementation: strings.FieldsFunc + strings.Join
330    NON_ALPHA_NUM_CHAR
331        .split(name)
332        .filter(|s| !s.is_empty())
333        .collect::<Vec<&str>>()
334        .join(UNDERSCORE)
335}
336
337// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_label.go#L27
338pub fn normalize_label_name(name: &str) -> String {
339    if name.is_empty() {
340        return name.to_string();
341    }
342
343    let n = NON_ALPHA_NUM_CHAR.replace_all(name, UNDERSCORE);
344    if let Some((_, first)) = n.char_indices().next()
345        && first.is_ascii_digit()
346    {
347        return format!("key_{}", n);
348    }
349    if n.starts_with(UNDERSCORE) && !n.starts_with(DOUBLE_UNDERSCORE) {
350        return format!("key{}", n);
351    }
352    n.to_string()
353}
354
/// Legacy normalization for OTLP instrumentation, metric and attribute names.
///
/// <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument-name-syntax>
/// - names are case-insensitive, so they are lowercased for better SQL usability
/// - every `.` and `-` is replaced with `_`
pub fn legacy_normalize_otlp_name(name: &str) -> String {
    // Lowercase first, then substitute, so the lowercasing sees the original
    // punctuation.
    let lowered = name.to_lowercase();
    lowered
        .chars()
        .map(|c| match c {
            '.' | '-' => '_',
            other => other,
        })
        .collect()
}
364
365fn encode_metrics(
366    table_writer: &mut MultiTableData,
367    metric: &Metric,
368    resource_attrs: Option<&Vec<KeyValue>>,
369    scope_attrs: Option<&Vec<KeyValue>>,
370    metric_ctx: &OtlpMetricCtx,
371) -> Result<()> {
372    let name = if metric_ctx.is_legacy {
373        legacy_normalize_otlp_name(&metric.name)
374    } else {
375        normalize_metric_name(metric, &metric_ctx.metric_type)
376    };
377
378    // note that we don't store description or unit, we might want to deal with
379    // these fields in the future.
380    if let Some(data) = &metric.data {
381        match data {
382            metric::Data::Gauge(gauge) => {
383                encode_gauge(
384                    table_writer,
385                    &name,
386                    gauge,
387                    resource_attrs,
388                    scope_attrs,
389                    metric_ctx,
390                )?;
391            }
392            metric::Data::Sum(sum) => {
393                encode_sum(
394                    table_writer,
395                    &name,
396                    sum,
397                    resource_attrs,
398                    scope_attrs,
399                    metric_ctx,
400                )?;
401            }
402            metric::Data::Summary(summary) => {
403                encode_summary(
404                    table_writer,
405                    &name,
406                    summary,
407                    resource_attrs,
408                    scope_attrs,
409                    metric_ctx,
410                )?;
411            }
412            metric::Data::Histogram(hist) => {
413                encode_histogram(
414                    table_writer,
415                    &name,
416                    hist,
417                    resource_attrs,
418                    scope_attrs,
419                    metric_ctx,
420                )?;
421            }
422            // TODO(sunng87) leave ExponentialHistogram for next release
423            metric::Data::ExponentialHistogram(_hist) => {}
424        }
425    }
426
427    Ok(())
428}
429
/// Origin of a set of attributes; selects how `write_attributes` turns each
/// attribute key into a label (tag column) name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AttributeType {
    /// Resource attributes: keys go through `normalize_label_name`.
    Resource,
    /// Scope attributes: normalized keys are prefixed with `otel_scope_`.
    Scope,
    /// Data point attributes: keys go through `normalize_label_name`.
    DataPoint,
    /// Any attributes in legacy mode: keys go through `legacy_normalize_otlp_name`.
    Legacy,
}
437
438fn write_attributes(
439    writer: &mut TableData,
440    row: &mut Vec<Value>,
441    attrs: Option<&Vec<KeyValue>>,
442    attribute_type: AttributeType,
443) -> Result<()> {
444    let Some(attrs) = attrs else {
445        return Ok(());
446    };
447
448    let tags = attrs.iter().filter_map(|attr| {
449        attr.value
450            .as_ref()
451            .and_then(|v| v.value.as_ref())
452            .and_then(|val| {
453                let key = match attribute_type {
454                    AttributeType::Resource | AttributeType::DataPoint => {
455                        normalize_label_name(&attr.key)
456                    }
457                    AttributeType::Scope => {
458                        format!("otel_scope_{}", normalize_label_name(&attr.key))
459                    }
460                    AttributeType::Legacy => legacy_normalize_otlp_name(&attr.key),
461                };
462                match val {
463                    any_value::Value::StringValue(s) => Some((key, s.clone())),
464                    any_value::Value::IntValue(v) => Some((key, v.to_string())),
465                    any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
466                    _ => None, // TODO(sunng87): allow different type of values
467                }
468            })
469    });
470    row_writer::write_tags(writer, tags, row)?;
471
472    Ok(())
473}
474
475fn write_timestamp(
476    table: &mut TableData,
477    row: &mut Vec<Value>,
478    time_nano: i64,
479    legacy_mode: bool,
480) -> Result<()> {
481    if legacy_mode {
482        row_writer::write_ts_to_nanos(
483            table,
484            GREPTIME_TIMESTAMP,
485            Some(time_nano),
486            Precision::Nanosecond,
487            row,
488        )
489    } else {
490        row_writer::write_ts_to_millis(
491            table,
492            GREPTIME_TIMESTAMP,
493            Some(time_nano / 1000000),
494            Precision::Millisecond,
495            row,
496        )
497    }
498}
499
500fn write_data_point_value(
501    table: &mut TableData,
502    row: &mut Vec<Value>,
503    field: &str,
504    value: &Option<number_data_point::Value>,
505) -> Result<()> {
506    match value {
507        Some(number_data_point::Value::AsInt(val)) => {
508            // we coerce all values to f64
509            row_writer::write_f64(table, field, *val as f64, row)?;
510        }
511        Some(number_data_point::Value::AsDouble(val)) => {
512            row_writer::write_f64(table, field, *val, row)?;
513        }
514        _ => {}
515    }
516    Ok(())
517}
518
519fn write_tags_and_timestamp(
520    table: &mut TableData,
521    row: &mut Vec<Value>,
522    resource_attrs: Option<&Vec<KeyValue>>,
523    scope_attrs: Option<&Vec<KeyValue>>,
524    data_point_attrs: Option<&Vec<KeyValue>>,
525    timestamp_nanos: i64,
526    metric_ctx: &OtlpMetricCtx,
527) -> Result<()> {
528    if metric_ctx.is_legacy {
529        write_attributes(table, row, resource_attrs, AttributeType::Legacy)?;
530        write_attributes(table, row, scope_attrs, AttributeType::Legacy)?;
531        write_attributes(table, row, data_point_attrs, AttributeType::Legacy)?;
532    } else {
533        // TODO(shuiyisong): check `__type__` and `__unit__` tags in prometheus
534        write_attributes(table, row, resource_attrs, AttributeType::Resource)?;
535        write_attributes(table, row, scope_attrs, AttributeType::Scope)?;
536        write_attributes(table, row, data_point_attrs, AttributeType::DataPoint)?;
537    }
538
539    write_timestamp(table, row, timestamp_nanos, metric_ctx.is_legacy)?;
540
541    Ok(())
542}
543
544/// encode this gauge metric
545///
546/// note that there can be multiple data points in the request, it's going to be
547/// stored as multiple rows
548fn encode_gauge(
549    table_writer: &mut MultiTableData,
550    name: &str,
551    gauge: &Gauge,
552    resource_attrs: Option<&Vec<KeyValue>>,
553    scope_attrs: Option<&Vec<KeyValue>>,
554    metric_ctx: &OtlpMetricCtx,
555) -> Result<()> {
556    let table = table_writer.get_or_default_table_data(
557        name,
558        APPROXIMATE_COLUMN_COUNT,
559        gauge.data_points.len(),
560    );
561
562    for data_point in &gauge.data_points {
563        let mut row = table.alloc_one_row();
564        write_tags_and_timestamp(
565            table,
566            &mut row,
567            resource_attrs,
568            scope_attrs,
569            Some(data_point.attributes.as_ref()),
570            data_point.time_unix_nano as i64,
571            metric_ctx,
572        )?;
573
574        write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
575        table.add_row(row);
576    }
577
578    Ok(())
579}
580
581/// encode this sum metric
582///
583/// `aggregation_temporality` and `monotonic` are ignored for now
584fn encode_sum(
585    table_writer: &mut MultiTableData,
586    name: &str,
587    sum: &Sum,
588    resource_attrs: Option<&Vec<KeyValue>>,
589    scope_attrs: Option<&Vec<KeyValue>>,
590    metric_ctx: &OtlpMetricCtx,
591) -> Result<()> {
592    let table = table_writer.get_or_default_table_data(
593        name,
594        APPROXIMATE_COLUMN_COUNT,
595        sum.data_points.len(),
596    );
597
598    for data_point in &sum.data_points {
599        let mut row = table.alloc_one_row();
600        write_tags_and_timestamp(
601            table,
602            &mut row,
603            resource_attrs,
604            scope_attrs,
605            Some(data_point.attributes.as_ref()),
606            data_point.time_unix_nano as i64,
607            metric_ctx,
608        )?;
609        write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
610        table.add_row(row);
611    }
612
613    Ok(())
614}
615
616const HISTOGRAM_LE_COLUMN: &str = "le";
617
618/// Encode histogram data. This function returns 3 insert requests for 3 tables.
619///
620/// The implementation has been following Prometheus histogram table format:
621///
622/// - A `%metric%_bucket` table including `greptime_le` tag that stores bucket upper
623///   limit, and `greptime_value` for bucket count
624/// - A `%metric%_sum` table storing sum of samples
625/// -  A `%metric%_count` table storing count of samples.
626///
627/// By its Prometheus compatibility, we hope to be able to use prometheus
628/// quantile functions on this table.
629fn encode_histogram(
630    table_writer: &mut MultiTableData,
631    name: &str,
632    hist: &Histogram,
633    resource_attrs: Option<&Vec<KeyValue>>,
634    scope_attrs: Option<&Vec<KeyValue>>,
635    metric_ctx: &OtlpMetricCtx,
636) -> Result<()> {
637    let normalized_name = name;
638
639    let bucket_table_name = format!("{}_bucket", normalized_name);
640    let sum_table_name = format!("{}_sum", normalized_name);
641    let count_table_name = format!("{}_count", normalized_name);
642
643    let data_points_len = hist.data_points.len();
644    // Note that the row and columns number here is approximate
645    let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
646    let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
647    let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
648
649    for data_point in &hist.data_points {
650        let mut accumulated_count = 0;
651        for (idx, count) in data_point.bucket_counts.iter().enumerate() {
652            let mut bucket_row = bucket_table.alloc_one_row();
653            write_tags_and_timestamp(
654                &mut bucket_table,
655                &mut bucket_row,
656                resource_attrs,
657                scope_attrs,
658                Some(data_point.attributes.as_ref()),
659                data_point.time_unix_nano as i64,
660                metric_ctx,
661            )?;
662
663            if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
664                row_writer::write_tag(
665                    &mut bucket_table,
666                    HISTOGRAM_LE_COLUMN,
667                    upper_bounds,
668                    &mut bucket_row,
669                )?;
670            } else if idx == data_point.explicit_bounds.len() {
671                // The last bucket
672                row_writer::write_tag(
673                    &mut bucket_table,
674                    HISTOGRAM_LE_COLUMN,
675                    f64::INFINITY,
676                    &mut bucket_row,
677                )?;
678            }
679
680            accumulated_count += count;
681            row_writer::write_f64(
682                &mut bucket_table,
683                GREPTIME_VALUE,
684                accumulated_count as f64,
685                &mut bucket_row,
686            )?;
687
688            bucket_table.add_row(bucket_row);
689        }
690
691        if let Some(sum) = data_point.sum {
692            let mut sum_row = sum_table.alloc_one_row();
693            write_tags_and_timestamp(
694                &mut sum_table,
695                &mut sum_row,
696                resource_attrs,
697                scope_attrs,
698                Some(data_point.attributes.as_ref()),
699                data_point.time_unix_nano as i64,
700                metric_ctx,
701            )?;
702
703            row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?;
704            sum_table.add_row(sum_row);
705        }
706
707        let mut count_row = count_table.alloc_one_row();
708        write_tags_and_timestamp(
709            &mut count_table,
710            &mut count_row,
711            resource_attrs,
712            scope_attrs,
713            Some(data_point.attributes.as_ref()),
714            data_point.time_unix_nano as i64,
715            metric_ctx,
716        )?;
717
718        row_writer::write_f64(
719            &mut count_table,
720            GREPTIME_VALUE,
721            data_point.count as f64,
722            &mut count_row,
723        )?;
724        count_table.add_row(count_row);
725    }
726
727    table_writer.add_table_data(bucket_table_name, bucket_table);
728    table_writer.add_table_data(sum_table_name, sum_table);
729    table_writer.add_table_data(count_table_name, count_table);
730
731    Ok(())
732}
733
/// Placeholder encoder for exponential histograms.
///
/// Not implemented yet: both arguments are ignored, nothing is written and
/// `Ok(())` is always returned.
#[allow(dead_code)]
fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
    // TODO(sunng87): implement this using a prometheus compatible way
    Ok(())
}
739
740fn encode_summary(
741    table_writer: &mut MultiTableData,
742    name: &str,
743    summary: &Summary,
744    resource_attrs: Option<&Vec<KeyValue>>,
745    scope_attrs: Option<&Vec<KeyValue>>,
746    metric_ctx: &OtlpMetricCtx,
747) -> Result<()> {
748    if metric_ctx.is_legacy {
749        let table = table_writer.get_or_default_table_data(
750            name,
751            APPROXIMATE_COLUMN_COUNT,
752            summary.data_points.len(),
753        );
754
755        for data_point in &summary.data_points {
756            let mut row = table.alloc_one_row();
757            write_tags_and_timestamp(
758                table,
759                &mut row,
760                resource_attrs,
761                scope_attrs,
762                Some(data_point.attributes.as_ref()),
763                data_point.time_unix_nano as i64,
764                metric_ctx,
765            )?;
766
767            for quantile in &data_point.quantile_values {
768                row_writer::write_f64(
769                    table,
770                    format!("greptime_p{:02}", quantile.quantile * 100f64),
771                    quantile.value,
772                    &mut row,
773                )?;
774            }
775
776            row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
777            table.add_row(row);
778        }
779    } else {
780        // 1. quantile table
781        // 2. count table
782        // 3. sum table
783
784        let metric_name = name;
785        let count_name = format!("{}{}", metric_name, COUNT_TABLE_SUFFIX);
786        let sum_name = format!("{}{}", metric_name, SUM_TABLE_SUFFIX);
787
788        for data_point in &summary.data_points {
789            {
790                let quantile_table = table_writer.get_or_default_table_data(
791                    metric_name,
792                    APPROXIMATE_COLUMN_COUNT,
793                    summary.data_points.len(),
794                );
795
796                for quantile in &data_point.quantile_values {
797                    let mut row = quantile_table.alloc_one_row();
798                    write_tags_and_timestamp(
799                        quantile_table,
800                        &mut row,
801                        resource_attrs,
802                        scope_attrs,
803                        Some(data_point.attributes.as_ref()),
804                        data_point.time_unix_nano as i64,
805                        metric_ctx,
806                    )?;
807                    row_writer::write_tag(quantile_table, "quantile", quantile.quantile, &mut row)?;
808                    row_writer::write_f64(
809                        quantile_table,
810                        GREPTIME_VALUE,
811                        quantile.value,
812                        &mut row,
813                    )?;
814                    quantile_table.add_row(row);
815                }
816            }
817            {
818                let count_table = table_writer.get_or_default_table_data(
819                    &count_name,
820                    APPROXIMATE_COLUMN_COUNT,
821                    summary.data_points.len(),
822                );
823                let mut row = count_table.alloc_one_row();
824                write_tags_and_timestamp(
825                    count_table,
826                    &mut row,
827                    resource_attrs,
828                    scope_attrs,
829                    Some(data_point.attributes.as_ref()),
830                    data_point.time_unix_nano as i64,
831                    metric_ctx,
832                )?;
833
834                row_writer::write_f64(
835                    count_table,
836                    GREPTIME_VALUE,
837                    data_point.count as f64,
838                    &mut row,
839                )?;
840
841                count_table.add_row(row);
842            }
843            {
844                let sum_table = table_writer.get_or_default_table_data(
845                    &sum_name,
846                    APPROXIMATE_COLUMN_COUNT,
847                    summary.data_points.len(),
848                );
849
850                let mut row = sum_table.alloc_one_row();
851                write_tags_and_timestamp(
852                    sum_table,
853                    &mut row,
854                    resource_attrs,
855                    scope_attrs,
856                    Some(data_point.attributes.as_ref()),
857                    data_point.time_unix_nano as i64,
858                    metric_ctx,
859                )?;
860
861                row_writer::write_f64(sum_table, GREPTIME_VALUE, data_point.sum, &mut row)?;
862
863                sum_table.add_row(row);
864            }
865        }
866    }
867
868    Ok(())
869}
870
871#[cfg(test)]
872mod tests {
873    use otel_arrow_rust::proto::opentelemetry::common::v1::AnyValue;
874    use otel_arrow_rust::proto::opentelemetry::common::v1::any_value::Value as Val;
875    use otel_arrow_rust::proto::opentelemetry::metrics::v1::number_data_point::Value;
876    use otel_arrow_rust::proto::opentelemetry::metrics::v1::summary_data_point::ValueAtQuantile;
877    use otel_arrow_rust::proto::opentelemetry::metrics::v1::{
878        AggregationTemporality, HistogramDataPoint, NumberDataPoint, SummaryDataPoint,
879    };
880
881    use super::*;
882
883    #[test]
884    fn test_legacy_normalize_otlp_name() {
885        assert_eq!(
886            legacy_normalize_otlp_name("jvm.memory.free"),
887            "jvm_memory_free"
888        );
889        assert_eq!(
890            legacy_normalize_otlp_name("jvm-memory-free"),
891            "jvm_memory_free"
892        );
893        assert_eq!(
894            legacy_normalize_otlp_name("jvm_memory_free"),
895            "jvm_memory_free"
896        );
897        assert_eq!(
898            legacy_normalize_otlp_name("JVM_MEMORY_FREE"),
899            "jvm_memory_free"
900        );
901        assert_eq!(
902            legacy_normalize_otlp_name("JVM_memory_FREE"),
903            "jvm_memory_free"
904        );
905    }
906
907    #[test]
908    fn test_normalize_metric_name() {
909        let test_cases = vec![
910            // Default case
911            (Metric::default(), MetricType::Init, ""),
912            // Basic metric with just name
913            (
914                Metric {
915                    name: "foo".to_string(),
916                    ..Default::default()
917                },
918                MetricType::Init,
919                "foo",
920            ),
921            // Metric with unit "s" should append "seconds"
922            (
923                Metric {
924                    name: "foo".to_string(),
925                    unit: "s".to_string(),
926                    ..Default::default()
927                },
928                MetricType::Init,
929                "foo_seconds",
930            ),
931            // Metric already ending with unit suffix should not duplicate
932            (
933                Metric {
934                    name: "foo_seconds".to_string(),
935                    unit: "s".to_string(),
936                    ..Default::default()
937                },
938                MetricType::Init,
939                "foo_seconds",
940            ),
941            // Monotonic sum should append "total"
942            (
943                Metric {
944                    name: "foo".to_string(),
945                    ..Default::default()
946                },
947                MetricType::MonotonicSum,
948                "foo_total",
949            ),
950            // Metric already ending with "total" should not duplicate
951            (
952                Metric {
953                    name: "foo_total".to_string(),
954                    ..Default::default()
955                },
956                MetricType::MonotonicSum,
957                "foo_total",
958            ),
959            // Monotonic sum with unit should append both unit and "total"
960            (
961                Metric {
962                    name: "foo".to_string(),
963                    unit: "s".to_string(),
964                    ..Default::default()
965                },
966                MetricType::MonotonicSum,
967                "foo_seconds_total",
968            ),
969            // Metric with unit suffix and monotonic sum
970            (
971                Metric {
972                    name: "foo_seconds".to_string(),
973                    unit: "s".to_string(),
974                    ..Default::default()
975                },
976                MetricType::MonotonicSum,
977                "foo_seconds_total",
978            ),
979            // Metric already ending with "total" and has unit
980            (
981                Metric {
982                    name: "foo_total".to_string(),
983                    unit: "s".to_string(),
984                    ..Default::default()
985                },
986                MetricType::MonotonicSum,
987                "foo_seconds_total",
988            ),
989            // Metric already ending with both unit and "total"
990            (
991                Metric {
992                    name: "foo_seconds_total".to_string(),
993                    unit: "s".to_string(),
994                    ..Default::default()
995                },
996                MetricType::MonotonicSum,
997                "foo_seconds_total",
998            ),
999            // Metric with unusual order (total_seconds) should be normalized
1000            (
1001                Metric {
1002                    name: "foo_total_seconds".to_string(),
1003                    unit: "s".to_string(),
1004                    ..Default::default()
1005                },
1006                MetricType::MonotonicSum,
1007                "foo_seconds_total",
1008            ),
1009            // Gauge with unit "1" should append "ratio"
1010            (
1011                Metric {
1012                    name: "foo".to_string(),
1013                    unit: "1".to_string(),
1014                    ..Default::default()
1015                },
1016                MetricType::Gauge,
1017                "foo_ratio",
1018            ),
1019            // Complex unit like "m/s" should be converted to "meters_per_second"
1020            (
1021                Metric {
1022                    name: "foo".to_string(),
1023                    unit: "m/s".to_string(),
1024                    ..Default::default()
1025                },
1026                MetricType::Init,
1027                "foo_meters_per_second",
1028            ),
1029            // Metric with partial unit match
1030            (
1031                Metric {
1032                    name: "foo_second".to_string(),
1033                    unit: "m/s".to_string(),
1034                    ..Default::default()
1035                },
1036                MetricType::Init,
1037                "foo_second_meters",
1038            ),
1039            // Metric already containing the main unit
1040            (
1041                Metric {
1042                    name: "foo_meters".to_string(),
1043                    unit: "m/s".to_string(),
1044                    ..Default::default()
1045                },
1046                MetricType::Init,
1047                "foo_meters_per_second",
1048            ),
1049        ];
1050
1051        for (metric, metric_type, expected) in test_cases {
1052            let result = normalize_metric_name(&metric, &metric_type);
1053            assert_eq!(
1054                result, expected,
1055                "Failed for metric name: '{}', unit: '{}', type: {:?}",
1056                metric.name, metric.unit, metric_type
1057            );
1058        }
1059    }
1060
1061    #[test]
1062    fn test_normalize_metric_name_edge_cases() {
1063        let test_cases = vec![
1064            // Edge case: name with multiple non-alphanumeric chars in a row
1065            (
1066                Metric {
1067                    name: "foo--bar__baz".to_string(),
1068                    ..Default::default()
1069                },
1070                MetricType::Init,
1071                "foo_bar_baz",
1072            ),
1073            // Edge case: name starting and ending with non-alphanumeric
1074            (
1075                Metric {
1076                    name: "-foo_bar-".to_string(),
1077                    ..Default::default()
1078                },
1079                MetricType::Init,
1080                "foo_bar",
1081            ),
1082            // Edge case: name with only special chars (should be empty)
1083            (
1084                Metric {
1085                    name: "--___--".to_string(),
1086                    ..Default::default()
1087                },
1088                MetricType::Init,
1089                "",
1090            ),
1091            // Edge case: name starting with digit
1092            (
1093                Metric {
1094                    name: "2xx_requests".to_string(),
1095                    ..Default::default()
1096                },
1097                MetricType::Init,
1098                "_2xx_requests",
1099            ),
1100        ];
1101
1102        for (metric, metric_type, expected) in test_cases {
1103            let result = normalize_metric_name(&metric, &metric_type);
1104            assert_eq!(
1105                result, expected,
1106                "Failed for metric name: '{}', unit: '{}', type: {:?}",
1107                metric.name, metric.unit, metric_type
1108            );
1109        }
1110    }
1111
1112    #[test]
1113    fn test_normalize_label_name() {
1114        let test_cases = vec![
1115            ("", ""),
1116            ("foo", "foo"),
1117            ("foo_bar/baz:abc", "foo_bar_baz_abc"),
1118            ("1foo", "key_1foo"),
1119            ("_foo", "key_foo"),
1120            ("__bar", "__bar"),
1121        ];
1122
1123        for (input, expected) in test_cases {
1124            let result = normalize_label_name(input);
1125            assert_eq!(
1126                result, expected,
1127                "unexpected result for input '{}'; got '{}'; want '{}'",
1128                input, result, expected
1129            );
1130        }
1131    }
1132
1133    #[test]
1134    fn test_clean_unit_name() {
1135        // Test the improved clean_unit_name function
1136        assert_eq!(clean_unit_name("faults"), "faults");
1137        assert_eq!(clean_unit_name("{faults}"), "faults"); // clean_unit_name still processes braces internally
1138        assert_eq!(clean_unit_name("req/sec"), "req_sec");
1139        assert_eq!(clean_unit_name("m/s"), "m_s");
1140        assert_eq!(clean_unit_name("___test___"), "test");
1141        assert_eq!(
1142            clean_unit_name("multiple__underscores"),
1143            "multiple_underscores"
1144        );
1145        assert_eq!(clean_unit_name(""), "");
1146        assert_eq!(clean_unit_name("___"), "");
1147        assert_eq!(clean_unit_name("bytes.per.second"), "bytes_per_second");
1148    }
1149
1150    #[test]
1151    fn test_normalize_metric_name_braced_units() {
1152        // Test that units with braces are rejected (not processed)
1153        let test_cases = vec![
1154            (
1155                Metric {
1156                    name: "test.metric".to_string(),
1157                    unit: "{faults}".to_string(),
1158                    ..Default::default()
1159                },
1160                MetricType::MonotonicSum,
1161                "test_metric_total", // braced units are rejected, no unit suffix added
1162            ),
1163            (
1164                Metric {
1165                    name: "test.metric".to_string(),
1166                    unit: "{operations}".to_string(),
1167                    ..Default::default()
1168                },
1169                MetricType::Gauge,
1170                "test_metric", // braced units are rejected, no unit suffix added
1171            ),
1172            (
1173                Metric {
1174                    name: "test.metric".to_string(),
1175                    unit: "{}".to_string(), // empty braces should be ignored due to contains('{') || contains('}')
1176                    ..Default::default()
1177                },
1178                MetricType::Gauge,
1179                "test_metric",
1180            ),
1181            (
1182                Metric {
1183                    name: "test.metric".to_string(),
1184                    unit: "faults".to_string(), // no braces, should work normally
1185                    ..Default::default()
1186                },
1187                MetricType::Gauge,
1188                "test_metric_faults",
1189            ),
1190        ];
1191
1192        for (metric, metric_type, expected) in test_cases {
1193            let result = normalize_metric_name(&metric, &metric_type);
1194            assert_eq!(
1195                result, expected,
1196                "Failed for metric name: '{}', unit: '{}', type: {:?}. Got: '{}', Expected: '{}'",
1197                metric.name, metric.unit, metric_type, result, expected
1198            );
1199        }
1200    }
1201
1202    #[test]
1203    fn test_normalize_metric_name_with_testdata() {
1204        // Test cases extracted from real OTLP metrics data from testdata.txt
1205        let test_cases = vec![
1206            // Basic system metrics with various units
1207            (
1208                Metric {
1209                    name: "system.paging.faults".to_string(),
1210                    unit: "{faults}".to_string(),
1211                    ..Default::default()
1212                },
1213                MetricType::MonotonicSum,
1214                "system_paging_faults_total", // braced units are rejected, no unit suffix added
1215            ),
1216            (
1217                Metric {
1218                    name: "system.paging.operations".to_string(),
1219                    unit: "{operations}".to_string(),
1220                    ..Default::default()
1221                },
1222                MetricType::MonotonicSum,
1223                "system_paging_operations_total", // braced units are rejected, no unit suffix added
1224            ),
1225            (
1226                Metric {
1227                    name: "system.paging.usage".to_string(),
1228                    unit: "By".to_string(),
1229                    ..Default::default()
1230                },
1231                MetricType::NonMonotonicSum,
1232                "system_paging_usage_bytes",
1233            ),
1234            // Load average metrics - gauge with custom unit
1235            (
1236                Metric {
1237                    name: "system.cpu.load_average.15m".to_string(),
1238                    unit: "{thread}".to_string(),
1239                    ..Default::default()
1240                },
1241                MetricType::Gauge,
1242                "system_cpu_load_average_15m", // braced units are rejected, no unit suffix added
1243            ),
1244            (
1245                Metric {
1246                    name: "system.cpu.load_average.1m".to_string(),
1247                    unit: "{thread}".to_string(),
1248                    ..Default::default()
1249                },
1250                MetricType::Gauge,
1251                "system_cpu_load_average_1m", // braced units are rejected, no unit suffix added
1252            ),
1253            // Disk I/O with bytes unit
1254            (
1255                Metric {
1256                    name: "system.disk.io".to_string(),
1257                    unit: "By".to_string(),
1258                    ..Default::default()
1259                },
1260                MetricType::MonotonicSum,
1261                "system_disk_io_bytes_total",
1262            ),
1263            // Time-based metrics with seconds unit
1264            (
1265                Metric {
1266                    name: "system.disk.io_time".to_string(),
1267                    unit: "s".to_string(),
1268                    ..Default::default()
1269                },
1270                MetricType::MonotonicSum,
1271                "system_disk_io_time_seconds_total",
1272            ),
1273            (
1274                Metric {
1275                    name: "system.disk.operation_time".to_string(),
1276                    unit: "s".to_string(),
1277                    ..Default::default()
1278                },
1279                MetricType::MonotonicSum,
1280                "system_disk_operation_time_seconds_total",
1281            ),
1282            // CPU time metric
1283            (
1284                Metric {
1285                    name: "system.cpu.time".to_string(),
1286                    unit: "s".to_string(),
1287                    ..Default::default()
1288                },
1289                MetricType::MonotonicSum,
1290                "system_cpu_time_seconds_total",
1291            ),
1292            // Process counts
1293            (
1294                Metric {
1295                    name: "system.processes.count".to_string(),
1296                    unit: "{processes}".to_string(),
1297                    ..Default::default()
1298                },
1299                MetricType::NonMonotonicSum,
1300                "system_processes_count", // braced units are rejected, no unit suffix added
1301            ),
1302            (
1303                Metric {
1304                    name: "system.processes.created".to_string(),
1305                    unit: "{processes}".to_string(),
1306                    ..Default::default()
1307                },
1308                MetricType::MonotonicSum,
1309                "system_processes_created_total", // braced units are rejected, no unit suffix added
1310            ),
1311            // Memory usage with bytes
1312            (
1313                Metric {
1314                    name: "system.memory.usage".to_string(),
1315                    unit: "By".to_string(),
1316                    ..Default::default()
1317                },
1318                MetricType::NonMonotonicSum,
1319                "system_memory_usage_bytes",
1320            ),
1321            // Uptime as gauge
1322            (
1323                Metric {
1324                    name: "system.uptime".to_string(),
1325                    unit: "s".to_string(),
1326                    ..Default::default()
1327                },
1328                MetricType::Gauge,
1329                "system_uptime_seconds",
1330            ),
1331            // Network metrics
1332            (
1333                Metric {
1334                    name: "system.network.connections".to_string(),
1335                    unit: "{connections}".to_string(),
1336                    ..Default::default()
1337                },
1338                MetricType::NonMonotonicSum,
1339                "system_network_connections", // braced units are rejected, no unit suffix added
1340            ),
1341            (
1342                Metric {
1343                    name: "system.network.dropped".to_string(),
1344                    unit: "{packets}".to_string(),
1345                    ..Default::default()
1346                },
1347                MetricType::MonotonicSum,
1348                "system_network_dropped_total", // braced units are rejected, no unit suffix added
1349            ),
1350            (
1351                Metric {
1352                    name: "system.network.errors".to_string(),
1353                    unit: "{errors}".to_string(),
1354                    ..Default::default()
1355                },
1356                MetricType::MonotonicSum,
1357                "system_network_errors_total", // braced units are rejected, no unit suffix added
1358            ),
1359            (
1360                Metric {
1361                    name: "system.network.io".to_string(),
1362                    unit: "By".to_string(),
1363                    ..Default::default()
1364                },
1365                MetricType::MonotonicSum,
1366                "system_network_io_bytes_total",
1367            ),
1368            (
1369                Metric {
1370                    name: "system.network.packets".to_string(),
1371                    unit: "{packets}".to_string(),
1372                    ..Default::default()
1373                },
1374                MetricType::MonotonicSum,
1375                "system_network_packets_total", // braced units are rejected, no unit suffix added
1376            ),
1377            // Filesystem metrics
1378            (
1379                Metric {
1380                    name: "system.filesystem.inodes.usage".to_string(),
1381                    unit: "{inodes}".to_string(),
1382                    ..Default::default()
1383                },
1384                MetricType::NonMonotonicSum,
1385                "system_filesystem_inodes_usage", // braced units are rejected, no unit suffix added
1386            ),
1387            (
1388                Metric {
1389                    name: "system.filesystem.usage".to_string(),
1390                    unit: "By".to_string(),
1391                    ..Default::default()
1392                },
1393                MetricType::NonMonotonicSum,
1394                "system_filesystem_usage_bytes",
1395            ),
1396            // Edge cases with special characters and numbers
1397            (
1398                Metric {
1399                    name: "system.load.1".to_string(),
1400                    unit: "1".to_string(),
1401                    ..Default::default()
1402                },
1403                MetricType::Gauge,
1404                "system_load_1_ratio",
1405            ),
1406            (
1407                Metric {
1408                    name: "http.request.2xx".to_string(),
1409                    unit: "{requests}".to_string(),
1410                    ..Default::default()
1411                },
1412                MetricType::MonotonicSum,
1413                "http_request_2xx_total", // braced units are rejected, no unit suffix added
1414            ),
1415            // Metric with dots and underscores mixed
1416            (
1417                Metric {
1418                    name: "jvm.memory.heap_usage".to_string(),
1419                    unit: "By".to_string(),
1420                    ..Default::default()
1421                },
1422                MetricType::Gauge,
1423                "jvm_memory_heap_usage_bytes",
1424            ),
1425            // Complex unit with per-second
1426            (
1427                Metric {
1428                    name: "http.request.rate".to_string(),
1429                    unit: "1/s".to_string(),
1430                    ..Default::default()
1431                },
1432                MetricType::Gauge,
1433                "http_request_rate_per_second",
1434            ),
1435        ];
1436
1437        for (metric, metric_type, expected) in test_cases {
1438            let result = normalize_metric_name(&metric, &metric_type);
1439            assert_eq!(
1440                result, expected,
1441                "Failed for metric name: '{}', unit: '{}', type: {:?}. Got: '{}', Expected: '{}'",
1442                metric.name, metric.unit, metric_type, result, expected
1443            );
1444        }
1445    }
1446
1447    fn keyvalue(key: &str, value: &str) -> KeyValue {
1448        KeyValue {
1449            key: key.into(),
1450            value: Some(AnyValue {
1451                value: Some(Val::StringValue(value.into())),
1452            }),
1453        }
1454    }
1455
1456    #[test]
1457    fn test_encode_gauge() {
1458        let mut tables = MultiTableData::default();
1459
1460        let data_points = vec![
1461            NumberDataPoint {
1462                attributes: vec![keyvalue("host", "testsevrer")],
1463                time_unix_nano: 100,
1464                value: Some(Value::AsInt(100)),
1465                ..Default::default()
1466            },
1467            NumberDataPoint {
1468                attributes: vec![keyvalue("host", "testserver")],
1469                time_unix_nano: 105,
1470                value: Some(Value::AsInt(105)),
1471                ..Default::default()
1472            },
1473        ];
1474        let gauge = Gauge { data_points };
1475        encode_gauge(
1476            &mut tables,
1477            "datamon",
1478            &gauge,
1479            Some(&vec![]),
1480            Some(&vec![keyvalue("scope", "otel")]),
1481            &OtlpMetricCtx::default(),
1482        )
1483        .unwrap();
1484
1485        let table = tables.get_or_default_table_data("datamon", 0, 0);
1486        assert_eq!(table.num_rows(), 2);
1487        assert_eq!(table.num_columns(), 4);
1488        assert_eq!(
1489            table
1490                .columns()
1491                .iter()
1492                .map(|c| &c.column_name)
1493                .collect::<Vec<&String>>(),
1494            vec![
1495                "otel_scope_scope",
1496                "host",
1497                "greptime_timestamp",
1498                "greptime_value"
1499            ]
1500        );
1501    }
1502
1503    #[test]
1504    fn test_encode_sum() {
1505        let mut tables = MultiTableData::default();
1506
1507        let data_points = vec![
1508            NumberDataPoint {
1509                attributes: vec![keyvalue("host", "testserver")],
1510                time_unix_nano: 100,
1511                value: Some(Value::AsInt(100)),
1512                ..Default::default()
1513            },
1514            NumberDataPoint {
1515                attributes: vec![keyvalue("host", "testserver")],
1516                time_unix_nano: 105,
1517                value: Some(Value::AsInt(0)),
1518                ..Default::default()
1519            },
1520        ];
1521        let sum = Sum {
1522            data_points,
1523            ..Default::default()
1524        };
1525        encode_sum(
1526            &mut tables,
1527            "datamon",
1528            &sum,
1529            Some(&vec![]),
1530            Some(&vec![keyvalue("scope", "otel")]),
1531            &OtlpMetricCtx::default(),
1532        )
1533        .unwrap();
1534
1535        let table = tables.get_or_default_table_data("datamon", 0, 0);
1536        assert_eq!(table.num_rows(), 2);
1537        assert_eq!(table.num_columns(), 4);
1538        assert_eq!(
1539            table
1540                .columns()
1541                .iter()
1542                .map(|c| &c.column_name)
1543                .collect::<Vec<&String>>(),
1544            vec![
1545                "otel_scope_scope",
1546                "host",
1547                "greptime_timestamp",
1548                "greptime_value"
1549            ]
1550        );
1551    }
1552
1553    #[test]
1554    fn test_encode_summary() {
1555        let mut tables = MultiTableData::default();
1556
1557        let data_points = vec![SummaryDataPoint {
1558            attributes: vec![keyvalue("host", "testserver")],
1559            time_unix_nano: 100,
1560            count: 25,
1561            sum: 5400.0,
1562            quantile_values: vec![
1563                ValueAtQuantile {
1564                    quantile: 0.90,
1565                    value: 1000.0,
1566                },
1567                ValueAtQuantile {
1568                    quantile: 0.95,
1569                    value: 3030.0,
1570                },
1571            ],
1572            ..Default::default()
1573        }];
1574        let summary = Summary { data_points };
1575        encode_summary(
1576            &mut tables,
1577            "datamon",
1578            &summary,
1579            Some(&vec![]),
1580            Some(&vec![keyvalue("scope", "otel")]),
1581            &OtlpMetricCtx::default(),
1582        )
1583        .unwrap();
1584
1585        let table = tables.get_or_default_table_data("datamon", 0, 0);
1586        assert_eq!(table.num_rows(), 2);
1587        assert_eq!(table.num_columns(), 5);
1588        assert_eq!(
1589            table
1590                .columns()
1591                .iter()
1592                .map(|c| &c.column_name)
1593                .collect::<Vec<&String>>(),
1594            vec![
1595                "otel_scope_scope",
1596                "host",
1597                "greptime_timestamp",
1598                "quantile",
1599                "greptime_value"
1600            ]
1601        );
1602
1603        let table = tables.get_or_default_table_data("datamon_count", 0, 0);
1604        assert_eq!(table.num_rows(), 1);
1605        assert_eq!(table.num_columns(), 4);
1606        assert_eq!(
1607            table
1608                .columns()
1609                .iter()
1610                .map(|c| &c.column_name)
1611                .collect::<Vec<&String>>(),
1612            vec![
1613                "otel_scope_scope",
1614                "host",
1615                "greptime_timestamp",
1616                "greptime_value"
1617            ]
1618        );
1619
1620        let table = tables.get_or_default_table_data("datamon_sum", 0, 0);
1621        assert_eq!(table.num_rows(), 1);
1622        assert_eq!(table.num_columns(), 4);
1623        assert_eq!(
1624            table
1625                .columns()
1626                .iter()
1627                .map(|c| &c.column_name)
1628                .collect::<Vec<&String>>(),
1629            vec![
1630                "otel_scope_scope",
1631                "host",
1632                "greptime_timestamp",
1633                "greptime_value"
1634            ]
1635        );
1636    }
1637
1638    #[test]
1639    fn test_encode_histogram() {
1640        let mut tables = MultiTableData::default();
1641
1642        let data_points = vec![HistogramDataPoint {
1643            attributes: vec![keyvalue("host", "testserver")],
1644            time_unix_nano: 100,
1645            start_time_unix_nano: 23,
1646            count: 25,
1647            sum: Some(100.),
1648            max: Some(200.),
1649            min: Some(0.03),
1650            bucket_counts: vec![2, 4, 6, 9, 4],
1651            explicit_bounds: vec![0.1, 1., 10., 100.],
1652            ..Default::default()
1653        }];
1654
1655        let histogram = Histogram {
1656            data_points,
1657            aggregation_temporality: AggregationTemporality::Delta.into(),
1658        };
1659        encode_histogram(
1660            &mut tables,
1661            "histo",
1662            &histogram,
1663            Some(&vec![]),
1664            Some(&vec![keyvalue("scope", "otel")]),
1665            &OtlpMetricCtx::default(),
1666        )
1667        .unwrap();
1668
1669        assert_eq!(3, tables.num_tables());
1670
1671        // bucket table
1672        let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
1673        assert_eq!(bucket_table.num_rows(), 5);
1674        assert_eq!(bucket_table.num_columns(), 5);
1675        assert_eq!(
1676            bucket_table
1677                .columns()
1678                .iter()
1679                .map(|c| &c.column_name)
1680                .collect::<Vec<&String>>(),
1681            vec![
1682                "otel_scope_scope",
1683                "host",
1684                "greptime_timestamp",
1685                "le",
1686                "greptime_value",
1687            ]
1688        );
1689
1690        let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0);
1691        assert_eq!(sum_table.num_rows(), 1);
1692        assert_eq!(sum_table.num_columns(), 4);
1693        assert_eq!(
1694            sum_table
1695                .columns()
1696                .iter()
1697                .map(|c| &c.column_name)
1698                .collect::<Vec<&String>>(),
1699            vec![
1700                "otel_scope_scope",
1701                "host",
1702                "greptime_timestamp",
1703                "greptime_value"
1704            ]
1705        );
1706
1707        let count_table = tables.get_or_default_table_data("histo_count", 0, 0);
1708        assert_eq!(count_table.num_rows(), 1);
1709        assert_eq!(count_table.num_columns(), 4);
1710        assert_eq!(
1711            count_table
1712                .columns()
1713                .iter()
1714                .map(|c| &c.column_name)
1715                .collect::<Vec<&String>>(),
1716            vec![
1717                "otel_scope_scope",
1718                "host",
1719                "greptime_timestamp",
1720                "greptime_value"
1721            ]
1722        );
1723    }
1724}