1use ahash::HashSet;
16use api::v1::{RowInsertRequests, Value};
17use common_grpc::precision::Precision;
18use common_query::prelude::{GREPTIME_COUNT, greptime_timestamp, greptime_value};
19use lazy_static::lazy_static;
20use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
21use otel_arrow_rust::proto::opentelemetry::common::v1::{AnyValue, KeyValue, any_value};
22use otel_arrow_rust::proto::opentelemetry::metrics::v1::{metric, number_data_point, *};
23use session::protocol_ctx::{MetricType, OtlpMetricCtx};
24
25use crate::error::Result;
26use crate::otlp::trace::{KEY_SERVICE_INSTANCE_ID, KEY_SERVICE_NAME};
27use crate::row_writer::{self, MultiTableData, TableData};
28
29mod translator;
30
31pub use translator::legacy_normalize_otlp_name;
32use translator::{translate_label_name, translate_metric_name};
33
34const APPROXIMATE_COLUMN_COUNT: usize = 8;
36
37const COUNT_TABLE_SUFFIX: &str = "_count";
38const SUM_TABLE_SUFFIX: &str = "_sum";
39
40const JOB_KEY: &str = "job";
41const INSTANCE_KEY: &str = "instance";
42
43const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [
45 "service.instance.id",
46 "service.name",
47 "service.namespace",
48 "service.version",
49 "cloud.availability_zone",
50 "cloud.region",
51 "container.name",
52 "deployment.environment",
53 "deployment.environment.name",
54 "k8s.cluster.name",
55 "k8s.container.name",
56 "k8s.cronjob.name",
57 "k8s.daemonset.name",
58 "k8s.deployment.name",
59 "k8s.job.name",
60 "k8s.namespace.name",
61 "k8s.pod.name",
62 "k8s.replicaset.name",
63 "k8s.statefulset.name",
64];
65
66lazy_static! {
67 static ref DEFAULT_PROMOTE_ATTRS_SET: HashSet<String> =
68 HashSet::from_iter(DEFAULT_PROMOTE_ATTRS.iter().map(|s| s.to_string()));
69}
70
71const OTEL_SCOPE_NAME: &str = "name";
72const OTEL_SCOPE_VERSION: &str = "version";
73const OTEL_SCOPE_SCHEMA_URL: &str = "schema_url";
74
75pub fn to_grpc_insert_requests(
83 request: ExportMetricsServiceRequest,
84 metric_ctx: &mut OtlpMetricCtx,
85) -> Result<(RowInsertRequests, usize)> {
86 let mut table_writer = MultiTableData::default();
87
88 for resource in &request.resource_metrics {
89 let resource_attrs = resource.resource.as_ref().map(|r| {
90 let mut attrs = r.attributes.clone();
91 process_resource_attrs(&mut attrs, metric_ctx);
92 attrs
93 });
94
95 for scope in &resource.scope_metrics {
96 let scope_attrs = process_scope_attrs(scope, metric_ctx);
97
98 for metric in &scope.metrics {
99 if metric.data.is_none() {
100 continue;
101 }
102 if let Some(t) = metric.data.as_ref().map(from_metric_type) {
103 metric_ctx.set_metric_type(t);
104 }
105
106 encode_metrics(
107 &mut table_writer,
108 metric,
109 resource_attrs.as_ref(),
110 scope_attrs.as_ref(),
111 metric_ctx,
112 )?;
113 }
114 }
115 }
116
117 Ok(table_writer.into_row_insert_requests())
118}
119
120fn from_metric_type(data: &metric::Data) -> MetricType {
121 match data {
122 metric::Data::Gauge(_) => MetricType::Gauge,
123 metric::Data::Sum(s) => {
124 if s.is_monotonic {
125 MetricType::MonotonicSum
126 } else {
127 MetricType::NonMonotonicSum
128 }
129 }
130 metric::Data::Histogram(_) => MetricType::Histogram,
131 metric::Data::ExponentialHistogram(_) => MetricType::ExponentialHistogram,
132 metric::Data::Summary(_) => MetricType::Summary,
133 }
134}
135
136fn process_resource_attrs(attrs: &mut Vec<KeyValue>, metric_ctx: &OtlpMetricCtx) {
137 if metric_ctx.is_legacy {
138 return;
139 }
140
141 let mut tmp = Vec::with_capacity(2);
143 for kv in attrs.iter() {
144 match &kv.key as &str {
145 KEY_SERVICE_NAME => {
146 tmp.push(KeyValue {
147 key: JOB_KEY.to_string(),
148 value: kv.value.clone(),
149 });
150 }
151 KEY_SERVICE_INSTANCE_ID => {
152 tmp.push(KeyValue {
153 key: INSTANCE_KEY.to_string(),
154 value: kv.value.clone(),
155 });
156 }
157 _ => {}
158 }
159 }
160
161 if metric_ctx.promote_all_resource_attrs {
163 attrs.retain(|kv| !metric_ctx.resource_attrs.contains(&kv.key));
164 } else {
165 attrs.retain(|kv| {
166 metric_ctx.resource_attrs.contains(&kv.key)
167 || DEFAULT_PROMOTE_ATTRS_SET.contains(&kv.key)
168 });
169 }
170
171 attrs.extend(tmp);
172}
173
174fn process_scope_attrs(scope: &ScopeMetrics, metric_ctx: &OtlpMetricCtx) -> Option<Vec<KeyValue>> {
175 if metric_ctx.is_legacy {
176 return scope.scope.as_ref().map(|s| s.attributes.clone());
177 };
178
179 if !metric_ctx.promote_scope_attrs {
180 return None;
181 }
182
183 scope.scope.as_ref().map(|s| {
185 let mut attrs = s.attributes.clone();
186 attrs.push(KeyValue {
187 key: OTEL_SCOPE_NAME.to_string(),
188 value: Some(AnyValue {
189 value: Some(any_value::Value::StringValue(s.name.clone())),
190 }),
191 });
192 attrs.push(KeyValue {
193 key: OTEL_SCOPE_VERSION.to_string(),
194 value: Some(AnyValue {
195 value: Some(any_value::Value::StringValue(s.version.clone())),
196 }),
197 });
198 attrs.push(KeyValue {
199 key: OTEL_SCOPE_SCHEMA_URL.to_string(),
200 value: Some(AnyValue {
201 value: Some(any_value::Value::StringValue(scope.schema_url.clone())),
202 }),
203 });
204 attrs
205 })
206}
207
208fn encode_metrics(
209 table_writer: &mut MultiTableData,
210 metric: &Metric,
211 resource_attrs: Option<&Vec<KeyValue>>,
212 scope_attrs: Option<&Vec<KeyValue>>,
213 metric_ctx: &OtlpMetricCtx,
214) -> Result<()> {
215 let name = if metric_ctx.is_legacy {
216 legacy_normalize_otlp_name(&metric.name)
217 } else {
218 translate_metric_name(
219 metric,
220 &metric_ctx.metric_type,
221 metric_ctx.metric_translation_strategy,
222 )
223 };
224
225 if let Some(data) = &metric.data {
228 match data {
229 metric::Data::Gauge(gauge) => {
230 encode_gauge(
231 table_writer,
232 &name,
233 gauge,
234 resource_attrs,
235 scope_attrs,
236 metric_ctx,
237 )?;
238 }
239 metric::Data::Sum(sum) => {
240 encode_sum(
241 table_writer,
242 &name,
243 sum,
244 resource_attrs,
245 scope_attrs,
246 metric_ctx,
247 )?;
248 }
249 metric::Data::Summary(summary) => {
250 encode_summary(
251 table_writer,
252 &name,
253 summary,
254 resource_attrs,
255 scope_attrs,
256 metric_ctx,
257 )?;
258 }
259 metric::Data::Histogram(hist) => {
260 encode_histogram(
261 table_writer,
262 &name,
263 hist,
264 resource_attrs,
265 scope_attrs,
266 metric_ctx,
267 )?;
268 }
269 metric::Data::ExponentialHistogram(_hist) => {}
271 }
272 }
273
274 Ok(())
275}
276
277#[derive(Debug, Clone, Copy, PartialEq, Eq)]
278enum AttributeType {
279 Resource,
280 Scope,
281 DataPoint,
282 Legacy,
283}
284
285fn write_attributes(
286 writer: &mut TableData,
287 row: &mut Vec<Value>,
288 attrs: Option<&Vec<KeyValue>>,
289 attribute_type: AttributeType,
290 metric_ctx: &OtlpMetricCtx,
291) -> Result<()> {
292 let Some(attrs) = attrs else {
293 return Ok(());
294 };
295
296 let tags = attrs.iter().filter_map(|attr| {
297 attr.value
298 .as_ref()
299 .and_then(|v| v.value.as_ref())
300 .and_then(|val| {
301 let key = match attribute_type {
302 AttributeType::Resource | AttributeType::DataPoint => {
303 translate_label_name(&attr.key, metric_ctx.metric_translation_strategy)
304 }
305 AttributeType::Scope => {
306 format!(
307 "otel_scope_{}",
308 translate_label_name(&attr.key, metric_ctx.metric_translation_strategy)
309 )
310 }
311 AttributeType::Legacy => legacy_normalize_otlp_name(&attr.key),
312 };
313 match val {
314 any_value::Value::StringValue(s) => Some((key, s.clone())),
315 any_value::Value::IntValue(v) => Some((key, v.to_string())),
316 any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
317 _ => None, }
319 })
320 });
321 row_writer::write_tags(writer, tags, row)?;
322
323 Ok(())
324}
325
326fn write_timestamp(
327 table: &mut TableData,
328 row: &mut Vec<Value>,
329 time_nano: i64,
330 legacy_mode: bool,
331) -> Result<()> {
332 if legacy_mode {
333 row_writer::write_ts_to_nanos(
334 table,
335 greptime_timestamp(),
336 Some(time_nano),
337 Precision::Nanosecond,
338 row,
339 )
340 } else {
341 row_writer::write_ts_to_millis(
342 table,
343 greptime_timestamp(),
344 Some(time_nano / 1000000),
345 Precision::Millisecond,
346 row,
347 )
348 }
349}
350
351fn write_data_point_value(
352 table: &mut TableData,
353 row: &mut Vec<Value>,
354 field: &str,
355 value: &Option<number_data_point::Value>,
356) -> Result<()> {
357 match value {
358 Some(number_data_point::Value::AsInt(val)) => {
359 row_writer::write_f64(table, field, *val as f64, row)?;
361 }
362 Some(number_data_point::Value::AsDouble(val)) => {
363 row_writer::write_f64(table, field, *val, row)?;
364 }
365 _ => {}
366 }
367 Ok(())
368}
369
370fn write_tags_and_timestamp(
371 table: &mut TableData,
372 row: &mut Vec<Value>,
373 resource_attrs: Option<&Vec<KeyValue>>,
374 scope_attrs: Option<&Vec<KeyValue>>,
375 data_point_attrs: Option<&Vec<KeyValue>>,
376 timestamp_nanos: i64,
377 metric_ctx: &OtlpMetricCtx,
378) -> Result<()> {
379 if metric_ctx.is_legacy {
380 write_attributes(
381 table,
382 row,
383 resource_attrs,
384 AttributeType::Legacy,
385 metric_ctx,
386 )?;
387 write_attributes(table, row, scope_attrs, AttributeType::Legacy, metric_ctx)?;
388 write_attributes(
389 table,
390 row,
391 data_point_attrs,
392 AttributeType::Legacy,
393 metric_ctx,
394 )?;
395 } else {
396 write_attributes(
398 table,
399 row,
400 resource_attrs,
401 AttributeType::Resource,
402 metric_ctx,
403 )?;
404 write_attributes(table, row, scope_attrs, AttributeType::Scope, metric_ctx)?;
405 write_attributes(
406 table,
407 row,
408 data_point_attrs,
409 AttributeType::DataPoint,
410 metric_ctx,
411 )?;
412 }
413
414 write_timestamp(table, row, timestamp_nanos, metric_ctx.is_legacy)?;
415
416 Ok(())
417}
418
419fn encode_gauge(
424 table_writer: &mut MultiTableData,
425 name: &str,
426 gauge: &Gauge,
427 resource_attrs: Option<&Vec<KeyValue>>,
428 scope_attrs: Option<&Vec<KeyValue>>,
429 metric_ctx: &OtlpMetricCtx,
430) -> Result<()> {
431 let table = table_writer.get_or_default_table_data(
432 name,
433 APPROXIMATE_COLUMN_COUNT,
434 gauge.data_points.len(),
435 );
436
437 for data_point in &gauge.data_points {
438 let mut row = table.alloc_one_row();
439 write_tags_and_timestamp(
440 table,
441 &mut row,
442 resource_attrs,
443 scope_attrs,
444 Some(data_point.attributes.as_ref()),
445 data_point.time_unix_nano as i64,
446 metric_ctx,
447 )?;
448
449 write_data_point_value(table, &mut row, greptime_value(), &data_point.value)?;
450 table.add_row(row);
451 }
452
453 Ok(())
454}
455
456fn encode_sum(
460 table_writer: &mut MultiTableData,
461 name: &str,
462 sum: &Sum,
463 resource_attrs: Option<&Vec<KeyValue>>,
464 scope_attrs: Option<&Vec<KeyValue>>,
465 metric_ctx: &OtlpMetricCtx,
466) -> Result<()> {
467 let table = table_writer.get_or_default_table_data(
468 name,
469 APPROXIMATE_COLUMN_COUNT,
470 sum.data_points.len(),
471 );
472
473 for data_point in &sum.data_points {
474 let mut row = table.alloc_one_row();
475 write_tags_and_timestamp(
476 table,
477 &mut row,
478 resource_attrs,
479 scope_attrs,
480 Some(data_point.attributes.as_ref()),
481 data_point.time_unix_nano as i64,
482 metric_ctx,
483 )?;
484 write_data_point_value(table, &mut row, greptime_value(), &data_point.value)?;
485 table.add_row(row);
486 }
487
488 Ok(())
489}
490
491const HISTOGRAM_LE_COLUMN: &str = "le";
492
493fn encode_histogram(
505 table_writer: &mut MultiTableData,
506 name: &str,
507 hist: &Histogram,
508 resource_attrs: Option<&Vec<KeyValue>>,
509 scope_attrs: Option<&Vec<KeyValue>>,
510 metric_ctx: &OtlpMetricCtx,
511) -> Result<()> {
512 let normalized_name = name;
513
514 let bucket_table_name = format!("{}_bucket", normalized_name);
515 let sum_table_name = format!("{}_sum", normalized_name);
516 let count_table_name = format!("{}_count", normalized_name);
517
518 let data_points_len = hist.data_points.len();
519 let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
521 let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
522 let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
523
524 for data_point in &hist.data_points {
525 let mut accumulated_count = 0;
526 for (idx, count) in data_point.bucket_counts.iter().enumerate() {
527 let mut bucket_row = bucket_table.alloc_one_row();
528 write_tags_and_timestamp(
529 &mut bucket_table,
530 &mut bucket_row,
531 resource_attrs,
532 scope_attrs,
533 Some(data_point.attributes.as_ref()),
534 data_point.time_unix_nano as i64,
535 metric_ctx,
536 )?;
537
538 if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
539 row_writer::write_tag(
540 &mut bucket_table,
541 HISTOGRAM_LE_COLUMN,
542 upper_bounds,
543 &mut bucket_row,
544 )?;
545 } else if idx == data_point.explicit_bounds.len() {
546 row_writer::write_tag(
548 &mut bucket_table,
549 HISTOGRAM_LE_COLUMN,
550 f64::INFINITY,
551 &mut bucket_row,
552 )?;
553 }
554
555 accumulated_count += count;
556 row_writer::write_f64(
557 &mut bucket_table,
558 greptime_value(),
559 accumulated_count as f64,
560 &mut bucket_row,
561 )?;
562
563 bucket_table.add_row(bucket_row);
564 }
565
566 if let Some(sum) = data_point.sum {
567 let mut sum_row = sum_table.alloc_one_row();
568 write_tags_and_timestamp(
569 &mut sum_table,
570 &mut sum_row,
571 resource_attrs,
572 scope_attrs,
573 Some(data_point.attributes.as_ref()),
574 data_point.time_unix_nano as i64,
575 metric_ctx,
576 )?;
577
578 row_writer::write_f64(&mut sum_table, greptime_value(), sum, &mut sum_row)?;
579 sum_table.add_row(sum_row);
580 }
581
582 let mut count_row = count_table.alloc_one_row();
583 write_tags_and_timestamp(
584 &mut count_table,
585 &mut count_row,
586 resource_attrs,
587 scope_attrs,
588 Some(data_point.attributes.as_ref()),
589 data_point.time_unix_nano as i64,
590 metric_ctx,
591 )?;
592
593 row_writer::write_f64(
594 &mut count_table,
595 greptime_value(),
596 data_point.count as f64,
597 &mut count_row,
598 )?;
599 count_table.add_row(count_row);
600 }
601
602 table_writer.add_table_data(bucket_table_name, bucket_table);
603 table_writer.add_table_data(sum_table_name, sum_table);
604 table_writer.add_table_data(count_table_name, count_table);
605
606 Ok(())
607}
608
609#[allow(dead_code)]
610fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
611 Ok(())
613}
614
615fn encode_summary(
616 table_writer: &mut MultiTableData,
617 name: &str,
618 summary: &Summary,
619 resource_attrs: Option<&Vec<KeyValue>>,
620 scope_attrs: Option<&Vec<KeyValue>>,
621 metric_ctx: &OtlpMetricCtx,
622) -> Result<()> {
623 if metric_ctx.is_legacy {
624 let table = table_writer.get_or_default_table_data(
625 name,
626 APPROXIMATE_COLUMN_COUNT,
627 summary.data_points.len(),
628 );
629
630 for data_point in &summary.data_points {
631 let mut row = table.alloc_one_row();
632 write_tags_and_timestamp(
633 table,
634 &mut row,
635 resource_attrs,
636 scope_attrs,
637 Some(data_point.attributes.as_ref()),
638 data_point.time_unix_nano as i64,
639 metric_ctx,
640 )?;
641
642 for quantile in &data_point.quantile_values {
643 row_writer::write_f64(
644 table,
645 format!("greptime_p{:02}", quantile.quantile * 100f64),
646 quantile.value,
647 &mut row,
648 )?;
649 }
650
651 row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
652 table.add_row(row);
653 }
654 } else {
655 let metric_name = name;
660 let count_name = format!("{}{}", metric_name, COUNT_TABLE_SUFFIX);
661 let sum_name = format!("{}{}", metric_name, SUM_TABLE_SUFFIX);
662
663 for data_point in &summary.data_points {
664 {
665 let quantile_table = table_writer.get_or_default_table_data(
666 metric_name,
667 APPROXIMATE_COLUMN_COUNT,
668 summary.data_points.len(),
669 );
670
671 for quantile in &data_point.quantile_values {
672 let mut row = quantile_table.alloc_one_row();
673 write_tags_and_timestamp(
674 quantile_table,
675 &mut row,
676 resource_attrs,
677 scope_attrs,
678 Some(data_point.attributes.as_ref()),
679 data_point.time_unix_nano as i64,
680 metric_ctx,
681 )?;
682 row_writer::write_tag(quantile_table, "quantile", quantile.quantile, &mut row)?;
683 row_writer::write_f64(
684 quantile_table,
685 greptime_value(),
686 quantile.value,
687 &mut row,
688 )?;
689 quantile_table.add_row(row);
690 }
691 }
692 {
693 let count_table = table_writer.get_or_default_table_data(
694 &count_name,
695 APPROXIMATE_COLUMN_COUNT,
696 summary.data_points.len(),
697 );
698 let mut row = count_table.alloc_one_row();
699 write_tags_and_timestamp(
700 count_table,
701 &mut row,
702 resource_attrs,
703 scope_attrs,
704 Some(data_point.attributes.as_ref()),
705 data_point.time_unix_nano as i64,
706 metric_ctx,
707 )?;
708
709 row_writer::write_f64(
710 count_table,
711 greptime_value(),
712 data_point.count as f64,
713 &mut row,
714 )?;
715
716 count_table.add_row(row);
717 }
718 {
719 let sum_table = table_writer.get_or_default_table_data(
720 &sum_name,
721 APPROXIMATE_COLUMN_COUNT,
722 summary.data_points.len(),
723 );
724
725 let mut row = sum_table.alloc_one_row();
726 write_tags_and_timestamp(
727 sum_table,
728 &mut row,
729 resource_attrs,
730 scope_attrs,
731 Some(data_point.attributes.as_ref()),
732 data_point.time_unix_nano as i64,
733 metric_ctx,
734 )?;
735
736 row_writer::write_f64(sum_table, greptime_value(), data_point.sum, &mut row)?;
737
738 sum_table.add_row(row);
739 }
740 }
741 }
742
743 Ok(())
744}
745
746#[cfg(test)]
747mod tests {
748 use otel_arrow_rust::proto::opentelemetry::common::v1::AnyValue;
749 use otel_arrow_rust::proto::opentelemetry::common::v1::any_value::Value as Val;
750 use otel_arrow_rust::proto::opentelemetry::metrics::v1::number_data_point::Value;
751 use otel_arrow_rust::proto::opentelemetry::metrics::v1::summary_data_point::ValueAtQuantile;
752 use otel_arrow_rust::proto::opentelemetry::metrics::v1::{
753 AggregationTemporality, HistogramDataPoint, NumberDataPoint, SummaryDataPoint,
754 };
755
756 use super::*;
757
758 fn keyvalue(key: &str, value: &str) -> KeyValue {
759 KeyValue {
760 key: key.into(),
761 value: Some(AnyValue {
762 value: Some(Val::StringValue(value.into())),
763 }),
764 }
765 }
766
767 #[test]
768 fn test_encode_gauge() {
769 let mut tables = MultiTableData::default();
770
771 let data_points = vec![
772 NumberDataPoint {
773 attributes: vec![keyvalue("host", "testsevrer")],
774 time_unix_nano: 100,
775 value: Some(Value::AsInt(100)),
776 ..Default::default()
777 },
778 NumberDataPoint {
779 attributes: vec![keyvalue("host", "testserver")],
780 time_unix_nano: 105,
781 value: Some(Value::AsInt(105)),
782 ..Default::default()
783 },
784 ];
785 let gauge = Gauge { data_points };
786 encode_gauge(
787 &mut tables,
788 "datamon",
789 &gauge,
790 Some(&vec![]),
791 Some(&vec![keyvalue("scope", "otel")]),
792 &OtlpMetricCtx::default(),
793 )
794 .unwrap();
795
796 let table = tables.get_or_default_table_data("datamon", 0, 0);
797 assert_eq!(table.num_rows(), 2);
798 assert_eq!(table.num_columns(), 4);
799 assert_eq!(
800 table
801 .columns()
802 .iter()
803 .map(|c| &c.column_name)
804 .collect::<Vec<&String>>(),
805 vec![
806 "otel_scope_scope",
807 "host",
808 greptime_timestamp(),
809 greptime_value()
810 ]
811 );
812 }
813
814 #[test]
815 fn test_encode_sum() {
816 let mut tables = MultiTableData::default();
817
818 let data_points = vec![
819 NumberDataPoint {
820 attributes: vec![keyvalue("host", "testserver")],
821 time_unix_nano: 100,
822 value: Some(Value::AsInt(100)),
823 ..Default::default()
824 },
825 NumberDataPoint {
826 attributes: vec![keyvalue("host", "testserver")],
827 time_unix_nano: 105,
828 value: Some(Value::AsInt(0)),
829 ..Default::default()
830 },
831 ];
832 let sum = Sum {
833 data_points,
834 ..Default::default()
835 };
836 encode_sum(
837 &mut tables,
838 "datamon",
839 &sum,
840 Some(&vec![]),
841 Some(&vec![keyvalue("scope", "otel")]),
842 &OtlpMetricCtx::default(),
843 )
844 .unwrap();
845
846 let table = tables.get_or_default_table_data("datamon", 0, 0);
847 assert_eq!(table.num_rows(), 2);
848 assert_eq!(table.num_columns(), 4);
849 assert_eq!(
850 table
851 .columns()
852 .iter()
853 .map(|c| &c.column_name)
854 .collect::<Vec<&String>>(),
855 vec![
856 "otel_scope_scope",
857 "host",
858 greptime_timestamp(),
859 greptime_value()
860 ]
861 );
862 }
863
864 #[test]
865 fn test_encode_summary() {
866 let mut tables = MultiTableData::default();
867
868 let data_points = vec![SummaryDataPoint {
869 attributes: vec![keyvalue("host", "testserver")],
870 time_unix_nano: 100,
871 count: 25,
872 sum: 5400.0,
873 quantile_values: vec![
874 ValueAtQuantile {
875 quantile: 0.90,
876 value: 1000.0,
877 },
878 ValueAtQuantile {
879 quantile: 0.95,
880 value: 3030.0,
881 },
882 ],
883 ..Default::default()
884 }];
885 let summary = Summary { data_points };
886 encode_summary(
887 &mut tables,
888 "datamon",
889 &summary,
890 Some(&vec![]),
891 Some(&vec![keyvalue("scope", "otel")]),
892 &OtlpMetricCtx::default(),
893 )
894 .unwrap();
895
896 let table = tables.get_or_default_table_data("datamon", 0, 0);
897 assert_eq!(table.num_rows(), 2);
898 assert_eq!(table.num_columns(), 5);
899 assert_eq!(
900 table
901 .columns()
902 .iter()
903 .map(|c| &c.column_name)
904 .collect::<Vec<&String>>(),
905 vec![
906 "otel_scope_scope",
907 "host",
908 greptime_timestamp(),
909 "quantile",
910 greptime_value()
911 ]
912 );
913
914 let table = tables.get_or_default_table_data("datamon_count", 0, 0);
915 assert_eq!(table.num_rows(), 1);
916 assert_eq!(table.num_columns(), 4);
917 assert_eq!(
918 table
919 .columns()
920 .iter()
921 .map(|c| &c.column_name)
922 .collect::<Vec<&String>>(),
923 vec![
924 "otel_scope_scope",
925 "host",
926 greptime_timestamp(),
927 greptime_value()
928 ]
929 );
930
931 let table = tables.get_or_default_table_data("datamon_sum", 0, 0);
932 assert_eq!(table.num_rows(), 1);
933 assert_eq!(table.num_columns(), 4);
934 assert_eq!(
935 table
936 .columns()
937 .iter()
938 .map(|c| &c.column_name)
939 .collect::<Vec<&String>>(),
940 vec![
941 "otel_scope_scope",
942 "host",
943 greptime_timestamp(),
944 greptime_value()
945 ]
946 );
947 }
948
949 #[test]
950 fn test_encode_histogram() {
951 let mut tables = MultiTableData::default();
952
953 let data_points = vec![HistogramDataPoint {
954 attributes: vec![keyvalue("host", "testserver")],
955 time_unix_nano: 100,
956 start_time_unix_nano: 23,
957 count: 25,
958 sum: Some(100.),
959 max: Some(200.),
960 min: Some(0.03),
961 bucket_counts: vec![2, 4, 6, 9, 4],
962 explicit_bounds: vec![0.1, 1., 10., 100.],
963 ..Default::default()
964 }];
965
966 let histogram = Histogram {
967 data_points,
968 aggregation_temporality: AggregationTemporality::Delta.into(),
969 };
970 encode_histogram(
971 &mut tables,
972 "histo",
973 &histogram,
974 Some(&vec![]),
975 Some(&vec![keyvalue("scope", "otel")]),
976 &OtlpMetricCtx::default(),
977 )
978 .unwrap();
979
980 assert_eq!(3, tables.num_tables());
981
982 let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
984 assert_eq!(bucket_table.num_rows(), 5);
985 assert_eq!(bucket_table.num_columns(), 5);
986 assert_eq!(
987 bucket_table
988 .columns()
989 .iter()
990 .map(|c| &c.column_name)
991 .collect::<Vec<&String>>(),
992 vec![
993 "otel_scope_scope",
994 "host",
995 greptime_timestamp(),
996 "le",
997 greptime_value(),
998 ]
999 );
1000
1001 let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0);
1002 assert_eq!(sum_table.num_rows(), 1);
1003 assert_eq!(sum_table.num_columns(), 4);
1004 assert_eq!(
1005 sum_table
1006 .columns()
1007 .iter()
1008 .map(|c| &c.column_name)
1009 .collect::<Vec<&String>>(),
1010 vec![
1011 "otel_scope_scope",
1012 "host",
1013 greptime_timestamp(),
1014 greptime_value()
1015 ]
1016 );
1017
1018 let count_table = tables.get_or_default_table_data("histo_count", 0, 0);
1019 assert_eq!(count_table.num_rows(), 1);
1020 assert_eq!(count_table.num_columns(), 4);
1021 assert_eq!(
1022 count_table
1023 .columns()
1024 .iter()
1025 .map(|c| &c.column_name)
1026 .collect::<Vec<&String>>(),
1027 vec![
1028 "otel_scope_scope",
1029 "host",
1030 greptime_timestamp(),
1031 greptime_value()
1032 ]
1033 );
1034 }
1035}