Skip to main content

servers/otlp/
metrics.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use ahash::HashSet;
16use api::v1::{RowInsertRequests, Value};
17use common_grpc::precision::Precision;
18use common_query::prelude::{GREPTIME_COUNT, greptime_timestamp, greptime_value};
19use lazy_static::lazy_static;
20use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
21use otel_arrow_rust::proto::opentelemetry::common::v1::{AnyValue, KeyValue, any_value};
22use otel_arrow_rust::proto::opentelemetry::metrics::v1::{metric, number_data_point, *};
23use session::protocol_ctx::{MetricType, OtlpMetricCtx};
24
25use crate::error::Result;
26use crate::otlp::trace::{KEY_SERVICE_INSTANCE_ID, KEY_SERVICE_NAME};
27use crate::row_writer::{self, MultiTableData, TableData};
28
29mod translator;
30
31pub use translator::legacy_normalize_otlp_name;
32use translator::{translate_label_name, translate_metric_name};
33
/// Approximate column count passed as the column hint whenever table data is
/// created or looked up in this module.
const APPROXIMATE_COLUMN_COUNT: usize = 8;

/// Suffix appended to a metric name to form the name of its count table.
const COUNT_TABLE_SUFFIX: &str = "_count";
/// Suffix appended to a metric name to form the name of its sum table.
const SUM_TABLE_SUFFIX: &str = "_sum";

