1use ahash::{HashMap, HashSet};
16use api::v1::{RowInsertRequests, Value};
17use common_grpc::precision::Precision;
18use common_query::prelude::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
19use itertools::Itertools;
20use lazy_static::lazy_static;
21use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
22use otel_arrow_rust::proto::opentelemetry::common::v1::{any_value, AnyValue, KeyValue};
23use otel_arrow_rust::proto::opentelemetry::metrics::v1::{metric, number_data_point, *};
24use regex::Regex;
25use session::protocol_ctx::{MetricType, OtlpMetricCtx};
26
27use crate::error::Result;
28use crate::otlp::trace::{KEY_SERVICE_INSTANCE_ID, KEY_SERVICE_NAME};
29use crate::row_writer::{self, MultiTableData, TableData};
30
/// Column-capacity hint used whenever a per-table row buffer is created.
const APPROXIMATE_COLUMN_COUNT: usize = 8;

/// Suffix of the table holding the `count` series of a summary/histogram.
const COUNT_TABLE_SUFFIX: &str = "_count";
/// Suffix of the table holding the `sum` series of a summary/histogram.
const SUM_TABLE_SUFFIX: &str = "_sum";

/// Label derived from the `KEY_SERVICE_NAME` resource attribute.
const JOB_KEY: &str = "job";
/// Label derived from the `KEY_SERVICE_INSTANCE_ID` resource attribute.
const INSTANCE_KEY: &str = "instance";