/// Label key that receives a copy of the `service.name` resource attribute.
const JOB_KEY: &str = "job";
/// Label key that receives a copy of the `service.instance.id` resource attribute.
const INSTANCE_KEY: &str = "instance";
42
/// Resource attribute keys that are always retained in non-legacy mode when
/// only a subset of resource attributes is promoted; keys listed on the metric
/// context are retained in addition to these.
///
/// See: <https://prometheus.io/docs/guides/opentelemetry/#promoting-resource-attributes>
const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [
    "service.instance.id",
    "service.name",
    "service.namespace",
    "service.version",
    "cloud.availability_zone",
    "cloud.region",
    "container.name",
    "deployment.environment",
    "deployment.environment.name",
    "k8s.cluster.name",
    "k8s.container.name",
    "k8s.cronjob.name",
    "k8s.daemonset.name",
    "k8s.deployment.name",
    "k8s.job.name",
    "k8s.namespace.name",
    "k8s.pod.name",
    "k8s.replicaset.name",
    "k8s.statefulset.name",
];
65
66lazy_static! {
67    static ref DEFAULT_PROMOTE_ATTRS_SET: HashSet<String> =
68        HashSet::from_iter(DEFAULT_PROMOTE_ATTRS.iter().map(|s| s.to_string()));
69}
70
/// Attribute key under which the instrumentation scope's name is appended to
/// the scope attributes.
const OTEL_SCOPE_NAME: &str = "name";
/// Attribute key under which the instrumentation scope's version is appended
/// to the scope attributes.
const OTEL_SCOPE_VERSION: &str = "version";
/// Attribute key under which the scope metrics' schema URL is appended to the
/// scope attributes.
const OTEL_SCOPE_SCHEMA_URL: &str = "schema_url";
74
75/// Convert OpenTelemetry metrics to GreptimeDB insert requests
76///
77/// See
78/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto>
79/// for data structure of OTLP metrics.
80///
81/// Returns `InsertRequests` and total number of rows to ingest
82pub fn to_grpc_insert_requests(
83    request: ExportMetricsServiceRequest,
84    metric_ctx: &mut OtlpMetricCtx,
85) -> Result<(RowInsertRequests, usize)> {
86    let mut table_writer = MultiTableData::default();
87
88    for resource in &request.resource_metrics {
89        let resource_attrs = resource.resource.as_ref().map(|r| {
90            let mut attrs = r.attributes.clone();
91            process_resource_attrs(&mut attrs, metric_ctx);
92            attrs
93        });
94
95        for scope in &resource.scope_metrics {
96            let scope_attrs = process_scope_attrs(scope, metric_ctx);
97
98            for metric in &scope.metrics {
99                if metric.data.is_none() {
100                    continue;
101                }
102                if let Some(t) = metric.data.as_ref().map(from_metric_type) {
103                    metric_ctx.set_metric_type(t);
104                }
105
106                encode_metrics(
107                    &mut table_writer,
108                    metric,
109                    resource_attrs.as_ref(),
110                    scope_attrs.as_ref(),
111                    metric_ctx,
112                )?;
113            }
114        }
115    }
116
117    Ok(table_writer.into_row_insert_requests())
118}
119
120fn from_metric_type(data: &metric::Data) -> MetricType {
121    match data {
122        metric::Data::Gauge(_) => MetricType::Gauge,
123        metric::Data::Sum(s) => {
124            if s.is_monotonic {
125                MetricType::MonotonicSum
126            } else {
127                MetricType::NonMonotonicSum
128            }
129        }
130        metric::Data::Histogram(_) => MetricType::Histogram,
131        metric::Data::ExponentialHistogram(_) => MetricType::ExponentialHistogram,
132        metric::Data::Summary(_) => MetricType::Summary,
133    }
134}
135
136fn process_resource_attrs(attrs: &mut Vec<KeyValue>, metric_ctx: &OtlpMetricCtx) {
137    if metric_ctx.is_legacy {
138        return;
139    }
140
141    // remap service.name and service.instance.id to job and instance
142    let mut tmp = Vec::with_capacity(2);
143    for kv in attrs.iter() {
144        match &kv.key as &str {
145            KEY_SERVICE_NAME => {
146                tmp.push(KeyValue {
147                    key: JOB_KEY.to_string(),
148                    value: kv.value.clone(),
149                });
150            }
151            KEY_SERVICE_INSTANCE_ID => {
152                tmp.push(KeyValue {
153                    key: INSTANCE_KEY.to_string(),
154                    value: kv.value.clone(),
155                });
156            }
157            _ => {}
158        }
159    }
160
161    // if promote all, then exclude the list, else, include the list
162    if metric_ctx.promote_all_resource_attrs {
163        attrs.retain(|kv| !metric_ctx.resource_attrs.contains(&kv.key));
164    } else {
165        attrs.retain(|kv| {
166            metric_ctx.resource_attrs.contains(&kv.key)
167                || DEFAULT_PROMOTE_ATTRS_SET.contains(&kv.key)
168        });
169    }
170
171    attrs.extend(tmp);
172}
173
174fn process_scope_attrs(scope: &ScopeMetrics, metric_ctx: &OtlpMetricCtx) -> Option<Vec<KeyValue>> {
175    if metric_ctx.is_legacy {
176        return scope.scope.as_ref().map(|s| s.attributes.clone());
177    };
178
179    if !metric_ctx.promote_scope_attrs {
180        return None;
181    }
182
183    // persist scope attrs with name, version and schema_url
184    scope.scope.as_ref().map(|s| {
185        let mut attrs = s.attributes.clone();
186        attrs.push(KeyValue {
187            key: OTEL_SCOPE_NAME.to_string(),
188            value: Some(AnyValue {
189                value: Some(any_value::Value::StringValue(s.name.clone())),
190            }),
191        });
192        attrs.push(KeyValue {
193            key: OTEL_SCOPE_VERSION.to_string(),
194            value: Some(AnyValue {
195                value: Some(any_value::Value::StringValue(s.version.clone())),
196            }),
197        });
198        attrs.push(KeyValue {
199            key: OTEL_SCOPE_SCHEMA_URL.to_string(),
200            value: Some(AnyValue {
201                value: Some(any_value::Value::StringValue(scope.schema_url.clone())),
202            }),
203        });
204        attrs
205    })
206}
207
208fn encode_metrics(
209    table_writer: &mut MultiTableData,
210    metric: &Metric,
211    resource_attrs: Option<&Vec<KeyValue>>,
212    scope_attrs: Option<&Vec<KeyValue>>,
213    metric_ctx: &OtlpMetricCtx,
214) -> Result<()> {
215    let name = if metric_ctx.is_legacy {
216        legacy_normalize_otlp_name(&metric.name)
217    } else {
218        translate_metric_name(
219            metric,
220            &metric_ctx.metric_type,
221            metric_ctx.metric_translation_strategy,
222        )
223    };
224
225    // note that we don't store description or unit, we might want to deal with
226    // these fields in the future.
227    if let Some(data) = &metric.data {
228        match data {
229            metric::Data::Gauge(gauge) => {
230                encode_gauge(
231                    table_writer,
232                    &name,
233                    gauge,
234                    resource_attrs,
235                    scope_attrs,
236                    metric_ctx,
237                )?;
238            }
239            metric::Data::Sum(sum) => {
240                encode_sum(
241                    table_writer,
242                    &name,
243                    sum,
244                    resource_attrs,
245                    scope_attrs,
246                    metric_ctx,
247                )?;
248            }
249            metric::Data::Summary(summary) => {
250                encode_summary(
251                    table_writer,
252                    &name,
253                    summary,
254                    resource_attrs,
255                    scope_attrs,
256                    metric_ctx,
257                )?;
258            }
259            metric::Data::Histogram(hist) => {
260                encode_histogram(
261                    table_writer,
262                    &name,
263                    hist,
264                    resource_attrs,
265                    scope_attrs,
266                    metric_ctx,
267                )?;
268            }
269            // TODO(sunng87) leave ExponentialHistogram for next release
270            metric::Data::ExponentialHistogram(_hist) => {}
271        }
272    }
273
274    Ok(())
275}
276
/// Origin of a group of attributes; selects how `write_attributes` derives the
/// tag name from each attribute key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AttributeType {
    /// Resource attributes: keys go through `translate_label_name`.
    Resource,
    /// Scope attributes: keys go through `translate_label_name` and are
    /// prefixed with `otel_scope_`.
    Scope,
    /// Data point attributes: keys go through `translate_label_name`.
    DataPoint,
    /// Any attribute in legacy mode: keys go through
    /// `legacy_normalize_otlp_name`.
    Legacy,
}
284
285fn write_attributes(
286    writer: &mut TableData,
287    row: &mut Vec<Value>,
288    attrs: Option<&Vec<KeyValue>>,
289    attribute_type: AttributeType,
290    metric_ctx: &OtlpMetricCtx,
291) -> Result<()> {
292    let Some(attrs) = attrs else {
293        return Ok(());
294    };
295
296    let tags = attrs.iter().filter_map(|attr| {
297        attr.value
298            .as_ref()
299            .and_then(|v| v.value.as_ref())
300            .and_then(|val| {
301                let key = match attribute_type {
302                    AttributeType::Resource | AttributeType::DataPoint => {
303                        translate_label_name(&attr.key, metric_ctx.metric_translation_strategy)
304                    }
305                    AttributeType::Scope => {
306                        format!(
307                            "otel_scope_{}",
308                            translate_label_name(&attr.key, metric_ctx.metric_translation_strategy)
309                        )
310                    }
311                    AttributeType::Legacy => legacy_normalize_otlp_name(&attr.key),
312                };
313                match val {
314                    any_value::Value::StringValue(s) => Some((key, s.clone())),
315                    any_value::Value::IntValue(v) => Some((key, v.to_string())),
316                    any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
317                    _ => None, // TODO(sunng87): allow different type of values
318                }
319            })
320    });
321    row_writer::write_tags(writer, tags, row)?;
322
323    Ok(())
324}
325
326fn write_timestamp(
327    table: &mut TableData,
328    row: &mut Vec<Value>,
329    time_nano: i64,
330    legacy_mode: bool,
331) -> Result<()> {
332    if legacy_mode {
333        row_writer::write_ts_to_nanos(
334            table,
335            greptime_timestamp(),
336            Some(time_nano),
337            Precision::Nanosecond,
338            row,
339        )
340    } else {
341        row_writer::write_ts_to_millis(
342            table,
343            greptime_timestamp(),
344            Some(time_nano / 1000000),
345            Precision::Millisecond,
346            row,
347        )
348    }
349}
350
351fn write_data_point_value(
352    table: &mut TableData,
353    row: &mut Vec<Value>,
354    field: &str,
355    value: &Option<number_data_point::Value>,
356) -> Result<()> {
357    match value {
358        Some(number_data_point::Value::AsInt(val)) => {
359            // we coerce all values to f64
360            row_writer::write_f64(table, field, *val as f64, row)?;
361        }
362        Some(number_data_point::Value::AsDouble(val)) => {
363            row_writer::write_f64(table, field, *val, row)?;
364        }
365        _ => {}
366    }
367    Ok(())
368}
369
370fn write_tags_and_timestamp(
371    table: &mut TableData,
372    row: &mut Vec<Value>,
373    resource_attrs: Option<&Vec<KeyValue>>,
374    scope_attrs: Option<&Vec<KeyValue>>,
375    data_point_attrs: Option<&Vec<KeyValue>>,
376    timestamp_nanos: i64,
377    metric_ctx: &OtlpMetricCtx,
378) -> Result<()> {
379    if metric_ctx.is_legacy {
380        write_attributes(
381            table,
382            row,
383            resource_attrs,
384            AttributeType::Legacy,
385            metric_ctx,
386        )?;
387        write_attributes(table, row, scope_attrs, AttributeType::Legacy, metric_ctx)?;
388        write_attributes(
389            table,
390            row,
391            data_point_attrs,
392            AttributeType::Legacy,
393            metric_ctx,
394        )?;
395    } else {
396        // TODO(shuiyisong): check `__type__` and `__unit__` tags in prometheus
397        write_attributes(
398            table,
399            row,
400            resource_attrs,
401            AttributeType::Resource,
402            metric_ctx,
403        )?;
404        write_attributes(table, row, scope_attrs, AttributeType::Scope, metric_ctx)?;
405        write_attributes(
406            table,
407            row,
408            data_point_attrs,
409            AttributeType::DataPoint,
410            metric_ctx,
411        )?;
412    }
413
414    write_timestamp(table, row, timestamp_nanos, metric_ctx.is_legacy)?;
415
416    Ok(())
417}
418
419/// encode this gauge metric
420///
421/// note that there can be multiple data points in the request, it's going to be
422/// stored as multiple rows
423fn encode_gauge(
424    table_writer: &mut MultiTableData,
425    name: &str,
426    gauge: &Gauge,
427    resource_attrs: Option<&Vec<KeyValue>>,
428    scope_attrs: Option<&Vec<KeyValue>>,
429    metric_ctx: &OtlpMetricCtx,
430) -> Result<()> {
431    let table = table_writer.get_or_default_table_data(
432        name,
433        APPROXIMATE_COLUMN_COUNT,
434        gauge.data_points.len(),
435    );
436
437    for data_point in &gauge.data_points {
438        let mut row = table.alloc_one_row();
439        write_tags_and_timestamp(
440            table,
441            &mut row,
442            resource_attrs,
443            scope_attrs,
444            Some(data_point.attributes.as_ref()),
445            data_point.time_unix_nano as i64,
446            metric_ctx,
447        )?;
448
449        write_data_point_value(table, &mut row, greptime_value(), &data_point.value)?;
450        table.add_row(row);
451    }
452
453    Ok(())
454}
455
456/// encode this sum metric
457///
458/// `aggregation_temporality` and `monotonic` are ignored for now
459fn encode_sum(
460    table_writer: &mut MultiTableData,
461    name: &str,
462    sum: &Sum,
463    resource_attrs: Option<&Vec<KeyValue>>,
464    scope_attrs: Option<&Vec<KeyValue>>,
465    metric_ctx: &OtlpMetricCtx,
466) -> Result<()> {
467    let table = table_writer.get_or_default_table_data(
468        name,
469        APPROXIMATE_COLUMN_COUNT,
470        sum.data_points.len(),
471    );
472
473    for data_point in &sum.data_points {
474        let mut row = table.alloc_one_row();
475        write_tags_and_timestamp(
476            table,
477            &mut row,
478            resource_attrs,
479            scope_attrs,
480            Some(data_point.attributes.as_ref()),
481            data_point.time_unix_nano as i64,
482            metric_ctx,
483        )?;
484        write_data_point_value(table, &mut row, greptime_value(), &data_point.value)?;
485        table.add_row(row);
486    }
487
488    Ok(())
489}
490
491const HISTOGRAM_LE_COLUMN: &str = "le";
492
493/// Encode histogram data. This function returns 3 insert requests for 3 tables.
494///
495/// The implementation has been following Prometheus histogram table format:
496///
497/// - A `%metric%_bucket` table including `greptime_le` tag that stores bucket upper
498///   limit, and `greptime_value` for bucket count
499/// - A `%metric%_sum` table storing sum of samples
500/// -  A `%metric%_count` table storing count of samples.
501///
502/// By its Prometheus compatibility, we hope to be able to use prometheus
503/// quantile functions on this table.
504fn encode_histogram(
505    table_writer: &mut MultiTableData,
506    name: &str,
507    hist: &Histogram,
508    resource_attrs: Option<&Vec<KeyValue>>,
509    scope_attrs: Option<&Vec<KeyValue>>,
510    metric_ctx: &OtlpMetricCtx,
511) -> Result<()> {
512    let normalized_name = name;
513
514    let bucket_table_name = format!("{}_bucket", normalized_name);
515    let sum_table_name = format!("{}_sum", normalized_name);
516    let count_table_name = format!("{}_count", normalized_name);
517
518    let data_points_len = hist.data_points.len();
519    // Note that the row and columns number here is approximate
520    let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
521    let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
522    let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
523
524    for data_point in &hist.data_points {
525        let mut accumulated_count = 0;
526        for (idx, count) in data_point.bucket_counts.iter().enumerate() {
527            let mut bucket_row = bucket_table.alloc_one_row();
528            write_tags_and_timestamp(
529                &mut bucket_table,
530                &mut bucket_row,
531                resource_attrs,
532                scope_attrs,
533                Some(data_point.attributes.as_ref()),
534                data_point.time_unix_nano as i64,
535                metric_ctx,
536            )?;
537
538            if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
539                row_writer::write_tag(
540                    &mut bucket_table,
541                    HISTOGRAM_LE_COLUMN,
542                    upper_bounds,
543                    &mut bucket_row,
544                )?;
545            } else if idx == data_point.explicit_bounds.len() {
546                // The last bucket
547                row_writer::write_tag(
548                    &mut bucket_table,
549                    HISTOGRAM_LE_COLUMN,
550                    f64::INFINITY,
551                    &mut bucket_row,
552                )?;
553            }
554
555            accumulated_count += count;
556            row_writer::write_f64(
557                &mut bucket_table,
558                greptime_value(),
559                accumulated_count as f64,
560                &mut bucket_row,
561            )?;
562
563            bucket_table.add_row(bucket_row);
564        }
565
566        if let Some(sum) = data_point.sum {
567            let mut sum_row = sum_table.alloc_one_row();
568            write_tags_and_timestamp(
569                &mut sum_table,
570                &mut sum_row,
571                resource_attrs,
572                scope_attrs,
573                Some(data_point.attributes.as_ref()),
574                data_point.time_unix_nano as i64,
575                metric_ctx,
576            )?;
577
578            row_writer::write_f64(&mut sum_table, greptime_value(), sum, &mut sum_row)?;
579            sum_table.add_row(sum_row);
580        }
581
582        let mut count_row = count_table.alloc_one_row();
583        write_tags_and_timestamp(
584            &mut count_table,
585            &mut count_row,
586            resource_attrs,
587            scope_attrs,
588            Some(data_point.attributes.as_ref()),
589            data_point.time_unix_nano as i64,
590            metric_ctx,
591        )?;
592
593        row_writer::write_f64(
594            &mut count_table,
595            greptime_value(),
596            data_point.count as f64,
597            &mut count_row,
598        )?;
599        count_table.add_row(count_row);
600    }
601
602    table_writer.add_table_data(bucket_table_name, bucket_table);
603    table_writer.add_table_data(sum_table_name, sum_table);
604    table_writer.add_table_data(count_table_name, count_table);
605
606    Ok(())
607}
608
/// Placeholder for exponential histogram encoding: writes nothing and always
/// returns `Ok(())`.
///
/// `encode_metrics` currently skips exponential histograms without calling
/// this function, hence the `dead_code` allowance.
#[allow(dead_code)]
fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
    // TODO(sunng87): implement this using a prometheus compatible way
    Ok(())
}
614
615fn encode_summary(
616    table_writer: &mut MultiTableData,
617    name: &str,
618    summary: &Summary,
619    resource_attrs: Option<&Vec<KeyValue>>,
620    scope_attrs: Option<&Vec<KeyValue>>,
621    metric_ctx: &OtlpMetricCtx,
622) -> Result<()> {
623    if metric_ctx.is_legacy {
624        let table = table_writer.get_or_default_table_data(
625            name,
626            APPROXIMATE_COLUMN_COUNT,
627            summary.data_points.len(),
628        );
629
630        for data_point in &summary.data_points {
631            let mut row = table.alloc_one_row();
632            write_tags_and_timestamp(
633                table,
634                &mut row,
635                resource_attrs,
636                scope_attrs,
637                Some(data_point.attributes.as_ref()),
638                data_point.time_unix_nano as i64,
639                metric_ctx,
640            )?;
641
642            for quantile in &data_point.quantile_values {
643                row_writer::write_f64(
644                    table,
645                    format!("greptime_p{:02}", quantile.quantile * 100f64),
646                    quantile.value,
647                    &mut row,
648                )?;
649            }
650
651            row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
652            table.add_row(row);
653        }
654    } else {
655        // 1. quantile table
656        // 2. count table
657        // 3. sum table
658
659        let metric_name = name;
660        let count_name = format!("{}{}", metric_name, COUNT_TABLE_SUFFIX);
661        let sum_name = format!("{}{}", metric_name, SUM_TABLE_SUFFIX);
662
663        for data_point in &summary.data_points {
664            {
665                let quantile_table = table_writer.get_or_default_table_data(
666                    metric_name,
667                    APPROXIMATE_COLUMN_COUNT,
668                    summary.data_points.len(),
669                );
670
671                for quantile in &data_point.quantile_values {
672                    let mut row = quantile_table.alloc_one_row();
673                    write_tags_and_timestamp(
674                        quantile_table,
675                        &mut row,
676                        resource_attrs,
677                        scope_attrs,
678                        Some(data_point.attributes.as_ref()),
679                        data_point.time_unix_nano as i64,
680                        metric_ctx,
681                    )?;
682                    row_writer::write_tag(quantile_table, "quantile", quantile.quantile, &mut row)?;
683                    row_writer::write_f64(
684                        quantile_table,
685                        greptime_value(),
686                        quantile.value,
687                        &mut row,
688                    )?;
689                    quantile_table.add_row(row);
690                }
691            }
692            {
693                let count_table = table_writer.get_or_default_table_data(
694                    &count_name,
695                    APPROXIMATE_COLUMN_COUNT,
696                    summary.data_points.len(),
697                );
698                let mut row = count_table.alloc_one_row();
699                write_tags_and_timestamp(
700                    count_table,
701                    &mut row,
702                    resource_attrs,
703                    scope_attrs,
704                    Some(data_point.attributes.as_ref()),
705                    data_point.time_unix_nano as i64,
706                    metric_ctx,
707                )?;
708
709                row_writer::write_f64(
710                    count_table,
711                    greptime_value(),
712                    data_point.count as f64,
713                    &mut row,
714                )?;
715
716                count_table.add_row(row);
717            }
718            {
719                let sum_table = table_writer.get_or_default_table_data(
720                    &sum_name,
721                    APPROXIMATE_COLUMN_COUNT,
722                    summary.data_points.len(),
723                );
724
725                let mut row = sum_table.alloc_one_row();
726                write_tags_and_timestamp(
727                    sum_table,
728                    &mut row,
729                    resource_attrs,
730                    scope_attrs,
731                    Some(data_point.attributes.as_ref()),
732                    data_point.time_unix_nano as i64,
733                    metric_ctx,
734                )?;
735
736                row_writer::write_f64(sum_table, greptime_value(), data_point.sum, &mut row)?;
737
738                sum_table.add_row(row);
739            }
740        }
741    }
742
743    Ok(())
744}
745
#[cfg(test)]
mod tests {
    use otel_arrow_rust::proto::opentelemetry::common::v1::AnyValue;
    use otel_arrow_rust::proto::opentelemetry::common::v1::any_value::Value as Val;
    use otel_arrow_rust::proto::opentelemetry::metrics::v1::number_data_point::Value;
    use otel_arrow_rust::proto::opentelemetry::metrics::v1::summary_data_point::ValueAtQuantile;
    use otel_arrow_rust::proto::opentelemetry::metrics::v1::{
        AggregationTemporality, HistogramDataPoint, NumberDataPoint, SummaryDataPoint,
    };