/// Separator used when joining normalized name tokens.
const UNDERSCORE: &str = "_";
/// Label prefix that `normalize_label_name` leaves untouched (reserved-style names).
const DOUBLE_UNDERSCORE: &str = "__";
/// Trailing token appended to monotonic-sum metric names.
const TOTAL: &str = "total";
/// Trailing token appended to gauges whose unit is `"1"`.
const RATIO: &str = "ratio";
44
/// Resource attributes kept as labels by default when
/// `promote_all_resource_attrs` is off (in addition to the attributes listed in
/// `OtlpMetricCtx::resource_attrs`); all other resource attributes are dropped.
const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [
    "service.instance.id",
    "service.name",
    "service.namespace",
    "service.version",
    "cloud.availability_zone",
    "cloud.region",
    "container.name",
    "deployment.environment",
    "deployment.environment.name",
    "k8s.cluster.name",
    "k8s.container.name",
    "k8s.cronjob.name",
    "k8s.daemonset.name",
    "k8s.deployment.name",
    "k8s.job.name",
    "k8s.namespace.name",
    "k8s.pod.name",
    "k8s.replicaset.name",
    "k8s.statefulset.name",
];
67
lazy_static! {
    /// Set form of `DEFAULT_PROMOTE_ATTRS`, used for membership checks.
    static ref DEFAULT_PROMOTE_ATTRS_SET: HashSet<String> =
        HashSet::from_iter(DEFAULT_PROMOTE_ATTRS.iter().map(|s| s.to_string()));
    /// Matches one character outside `[a-zA-Z0-9]`; used to tokenize metric names
    /// and units and to sanitize label names.
    static ref NON_ALPHA_NUM_CHAR: Regex = Regex::new(r"[^a-zA-Z0-9]").unwrap();
    /// Main (numerator) part of an OTLP unit -> word appended to the metric name.
    /// `"1"` maps to an empty string, which yields no unit suffix.
    static ref UNIT_MAP: HashMap<String, String> = [
        // Time
        ("d", "days"),
        ("h", "hours"),
        ("min", "minutes"),
        ("s", "seconds"),
        ("ms", "milliseconds"),
        ("us", "microseconds"),
        ("ns", "nanoseconds"),
        // Bytes
        ("By", "bytes"),
        ("KiBy", "kibibytes"),
        ("MiBy", "mebibytes"),
        ("GiBy", "gibibytes"),
        ("TiBy", "tibibytes"),
        ("KBy", "kilobytes"),
        ("MBy", "megabytes"),
        ("GBy", "gigabytes"),
        ("TBy", "terabytes"),
        // SI
        ("m", "meters"),
        ("V", "volts"),
        ("A", "amperes"),
        ("J", "joules"),
        ("W", "watts"),
        ("g", "grams"),
        // Misc
        ("Cel", "celsius"),
        ("Hz", "hertz"),
        ("1", ""),
        ("%", "percent"),
    ].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
    /// "Per" (denominator, after `/`) part of an OTLP unit -> singular word used
    /// in the `per_<unit>` suffix.
    static ref PER_UNIT_MAP: HashMap<String, String> = [
        ("s", "second"),
        ("m", "minute"),
        ("h", "hour"),
        ("d", "day"),
        ("w", "week"),
        ("mo", "month"),
        ("y", "year"),
    ].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
}
114
/// Key of the synthetic attribute carrying the instrumentation scope name
/// (added by `process_scope_attrs` when scope attributes are promoted).
const OTEL_SCOPE_NAME: &str = "name";
/// Key of the synthetic attribute carrying the instrumentation scope version.
const OTEL_SCOPE_VERSION: &str = "version";
/// Key of the synthetic attribute carrying the scope metrics' schema URL.
const OTEL_SCOPE_SCHEMA_URL: &str = "schema_url";
118
119pub fn to_grpc_insert_requests(
127 request: ExportMetricsServiceRequest,
128 metric_ctx: &mut OtlpMetricCtx,
129) -> Result<(RowInsertRequests, usize)> {
130 let mut table_writer = MultiTableData::default();
131
132 for resource in &request.resource_metrics {
133 let resource_attrs = resource.resource.as_ref().map(|r| {
134 let mut attrs = r.attributes.clone();
135 process_resource_attrs(&mut attrs, metric_ctx);
136 attrs
137 });
138
139 for scope in &resource.scope_metrics {
140 let scope_attrs = process_scope_attrs(scope, metric_ctx);
141
142 for metric in &scope.metrics {
143 if metric.data.is_none() {
144 continue;
145 }
146 if let Some(t) = metric.data.as_ref().map(from_metric_type) {
147 metric_ctx.set_metric_type(t);
148 }
149
150 encode_metrics(
151 &mut table_writer,
152 metric,
153 resource_attrs.as_ref(),
154 scope_attrs.as_ref(),
155 metric_ctx,
156 )?;
157 }
158 }
159 }
160
161 Ok(table_writer.into_row_insert_requests())
162}
163
164fn from_metric_type(data: &metric::Data) -> MetricType {
165 match data {
166 metric::Data::Gauge(_) => MetricType::Gauge,
167 metric::Data::Sum(s) => {
168 if s.is_monotonic {
169 MetricType::MonotonicSum
170 } else {
171 MetricType::NonMonotonicSum
172 }
173 }
174 metric::Data::Histogram(_) => MetricType::Histogram,
175 metric::Data::ExponentialHistogram(_) => MetricType::ExponentialHistogram,
176 metric::Data::Summary(_) => MetricType::Summary,
177 }
178}
179
180fn process_resource_attrs(attrs: &mut Vec<KeyValue>, metric_ctx: &OtlpMetricCtx) {
181 if metric_ctx.is_legacy {
182 return;
183 }
184
185 let mut tmp = Vec::with_capacity(2);
187 for kv in attrs.iter() {
188 match &kv.key as &str {
189 KEY_SERVICE_NAME => {
190 tmp.push(KeyValue {
191 key: JOB_KEY.to_string(),
192 value: kv.value.clone(),
193 });
194 }
195 KEY_SERVICE_INSTANCE_ID => {
196 tmp.push(KeyValue {
197 key: INSTANCE_KEY.to_string(),
198 value: kv.value.clone(),
199 });
200 }
201 _ => {}
202 }
203 }
204
205 if metric_ctx.promote_all_resource_attrs {
207 attrs.retain(|kv| !metric_ctx.resource_attrs.contains(&kv.key));
208 } else {
209 attrs.retain(|kv| {
210 metric_ctx.resource_attrs.contains(&kv.key)
211 || DEFAULT_PROMOTE_ATTRS_SET.contains(&kv.key)
212 });
213 }
214
215 attrs.extend(tmp);
216}
217
218fn process_scope_attrs(scope: &ScopeMetrics, metric_ctx: &OtlpMetricCtx) -> Option<Vec<KeyValue>> {
219 if metric_ctx.is_legacy {
220 return scope.scope.as_ref().map(|s| s.attributes.clone());
221 };
222
223 if !metric_ctx.promote_scope_attrs {
224 return None;
225 }
226
227 scope.scope.as_ref().map(|s| {
229 let mut attrs = s.attributes.clone();
230 attrs.push(KeyValue {
231 key: OTEL_SCOPE_NAME.to_string(),
232 value: Some(AnyValue {
233 value: Some(any_value::Value::StringValue(s.name.clone())),
234 }),
235 });
236 attrs.push(KeyValue {
237 key: OTEL_SCOPE_VERSION.to_string(),
238 value: Some(AnyValue {
239 value: Some(any_value::Value::StringValue(s.version.clone())),
240 }),
241 });
242 attrs.push(KeyValue {
243 key: OTEL_SCOPE_SCHEMA_URL.to_string(),
244 value: Some(AnyValue {
245 value: Some(any_value::Value::StringValue(scope.schema_url.clone())),
246 }),
247 });
248 attrs
249 })
250}
251
252pub fn normalize_metric_name(metric: &Metric, metric_type: &MetricType) -> String {
254 let mut name_tokens = NON_ALPHA_NUM_CHAR
255 .split(&metric.name)
256 .map(|s| s.to_string())
257 .collect_vec();
258 if !metric.unit.is_empty() {
259 let (main, per) = build_unit_suffix(&metric.unit);
260 if let Some(main) = main
261 && !name_tokens.contains(&main)
262 {
263 name_tokens.push(main);
264 }
265 if let Some(per) = per
266 && !name_tokens.contains(&per)
267 {
268 name_tokens.push("per".to_string());
269 name_tokens.push(per);
270 }
271 }
272
273 if matches!(metric_type, MetricType::MonotonicSum) {
274 name_tokens.retain(|t| t != TOTAL);
275 name_tokens.push(TOTAL.to_string());
276 }
277 if metric.unit == "1" && matches!(metric_type, MetricType::Gauge) {
278 name_tokens.retain(|t| t != RATIO);
279 name_tokens.push(RATIO.to_string());
280 }
281
282 let name = name_tokens.join(UNDERSCORE);
283
284 if let Some((_, first)) = name.char_indices().next()
285 && first >= '0'
286 && first <= '9'
287 {
288 format!("_{}", name)
289 } else {
290 name
291 }
292}
293
294fn build_unit_suffix(unit: &str) -> (Option<String>, Option<String>) {
295 let (main, per) = unit.split_once('/').unwrap_or((unit, ""));
296 (check_unit(main, &UNIT_MAP), check_unit(per, &PER_UNIT_MAP))
297}
298
299fn check_unit(unit_str: &str, unit_map: &HashMap<String, String>) -> Option<String> {
300 let u = unit_str.trim();
301 if !u.is_empty() && !u.contains("{}") {
302 let u = unit_map.get(u).map(|s| s.as_ref()).unwrap_or(u);
303 let u = clean_unit_name(u);
304 if !u.is_empty() {
305 return Some(u);
306 }
307 }
308 None
309}
310
311fn clean_unit_name(name: &str) -> String {
312 NON_ALPHA_NUM_CHAR.split(name).join(UNDERSCORE)
313}
314
315pub fn normalize_label_name(name: &str) -> String {
317 if name.is_empty() {
318 return name.to_string();
319 }
320
321 let n = NON_ALPHA_NUM_CHAR.replace_all(name, UNDERSCORE);
322 if let Some((_, first)) = n.char_indices().next()
323 && first >= '0'
324 && first <= '9'
325 {
326 return format!("key_{}", n);
327 }
328 if n.starts_with(UNDERSCORE) && !n.starts_with(DOUBLE_UNDERSCORE) {
329 return format!("key{}", n);
330 }
331 n.to_string()
332}
333
/// Legacy-mode name normalization: lower-cases the whole name (using full
/// `str::to_lowercase` semantics), then replaces every `.` and `-` with `_`.
pub fn legacy_normalize_otlp_name(name: &str) -> String {
    name.to_lowercase()
        .chars()
        .map(|c| match c {
            '.' | '-' => '_',
            other => other,
        })
        .collect()
}
343
344fn encode_metrics(
345 table_writer: &mut MultiTableData,
346 metric: &Metric,
347 resource_attrs: Option<&Vec<KeyValue>>,
348 scope_attrs: Option<&Vec<KeyValue>>,
349 metric_ctx: &OtlpMetricCtx,
350) -> Result<()> {
351 let name = if metric_ctx.is_legacy {
352 legacy_normalize_otlp_name(&metric.name)
353 } else {
354 normalize_metric_name(metric, &metric_ctx.metric_type)
355 };
356
357 if let Some(data) = &metric.data {
360 match data {
361 metric::Data::Gauge(gauge) => {
362 encode_gauge(
363 table_writer,
364 &name,
365 gauge,
366 resource_attrs,
367 scope_attrs,
368 metric_ctx,
369 )?;
370 }
371 metric::Data::Sum(sum) => {
372 encode_sum(
373 table_writer,
374 &name,
375 sum,
376 resource_attrs,
377 scope_attrs,
378 metric_ctx,
379 )?;
380 }
381 metric::Data::Summary(summary) => {
382 encode_summary(
383 table_writer,
384 &name,
385 summary,
386 resource_attrs,
387 scope_attrs,
388 metric_ctx,
389 )?;
390 }
391 metric::Data::Histogram(hist) => {
392 encode_histogram(
393 table_writer,
394 &name,
395 hist,
396 resource_attrs,
397 scope_attrs,
398 metric_ctx,
399 )?;
400 }
401 metric::Data::ExponentialHistogram(_hist) => {}
403 }
404 }
405
406 Ok(())
407}
408
/// Origin of an attribute set, which selects how its keys are normalized.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum AttributeType {
    /// Resource attributes; keys go through `normalize_label_name`.
    Resource,
    /// Instrumentation-scope attributes; normalized keys get an `otel_scope_` prefix.
    Scope,
    /// Per-data-point attributes; keys go through `normalize_label_name`.
    DataPoint,
    /// Any attribute in legacy mode; keys go through `legacy_normalize_otlp_name`.
    Legacy,
}
416
417fn write_attributes(
418 writer: &mut TableData,
419 row: &mut Vec<Value>,
420 attrs: Option<&Vec<KeyValue>>,
421 attribute_type: AttributeType,
422) -> Result<()> {
423 let Some(attrs) = attrs else {
424 return Ok(());
425 };
426
427 let tags = attrs.iter().filter_map(|attr| {
428 attr.value
429 .as_ref()
430 .and_then(|v| v.value.as_ref())
431 .and_then(|val| {
432 let key = match attribute_type {
433 AttributeType::Resource | AttributeType::DataPoint => {
434 normalize_label_name(&attr.key)
435 }
436 AttributeType::Scope => {
437 format!("otel_scope_{}", normalize_label_name(&attr.key))
438 }
439 AttributeType::Legacy => legacy_normalize_otlp_name(&attr.key),
440 };
441 match val {
442 any_value::Value::StringValue(s) => Some((key, s.clone())),
443 any_value::Value::IntValue(v) => Some((key, v.to_string())),
444 any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
445 _ => None, }
447 })
448 });
449 row_writer::write_tags(writer, tags, row)?;
450
451 Ok(())
452}
453
454fn write_timestamp(
455 table: &mut TableData,
456 row: &mut Vec<Value>,
457 time_nano: i64,
458 legacy_mode: bool,
459) -> Result<()> {
460 if legacy_mode {
461 row_writer::write_ts_to_nanos(
462 table,
463 GREPTIME_TIMESTAMP,
464 Some(time_nano),
465 Precision::Nanosecond,
466 row,
467 )
468 } else {
469 row_writer::write_ts_to_millis(
470 table,
471 GREPTIME_TIMESTAMP,
472 Some(time_nano / 1000000),
473 Precision::Millisecond,
474 row,
475 )
476 }
477}
478
479fn write_data_point_value(
480 table: &mut TableData,
481 row: &mut Vec<Value>,
482 field: &str,
483 value: &Option<number_data_point::Value>,
484) -> Result<()> {
485 match value {
486 Some(number_data_point::Value::AsInt(val)) => {
487 row_writer::write_f64(table, field, *val as f64, row)?;
489 }
490 Some(number_data_point::Value::AsDouble(val)) => {
491 row_writer::write_f64(table, field, *val, row)?;
492 }
493 _ => {}
494 }
495 Ok(())
496}
497
498fn write_tags_and_timestamp(
499 table: &mut TableData,
500 row: &mut Vec<Value>,
501 resource_attrs: Option<&Vec<KeyValue>>,
502 scope_attrs: Option<&Vec<KeyValue>>,
503 data_point_attrs: Option<&Vec<KeyValue>>,
504 timestamp_nanos: i64,
505 metric_ctx: &OtlpMetricCtx,
506) -> Result<()> {
507 if metric_ctx.is_legacy {
508 write_attributes(table, row, resource_attrs, AttributeType::Legacy)?;
509 write_attributes(table, row, scope_attrs, AttributeType::Legacy)?;
510 write_attributes(table, row, data_point_attrs, AttributeType::Legacy)?;
511 } else {
512 write_attributes(table, row, resource_attrs, AttributeType::Resource)?;
514 write_attributes(table, row, scope_attrs, AttributeType::Scope)?;
515 write_attributes(table, row, data_point_attrs, AttributeType::DataPoint)?;
516 }
517
518 write_timestamp(table, row, timestamp_nanos, metric_ctx.is_legacy)?;
519
520 Ok(())
521}
522
523fn encode_gauge(
528 table_writer: &mut MultiTableData,
529 name: &str,
530 gauge: &Gauge,
531 resource_attrs: Option<&Vec<KeyValue>>,
532 scope_attrs: Option<&Vec<KeyValue>>,
533 metric_ctx: &OtlpMetricCtx,
534) -> Result<()> {
535 let table = table_writer.get_or_default_table_data(
536 name,
537 APPROXIMATE_COLUMN_COUNT,
538 gauge.data_points.len(),
539 );
540
541 for data_point in &gauge.data_points {
542 let mut row = table.alloc_one_row();
543 write_tags_and_timestamp(
544 table,
545 &mut row,
546 resource_attrs,
547 scope_attrs,
548 Some(data_point.attributes.as_ref()),
549 data_point.time_unix_nano as i64,
550 metric_ctx,
551 )?;
552
553 write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
554 table.add_row(row);
555 }
556
557 Ok(())
558}
559
560fn encode_sum(
564 table_writer: &mut MultiTableData,
565 name: &str,
566 sum: &Sum,
567 resource_attrs: Option<&Vec<KeyValue>>,
568 scope_attrs: Option<&Vec<KeyValue>>,
569 metric_ctx: &OtlpMetricCtx,
570) -> Result<()> {
571 let table = table_writer.get_or_default_table_data(
572 name,
573 APPROXIMATE_COLUMN_COUNT,
574 sum.data_points.len(),
575 );
576
577 for data_point in &sum.data_points {
578 let mut row = table.alloc_one_row();
579 write_tags_and_timestamp(
580 table,
581 &mut row,
582 resource_attrs,
583 scope_attrs,
584 Some(data_point.attributes.as_ref()),
585 data_point.time_unix_nano as i64,
586 metric_ctx,
587 )?;
588 write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
589 table.add_row(row);
590 }
591
592 Ok(())
593}
594
595const HISTOGRAM_LE_COLUMN: &str = "le";
596
597fn encode_histogram(
609 table_writer: &mut MultiTableData,
610 name: &str,
611 hist: &Histogram,
612 resource_attrs: Option<&Vec<KeyValue>>,
613 scope_attrs: Option<&Vec<KeyValue>>,
614 metric_ctx: &OtlpMetricCtx,
615) -> Result<()> {
616 let normalized_name = name;
617
618 let bucket_table_name = format!("{}_bucket", normalized_name);
619 let sum_table_name = format!("{}_sum", normalized_name);
620 let count_table_name = format!("{}_count", normalized_name);
621
622 let data_points_len = hist.data_points.len();
623 let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
625 let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
626 let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
627
628 for data_point in &hist.data_points {
629 let mut accumulated_count = 0;
630 for (idx, count) in data_point.bucket_counts.iter().enumerate() {
631 let mut bucket_row = bucket_table.alloc_one_row();
632 write_tags_and_timestamp(
633 &mut bucket_table,
634 &mut bucket_row,
635 resource_attrs,
636 scope_attrs,
637 Some(data_point.attributes.as_ref()),
638 data_point.time_unix_nano as i64,
639 metric_ctx,
640 )?;
641
642 if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
643 row_writer::write_tag(
644 &mut bucket_table,
645 HISTOGRAM_LE_COLUMN,
646 upper_bounds,
647 &mut bucket_row,
648 )?;
649 } else if idx == data_point.explicit_bounds.len() {
650 row_writer::write_tag(
652 &mut bucket_table,
653 HISTOGRAM_LE_COLUMN,
654 f64::INFINITY,
655 &mut bucket_row,
656 )?;
657 }
658
659 accumulated_count += count;
660 row_writer::write_f64(
661 &mut bucket_table,
662 GREPTIME_VALUE,
663 accumulated_count as f64,
664 &mut bucket_row,
665 )?;
666
667 bucket_table.add_row(bucket_row);
668 }
669
670 if let Some(sum) = data_point.sum {
671 let mut sum_row = sum_table.alloc_one_row();
672 write_tags_and_timestamp(
673 &mut sum_table,
674 &mut sum_row,
675 resource_attrs,
676 scope_attrs,
677 Some(data_point.attributes.as_ref()),
678 data_point.time_unix_nano as i64,
679 metric_ctx,
680 )?;
681
682 row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?;
683 sum_table.add_row(sum_row);
684 }
685
686 let mut count_row = count_table.alloc_one_row();
687 write_tags_and_timestamp(
688 &mut count_table,
689 &mut count_row,
690 resource_attrs,
691 scope_attrs,
692 Some(data_point.attributes.as_ref()),
693 data_point.time_unix_nano as i64,
694 metric_ctx,
695 )?;
696
697 row_writer::write_f64(
698 &mut count_table,
699 GREPTIME_VALUE,
700 data_point.count as f64,
701 &mut count_row,
702 )?;
703 count_table.add_row(count_row);
704 }
705
706 table_writer.add_table_data(bucket_table_name, bucket_table);
707 table_writer.add_table_data(sum_table_name, sum_table);
708 table_writer.add_table_data(count_table_name, count_table);
709
710 Ok(())
711}
712
/// Placeholder for exponential-histogram encoding; currently accepts any input and
/// writes nothing.
// `dead_code` is allowed because nothing calls this yet: `encode_metrics` skips
// `ExponentialHistogram` payloads without calling it.
#[allow(dead_code)]
fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
    // TODO: implement exponential-histogram support.
    Ok(())
}
718
/// Encodes an OTLP summary.
///
/// Legacy mode: one row per data point in the table `name`. The row has one
/// `greptime_p<q*100>` column per quantile (formatted with `{:02}`, e.g. 0.9 ->
/// `greptime_p90`) and the point's count in `GREPTIME_COUNT`. The summary's `sum`
/// is not written in this mode.
///
/// Default mode: Prometheus-style tables:
/// - `name`: one row per quantile, tagged with `quantile`, value in `GREPTIME_VALUE`;
/// - `name` + `COUNT_TABLE_SUFFIX`: one row per data point with its count;
/// - `name` + `SUM_TABLE_SUFFIX`: one row per data point with its sum.
///
/// In default mode the tables are only created when there is at least one data
/// point, because the lookups happen inside the data-point loop.
fn encode_summary(
    table_writer: &mut MultiTableData,
    name: &str,
    summary: &Summary,
    resource_attrs: Option<&Vec<KeyValue>>,
    scope_attrs: Option<&Vec<KeyValue>>,
    metric_ctx: &OtlpMetricCtx,
) -> Result<()> {
    if metric_ctx.is_legacy {
        let table = table_writer.get_or_default_table_data(
            name,
            APPROXIMATE_COLUMN_COUNT,
            summary.data_points.len(),
        );

        for data_point in &summary.data_points {
            let mut row = table.alloc_one_row();
            write_tags_and_timestamp(
                table,
                &mut row,
                resource_attrs,
                scope_attrs,
                Some(data_point.attributes.as_ref()),
                data_point.time_unix_nano as i64,
                metric_ctx,
            )?;

            // Each quantile becomes its own field column on the same row.
            for quantile in &data_point.quantile_values {
                row_writer::write_f64(
                    table,
                    format!("greptime_p{:02}", quantile.quantile * 100f64),
                    quantile.value,
                    &mut row,
                )?;
            }

            row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
            table.add_row(row);
        }
    } else {
        let metric_name = name;
        // The quantile series uses the bare metric name; count and sum go to
        // suffixed sibling tables, following the Prometheus summary layout.
        let count_name = format!("{}{}", metric_name, COUNT_TABLE_SUFFIX);
        let sum_name = format!("{}{}", metric_name, SUM_TABLE_SUFFIX);

        for data_point in &summary.data_points {
            // Each table is borrowed in its own scope because only one
            // `&mut TableData` from `table_writer` can be alive at a time.
            {
                let quantile_table = table_writer.get_or_default_table_data(
                    metric_name,
                    APPROXIMATE_COLUMN_COUNT,
                    summary.data_points.len(),
                );

                // One row per quantile, distinguished by the `quantile` tag.
                for quantile in &data_point.quantile_values {
                    let mut row = quantile_table.alloc_one_row();
                    write_tags_and_timestamp(
                        quantile_table,
                        &mut row,
                        resource_attrs,
                        scope_attrs,
                        Some(data_point.attributes.as_ref()),
                        data_point.time_unix_nano as i64,
                        metric_ctx,
                    )?;
                    row_writer::write_tag(quantile_table, "quantile", quantile.quantile, &mut row)?;
                    row_writer::write_f64(
                        quantile_table,
                        GREPTIME_VALUE,
                        quantile.value,
                        &mut row,
                    )?;
                    quantile_table.add_row(row);
                }
            }
            {
                let count_table = table_writer.get_or_default_table_data(
                    &count_name,
                    APPROXIMATE_COLUMN_COUNT,
                    summary.data_points.len(),
                );
                let mut row = count_table.alloc_one_row();
                write_tags_and_timestamp(
                    count_table,
                    &mut row,
                    resource_attrs,
                    scope_attrs,
                    Some(data_point.attributes.as_ref()),
                    data_point.time_unix_nano as i64,
                    metric_ctx,
                )?;

                row_writer::write_f64(
                    count_table,
                    GREPTIME_VALUE,
                    data_point.count as f64,
                    &mut row,
                )?;

                count_table.add_row(row);
            }
            {
                let sum_table = table_writer.get_or_default_table_data(
                    &sum_name,
                    APPROXIMATE_COLUMN_COUNT,
                    summary.data_points.len(),
                );

                let mut row = sum_table.alloc_one_row();
                write_tags_and_timestamp(
                    sum_table,
                    &mut row,
                    resource_attrs,
                    scope_attrs,
                    Some(data_point.attributes.as_ref()),
                    data_point.time_unix_nano as i64,
                    metric_ctx,
                )?;

                row_writer::write_f64(sum_table, GREPTIME_VALUE, data_point.sum, &mut row)?;

                sum_table.add_row(row);
            }
        }
    }

    Ok(())
}
849
#[cfg(test)]
mod tests {
    use otel_arrow_rust::proto::opentelemetry::common::v1::any_value::Value as Val;
    use otel_arrow_rust::proto::opentelemetry::common::v1::AnyValue;
    use otel_arrow_rust::proto::opentelemetry::metrics::v1::number_data_point::Value;
    use otel_arrow_rust::proto::opentelemetry::metrics::v1::summary_data_point::ValueAtQuantile;
    use otel_arrow_rust::proto::opentelemetry::metrics::v1::{
        AggregationTemporality, HistogramDataPoint, NumberDataPoint, SummaryDataPoint,
    };