    use super::*;

    /// Builds a string-valued attribute.
    fn keyvalue(key: &str, value: &str) -> KeyValue {
        let string_value = AnyValue {
            value: Some(Val::StringValue(value.into())),
        };
        KeyValue {
            key: key.into(),
            value: Some(string_value),
        }
    }

    /// Asserts that the columns of `table` are named `expected`, in order.
    fn assert_column_names(table: &TableData, expected: &[&str]) {
        assert_eq!(
            table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            expected
        );
    }

    #[test]
    fn test_encode_gauge() {
        let mut tables = MultiTableData::default();

        let first_point = NumberDataPoint {
            attributes: vec![keyvalue("host", "testsevrer")],
            time_unix_nano: 100,
            value: Some(Value::AsInt(100)),
            ..Default::default()
        };
        let second_point = NumberDataPoint {
            attributes: vec![keyvalue("host", "testserver")],
            time_unix_nano: 105,
            value: Some(Value::AsInt(105)),
            ..Default::default()
        };
        let gauge = Gauge {
            data_points: vec![first_point, second_point],
        };
        encode_gauge(
            &mut tables,
            "datamon",
            &gauge,
            Some(&vec![]),
            Some(&vec![keyvalue("scope", "otel")]),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        // one row per data point: scope tag, host tag, timestamp, value
        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 4);
        assert_column_names(
            table,
            &[
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                greptime_value(),
            ],
        );
    }