    use super::*;

    /// Legacy normalization lower-cases and maps `.` / `-` to `_`.
    #[test]
    fn test_legacy_normalize_otlp_name() {
        assert_eq!(
            legacy_normalize_otlp_name("jvm.memory.free"),
            "jvm_memory_free"
        );
        assert_eq!(
            legacy_normalize_otlp_name("jvm-memory-free"),
            "jvm_memory_free"
        );
        assert_eq!(
            legacy_normalize_otlp_name("jvm_memory_free"),
            "jvm_memory_free"
        );
        assert_eq!(
            legacy_normalize_otlp_name("JVM_MEMORY_FREE"),
            "jvm_memory_free"
        );
        assert_eq!(
            legacy_normalize_otlp_name("JVM_memory_FREE"),
            "jvm_memory_free"
        );
    }

    /// Prometheus-style metric name normalization: unit suffixes, `total` for
    /// monotonic sums, `ratio` for unit-"1" gauges, and de-duplication of tokens.
    #[test]
    fn test_normalize_metric_name() {
        let test_cases = vec![
            // Empty metric stays empty.
            (Metric::default(), MetricType::Init, ""),
            // Plain name, no unit.
            (
                Metric {
                    name: "foo".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "foo",
            ),
            // Unit suffix is appended.
            (
                Metric {
                    name: "foo".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "foo_seconds",
            ),
            // Unit suffix is not duplicated.
            (
                Metric {
                    name: "foo_seconds".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "foo_seconds",
            ),
            // Monotonic sums get `total`.
            (
                Metric {
                    name: "foo".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "foo_total",
            ),
            // `total` is not duplicated.
            (
                Metric {
                    name: "foo_total".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "foo_total",
            ),
            // Unit comes before `total`.
            (
                Metric {
                    name: "foo".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "foo_seconds_total",
            ),
            (
                Metric {
                    name: "foo_seconds".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "foo_seconds_total",
            ),
            // An existing `total` is moved after the unit.
            (
                Metric {
                    name: "foo_total".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "foo_seconds_total",
            ),
            (
                Metric {
                    name: "foo_seconds_total".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "foo_seconds_total",
            ),
            (
                Metric {
                    name: "foo_total_seconds".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "foo_seconds_total",
            ),
            // Gauges with unit "1" get `ratio`.
            (
                Metric {
                    name: "foo".to_string(),
                    unit: "1".to_string(),
                    ..Default::default()
                },
                MetricType::Gauge,
                "foo_ratio",
            ),
            // "per" units produce `per_<unit>`.
            (
                Metric {
                    name: "foo".to_string(),
                    unit: "m/s".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "foo_meters_per_second",
            ),
            // The per-unit is skipped when the name already contains it.
            (
                Metric {
                    name: "foo_second".to_string(),
                    unit: "m/s".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "foo_second_meters",
            ),
            // The main unit is skipped when the name already contains it.
            (
                Metric {
                    name: "foo_meters".to_string(),
                    unit: "m/s".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "foo_meters_per_second",
            ),
        ];

        for (metric, metric_type, expected) in test_cases {
            let result = normalize_metric_name(&metric, &metric_type);
            assert_eq!(
                result, expected,
                "Failed for metric name: '{}', unit: '{}', type: {:?}",
                metric.name, metric.unit, metric_type
            );
        }
    }

    /// Label sanitization and `key`/`key_` prefixing rules.
    #[test]
    fn test_normalize_label_name() {
        let test_cases = vec![
            ("", ""),
            ("foo", "foo"),
            ("foo_bar/baz:abc", "foo_bar_baz_abc"),
            ("1foo", "key_1foo"),
            ("_foo", "key_foo"),
            ("__bar", "__bar"),
        ];

        for (input, expected) in test_cases {
            let result = normalize_label_name(input);
            assert_eq!(
                result, expected,
                "unexpected result for input '{}'; got '{}'; want '{}'",
                input, result, expected
            );
        }
    }

    /// Builds a string-valued OTLP attribute.
    fn keyvalue(key: &str, value: &str) -> KeyValue {
        KeyValue {
            key: key.into(),
            value: Some(AnyValue {
                value: Some(Val::StringValue(value.into())),
            }),
        }
    }

    /// A gauge produces one row per data point with scope, data-point, timestamp
    /// and value columns.
    #[test]
    fn test_encode_gauge() {
        let mut tables = MultiTableData::default();

        let data_points = vec![
            NumberDataPoint {
                attributes: vec![keyvalue("host", "testsevrer")],
                time_unix_nano: 100,
                value: Some(Value::AsInt(100)),
                ..Default::default()
            },
            NumberDataPoint {
                attributes: vec![keyvalue("host", "testserver")],
                time_unix_nano: 105,
                value: Some(Value::AsInt(105)),
                ..Default::default()
            },
        ];
        let gauge = Gauge { data_points };
        encode_gauge(
            &mut tables,
            "datamon",
            &gauge,
            Some(&vec![]),
            Some(&vec![keyvalue("scope", "otel")]),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 4);
        assert_eq!(
            table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            vec![
                "otel_scope_scope",
                "host",
                "greptime_timestamp",
                "greptime_value"
            ]
        );
    }

    /// A sum produces the same layout as a gauge.
    #[test]
    fn test_encode_sum() {
        let mut tables = MultiTableData::default();

        let data_points = vec![
            NumberDataPoint {
                attributes: vec![keyvalue("host", "testserver")],
                time_unix_nano: 100,
                value: Some(Value::AsInt(100)),
                ..Default::default()
            },
            NumberDataPoint {
                attributes: vec![keyvalue("host", "testserver")],
                time_unix_nano: 105,
                value: Some(Value::AsInt(0)),
                ..Default::default()
            },
        ];
        let sum = Sum {
            data_points,
            ..Default::default()
        };
        encode_sum(
            &mut tables,
            "datamon",
            &sum,
            Some(&vec![]),
            Some(&vec![keyvalue("scope", "otel")]),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 4);
        assert_eq!(
            table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            vec![
                "otel_scope_scope",
                "host",
                "greptime_timestamp",
                "greptime_value"
            ]
        );
    }

    /// A summary (default mode) produces a quantile table plus `_count` and `_sum`
    /// tables.
    #[test]
    fn test_encode_summary() {
        let mut tables = MultiTableData::default();

        let data_points = vec![SummaryDataPoint {
            attributes: vec![keyvalue("host", "testserver")],
            time_unix_nano: 100,
            count: 25,
            sum: 5400.0,
            quantile_values: vec![
                ValueAtQuantile {
                    quantile: 0.90,
                    value: 1000.0,
                },
                ValueAtQuantile {
                    quantile: 0.95,
                    value: 3030.0,
                },
            ],
            ..Default::default()
        }];
        let summary = Summary { data_points };
        encode_summary(
            &mut tables,
            "datamon",
            &summary,
            Some(&vec![]),
            Some(&vec![keyvalue("scope", "otel")]),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        // One row per quantile.
        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 5);
        assert_eq!(
            table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            vec![
                "otel_scope_scope",
                "host",
                "greptime_timestamp",
                "quantile",
                "greptime_value"
            ]
        );

        let table = tables.get_or_default_table_data("datamon_count", 0, 0);
        assert_eq!(table.num_rows(), 1);
        assert_eq!(table.num_columns(), 4);
        assert_eq!(
            table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            vec![
                "otel_scope_scope",
                "host",
                "greptime_timestamp",
                "greptime_value"
            ]
        );

        let table = tables.get_or_default_table_data("datamon_sum", 0, 0);
        assert_eq!(table.num_rows(), 1);
        assert_eq!(table.num_columns(), 4);
        assert_eq!(
            table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            vec![
                "otel_scope_scope",
                "host",
                "greptime_timestamp",
                "greptime_value"
            ]
        );
    }

    /// A histogram produces `_bucket` (one row per bucket, including +Inf),
    /// `_sum` and `_count` tables.
    #[test]
    fn test_encode_histogram() {
        let mut tables = MultiTableData::default();

        let data_points = vec![HistogramDataPoint {
            attributes: vec![keyvalue("host", "testserver")],
            time_unix_nano: 100,
            start_time_unix_nano: 23,
            count: 25,
            sum: Some(100.),
            max: Some(200.),
            min: Some(0.03),
            bucket_counts: vec![2, 4, 6, 9, 4],
            explicit_bounds: vec![0.1, 1., 10., 100.],
            ..Default::default()
        }];

        let histogram = Histogram {
            data_points,
            aggregation_temporality: AggregationTemporality::Delta.into(),
        };
        encode_histogram(
            &mut tables,
            "histo",
            &histogram,
            Some(&vec![]),
            Some(&vec![keyvalue("scope", "otel")]),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        assert_eq!(3, tables.num_tables());

        // 4 explicit bounds -> 5 buckets (the last one is the +Inf bucket).
        let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
        assert_eq!(bucket_table.num_rows(), 5);
        assert_eq!(bucket_table.num_columns(), 5);
        assert_eq!(
            bucket_table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            vec![
                "otel_scope_scope",
                "host",
                "greptime_timestamp",
                "le",
                "greptime_value",
            ]
        );

        let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0);
        assert_eq!(sum_table.num_rows(), 1);
        assert_eq!(sum_table.num_columns(), 4);
        assert_eq!(
            sum_table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            vec![
                "otel_scope_scope",
                "host",
                "greptime_timestamp",
                "greptime_value"
            ]
        );

        let count_table = tables.get_or_default_table_data("histo_count", 0, 0);
        assert_eq!(count_table.num_rows(), 1);
        assert_eq!(count_table.num_columns(), 4);
        assert_eq!(
            count_table
                .columns()
                .iter()
                .map(|c| &c.column_name)
                .collect::<Vec<&String>>(),
            vec![
                "otel_scope_scope",
                "host",
                "greptime_timestamp",
                "greptime_value"
            ]
        );
    }
}