    #[test]
    fn test_encode_sum() {
        let mut tables = MultiTableData::default();

        let first_point = NumberDataPoint {
            attributes: vec![keyvalue("host", "testserver")],
            time_unix_nano: 100,
            value: Some(Value::AsInt(100)),
            ..Default::default()
        };
        let second_point = NumberDataPoint {
            attributes: vec![keyvalue("host", "testserver")],
            time_unix_nano: 105,
            value: Some(Value::AsInt(0)),
            ..Default::default()
        };
        let sum = Sum {
            data_points: vec![first_point, second_point],
            ..Default::default()
        };
        encode_sum(
            &mut tables,
            "datamon",
            &sum,
            Some(&vec![]),
            Some(&vec![keyvalue("scope", "otel")]),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        // one row per data point: scope tag, host tag, timestamp, value
        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 4);
        assert_column_names(
            table,
            &[
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                greptime_value(),
            ],
        );
    }

    #[test]
    fn test_encode_summary() {
        let mut tables = MultiTableData::default();

        let quantile_values = vec![
            ValueAtQuantile {
                quantile: 0.90,
                value: 1000.0,
            },
            ValueAtQuantile {
                quantile: 0.95,
                value: 3030.0,
            },
        ];
        let data_points = vec![SummaryDataPoint {
            attributes: vec![keyvalue("host", "testserver")],
            time_unix_nano: 100,
            count: 25,
            sum: 5400.0,
            quantile_values,
            ..Default::default()
        }];
        let summary = Summary { data_points };
        encode_summary(
            &mut tables,
            "datamon",
            &summary,
            Some(&vec![]),
            Some(&vec![keyvalue("scope", "otel")]),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        // quantile table: one row per quantile, with an extra `quantile` tag
        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 5);
        assert_column_names(
            table,
            &[
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                "quantile",
                greptime_value(),
            ],
        );

        // count table: one row per data point
        let table = tables.get_or_default_table_data("datamon_count", 0, 0);
        assert_eq!(table.num_rows(), 1);
        assert_eq!(table.num_columns(), 4);
        assert_column_names(
            table,
            &[
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                greptime_value(),
            ],
        );

        // sum table: one row per data point
        let table = tables.get_or_default_table_data("datamon_sum", 0, 0);
        assert_eq!(table.num_rows(), 1);
        assert_eq!(table.num_columns(), 4);
        assert_column_names(
            table,
            &[
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                greptime_value(),
            ],
        );
    }

    #[test]
    fn test_encode_histogram() {
        let mut tables = MultiTableData::default();

        let data_points = vec![HistogramDataPoint {
            attributes: vec![keyvalue("host", "testserver")],
            time_unix_nano: 100,
            start_time_unix_nano: 23,
            count: 25,
            sum: Some(100.),
            max: Some(200.),
            min: Some(0.03),
            bucket_counts: vec![2, 4, 6, 9, 4],
            explicit_bounds: vec![0.1, 1., 10., 100.],
            ..Default::default()
        }];

        let histogram = Histogram {
            data_points,
            aggregation_temporality: AggregationTemporality::Delta.into(),
        };
        encode_histogram(
            &mut tables,
            "histo",
            &histogram,
            Some(&vec![]),
            Some(&vec![keyvalue("scope", "otel")]),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        assert_eq!(3, tables.num_tables());

        // bucket table: one row per bucket count, with an extra `le` tag
        let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
        assert_eq!(bucket_table.num_rows(), 5);
        assert_eq!(bucket_table.num_columns(), 5);
        assert_column_names(
            bucket_table,
            &[
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                "le",
                greptime_value(),
            ],
        );

        // sum table: one row per data point
        let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0);
        assert_eq!(sum_table.num_rows(), 1);
        assert_eq!(sum_table.num_columns(), 4);
        assert_column_names(
            sum_table,
            &[
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                greptime_value(),
            ],
        );

        // count table: one row per data point
        let count_table = tables.get_or_default_table_data("histo_count", 0, 0);
        assert_eq!(count_table.num_rows(), 1);
        assert_eq!(count_table.num_columns(), 4);
        assert_column_names(
            count_table,
            &[
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                greptime_value(),
            ],
        );
    }
}
1035}