1use ahash::{HashMap, HashSet};
16use api::v1::{RowInsertRequests, Value};
17use common_grpc::precision::Precision;
18use common_query::prelude::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
19use lazy_static::lazy_static;
20use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
21use otel_arrow_rust::proto::opentelemetry::common::v1::{AnyValue, KeyValue, any_value};
22use otel_arrow_rust::proto::opentelemetry::metrics::v1::{metric, number_data_point, *};
23use regex::Regex;
24use session::protocol_ctx::{MetricType, OtlpMetricCtx};
25
26use crate::error::Result;
27use crate::otlp::trace::{KEY_SERVICE_INSTANCE_ID, KEY_SERVICE_NAME};
28use crate::row_writer::{self, MultiTableData, TableData};
29
/// Rough per-table column estimate used to pre-size row buffers.
const APPROXIMATE_COLUMN_COUNT: usize = 8;

// Table-name suffixes for the `_count`/`_sum` series split out of summaries.
const COUNT_TABLE_SUFFIX: &str = "_count";
const SUM_TABLE_SUFFIX: &str = "_sum";

// Prometheus-compatible tag names derived from the `service.name` /
// `service.instance.id` resource attributes (see `process_resource_attrs`).
const JOB_KEY: &str = "job";
const INSTANCE_KEY: &str = "instance";

const UNDERSCORE: &str = "_";
const DOUBLE_UNDERSCORE: &str = "__";
// Metric-name suffixes appended during normalization.
const TOTAL: &str = "total";
const RATIO: &str = "ratio";

/// Resource attributes promoted to columns when the user has not supplied an
/// explicit promotion list.
const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [
    "service.instance.id",
    "service.name",
    "service.namespace",
    "service.version",
    "cloud.availability_zone",
    "cloud.region",
    "container.name",
    "deployment.environment",
    "deployment.environment.name",
    "k8s.cluster.name",
    "k8s.container.name",
    "k8s.cronjob.name",
    "k8s.daemonset.name",
    "k8s.deployment.name",
    "k8s.job.name",
    "k8s.namespace.name",
    "k8s.pod.name",
    "k8s.replicaset.name",
    "k8s.statefulset.name",
];

lazy_static! {
    // Set form of `DEFAULT_PROMOTE_ATTRS` for O(1) membership checks.
    static ref DEFAULT_PROMOTE_ATTRS_SET: HashSet<String> =
        HashSet::from_iter(DEFAULT_PROMOTE_ATTRS.iter().map(|s| s.to_string()));
    // Matches every character not allowed in normalized names/labels.
    static ref NON_ALPHA_NUM_CHAR: Regex = Regex::new(r"[^a-zA-Z0-9]").unwrap();
    // Unit abbreviation -> spelled-out suffix for the main unit part.
    static ref UNIT_MAP: HashMap<String, String> = [
        ("d", "days"),
        ("h", "hours"),
        ("min", "minutes"),
        ("s", "seconds"),
        ("ms", "milliseconds"),
        ("us", "microseconds"),
        ("ns", "nanoseconds"),
        ("By", "bytes"),
        ("KiBy", "kibibytes"),
        ("MiBy", "mebibytes"),
        ("GiBy", "gibibytes"),
        ("TiBy", "tibibytes"),
        ("KBy", "kilobytes"),
        ("MBy", "megabytes"),
        ("GBy", "gigabytes"),
        ("TBy", "terabytes"),
        ("m", "meters"),
        ("V", "volts"),
        ("A", "amperes"),
        ("J", "joules"),
        ("W", "watts"),
        ("g", "grams"),
        ("Cel", "celsius"),
        ("Hz", "hertz"),
        ("1", ""),
        ("%", "percent"),
    ].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
    // Unit abbreviation -> spelled-out suffix for the "per" part after '/'.
    static ref PER_UNIT_MAP: HashMap<String, String> = [
        ("s", "second"),
        ("m", "minute"),
        ("h", "hour"),
        ("d", "day"),
        ("w", "week"),
        ("mo", "month"),
        ("y", "year"),
    ].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
}

// Keys used when instrumentation-scope metadata is promoted to attributes;
// `write_attributes` later prefixes them with `otel_scope_`.
const OTEL_SCOPE_NAME: &str = "name";
const OTEL_SCOPE_VERSION: &str = "version";
const OTEL_SCOPE_SCHEMA_URL: &str = "schema_url";
117
118pub fn to_grpc_insert_requests(
126 request: ExportMetricsServiceRequest,
127 metric_ctx: &mut OtlpMetricCtx,
128) -> Result<(RowInsertRequests, usize)> {
129 let mut table_writer = MultiTableData::default();
130
131 for resource in &request.resource_metrics {
132 let resource_attrs = resource.resource.as_ref().map(|r| {
133 let mut attrs = r.attributes.clone();
134 process_resource_attrs(&mut attrs, metric_ctx);
135 attrs
136 });
137
138 for scope in &resource.scope_metrics {
139 let scope_attrs = process_scope_attrs(scope, metric_ctx);
140
141 for metric in &scope.metrics {
142 if metric.data.is_none() {
143 continue;
144 }
145 if let Some(t) = metric.data.as_ref().map(from_metric_type) {
146 metric_ctx.set_metric_type(t);
147 }
148
149 encode_metrics(
150 &mut table_writer,
151 metric,
152 resource_attrs.as_ref(),
153 scope_attrs.as_ref(),
154 metric_ctx,
155 )?;
156 }
157 }
158 }
159
160 Ok(table_writer.into_row_insert_requests())
161}
162
163fn from_metric_type(data: &metric::Data) -> MetricType {
164 match data {
165 metric::Data::Gauge(_) => MetricType::Gauge,
166 metric::Data::Sum(s) => {
167 if s.is_monotonic {
168 MetricType::MonotonicSum
169 } else {
170 MetricType::NonMonotonicSum
171 }
172 }
173 metric::Data::Histogram(_) => MetricType::Histogram,
174 metric::Data::ExponentialHistogram(_) => MetricType::ExponentialHistogram,
175 metric::Data::Summary(_) => MetricType::Summary,
176 }
177}
178
179fn process_resource_attrs(attrs: &mut Vec<KeyValue>, metric_ctx: &OtlpMetricCtx) {
180 if metric_ctx.is_legacy {
181 return;
182 }
183
184 let mut tmp = Vec::with_capacity(2);
186 for kv in attrs.iter() {
187 match &kv.key as &str {
188 KEY_SERVICE_NAME => {
189 tmp.push(KeyValue {
190 key: JOB_KEY.to_string(),
191 value: kv.value.clone(),
192 });
193 }
194 KEY_SERVICE_INSTANCE_ID => {
195 tmp.push(KeyValue {
196 key: INSTANCE_KEY.to_string(),
197 value: kv.value.clone(),
198 });
199 }
200 _ => {}
201 }
202 }
203
204 if metric_ctx.promote_all_resource_attrs {
206 attrs.retain(|kv| !metric_ctx.resource_attrs.contains(&kv.key));
207 } else {
208 attrs.retain(|kv| {
209 metric_ctx.resource_attrs.contains(&kv.key)
210 || DEFAULT_PROMOTE_ATTRS_SET.contains(&kv.key)
211 });
212 }
213
214 attrs.extend(tmp);
215}
216
217fn process_scope_attrs(scope: &ScopeMetrics, metric_ctx: &OtlpMetricCtx) -> Option<Vec<KeyValue>> {
218 if metric_ctx.is_legacy {
219 return scope.scope.as_ref().map(|s| s.attributes.clone());
220 };
221
222 if !metric_ctx.promote_scope_attrs {
223 return None;
224 }
225
226 scope.scope.as_ref().map(|s| {
228 let mut attrs = s.attributes.clone();
229 attrs.push(KeyValue {
230 key: OTEL_SCOPE_NAME.to_string(),
231 value: Some(AnyValue {
232 value: Some(any_value::Value::StringValue(s.name.clone())),
233 }),
234 });
235 attrs.push(KeyValue {
236 key: OTEL_SCOPE_VERSION.to_string(),
237 value: Some(AnyValue {
238 value: Some(any_value::Value::StringValue(s.version.clone())),
239 }),
240 });
241 attrs.push(KeyValue {
242 key: OTEL_SCOPE_SCHEMA_URL.to_string(),
243 value: Some(AnyValue {
244 value: Some(any_value::Value::StringValue(scope.schema_url.clone())),
245 }),
246 });
247 attrs
248 })
249}
250
251pub fn normalize_metric_name(metric: &Metric, metric_type: &MetricType) -> String {
253 let mut name_tokens: Vec<String> = NON_ALPHA_NUM_CHAR
255 .split(&metric.name)
256 .filter_map(|s| {
257 let trimmed = s.trim();
258 if trimmed.is_empty() {
259 None
260 } else {
261 Some(trimmed.to_string())
262 }
263 })
264 .collect();
265
266 if !metric.unit.is_empty() {
268 let (main, per) = build_unit_suffix(&metric.unit);
269 if let Some(main) = main
270 && !name_tokens.contains(&main)
271 {
272 name_tokens.push(main);
273 }
274 if let Some(per) = per
275 && !name_tokens.contains(&per)
276 {
277 name_tokens.push("per".to_string());
278 name_tokens.push(per);
279 }
280 }
281
282 if matches!(metric_type, MetricType::MonotonicSum) {
284 name_tokens.retain(|t| t != TOTAL);
286 name_tokens.push(TOTAL.to_string());
287 }
288
289 if metric.unit == "1" && matches!(metric_type, MetricType::Gauge) {
291 name_tokens.retain(|t| t != RATIO);
293 name_tokens.push(RATIO.to_string());
294 }
295
296 let name = name_tokens.join(UNDERSCORE);
298
299 if let Some((_, first)) = name.char_indices().next()
301 && first.is_ascii_digit()
302 {
303 format!("_{}", name)
304 } else {
305 name
306 }
307}
308
309fn build_unit_suffix(unit: &str) -> (Option<String>, Option<String>) {
310 let (main, per) = unit.split_once('/').unwrap_or((unit, ""));
311 (check_unit(main, &UNIT_MAP), check_unit(per, &PER_UNIT_MAP))
312}
313
314fn check_unit(unit_str: &str, unit_map: &HashMap<String, String>) -> Option<String> {
315 let u = unit_str.trim();
316 if !u.is_empty() && !u.contains('{') && !u.contains('}') {
318 let u = unit_map.get(u).map(|s| s.as_ref()).unwrap_or(u);
319 let u = clean_unit_name(u);
320 if !u.is_empty() {
321 return Some(u);
322 }
323 }
324 None
325}
326
/// Reduces a unit name to underscore-joined alphanumeric runs, e.g.
/// `"req/sec"` -> `"req_sec"`, `"___test___"` -> `"test"`.
fn clean_unit_name(name: &str) -> String {
    name.split(|c: char| !c.is_ascii_alphanumeric())
        .filter(|segment| !segment.is_empty())
        .collect::<Vec<&str>>()
        .join("_")
}
336
/// Normalizes a label key: every non-alphanumeric character becomes `_`, a
/// leading digit gets a `key_` prefix, and a single (but not double) leading
/// underscore gets a `key` prefix.
pub fn normalize_label_name(name: &str) -> String {
    if name.is_empty() {
        return name.to_string();
    }

    let sanitized: String = name
        .chars()
        .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' })
        .collect();

    if sanitized.starts_with(|c: char| c.is_ascii_digit()) {
        format!("key_{}", sanitized)
    } else if sanitized.starts_with('_') && !sanitized.starts_with("__") {
        // Double underscores are reserved and kept as-is.
        format!("key{}", sanitized)
    } else {
        sanitized
    }
}
354
/// Legacy naming scheme: lowercase everything and replace `.`/`-` with `_`.
pub fn legacy_normalize_otlp_name(name: &str) -> String {
    name.to_lowercase()
        .chars()
        .map(|c| match c {
            '.' | '-' => '_',
            other => other,
        })
        .collect()
}
364
365fn encode_metrics(
366 table_writer: &mut MultiTableData,
367 metric: &Metric,
368 resource_attrs: Option<&Vec<KeyValue>>,
369 scope_attrs: Option<&Vec<KeyValue>>,
370 metric_ctx: &OtlpMetricCtx,
371) -> Result<()> {
372 let name = if metric_ctx.is_legacy {
373 legacy_normalize_otlp_name(&metric.name)
374 } else {
375 normalize_metric_name(metric, &metric_ctx.metric_type)
376 };
377
378 if let Some(data) = &metric.data {
381 match data {
382 metric::Data::Gauge(gauge) => {
383 encode_gauge(
384 table_writer,
385 &name,
386 gauge,
387 resource_attrs,
388 scope_attrs,
389 metric_ctx,
390 )?;
391 }
392 metric::Data::Sum(sum) => {
393 encode_sum(
394 table_writer,
395 &name,
396 sum,
397 resource_attrs,
398 scope_attrs,
399 metric_ctx,
400 )?;
401 }
402 metric::Data::Summary(summary) => {
403 encode_summary(
404 table_writer,
405 &name,
406 summary,
407 resource_attrs,
408 scope_attrs,
409 metric_ctx,
410 )?;
411 }
412 metric::Data::Histogram(hist) => {
413 encode_histogram(
414 table_writer,
415 &name,
416 hist,
417 resource_attrs,
418 scope_attrs,
419 metric_ctx,
420 )?;
421 }
422 metric::Data::ExponentialHistogram(_hist) => {}
424 }
425 }
426
427 Ok(())
428}
429
/// Which source an attribute set came from; controls how its keys are
/// normalized in `write_attributes`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AttributeType {
    // Resource attributes: keys pass through `normalize_label_name`.
    Resource,
    // Scope attributes: normalized keys get an `otel_scope_` prefix.
    Scope,
    // Data-point attributes: keys pass through `normalize_label_name`.
    DataPoint,
    // Legacy mode: keys use `legacy_normalize_otlp_name`.
    Legacy,
}
437
438fn write_attributes(
439 writer: &mut TableData,
440 row: &mut Vec<Value>,
441 attrs: Option<&Vec<KeyValue>>,
442 attribute_type: AttributeType,
443) -> Result<()> {
444 let Some(attrs) = attrs else {
445 return Ok(());
446 };
447
448 let tags = attrs.iter().filter_map(|attr| {
449 attr.value
450 .as_ref()
451 .and_then(|v| v.value.as_ref())
452 .and_then(|val| {
453 let key = match attribute_type {
454 AttributeType::Resource | AttributeType::DataPoint => {
455 normalize_label_name(&attr.key)
456 }
457 AttributeType::Scope => {
458 format!("otel_scope_{}", normalize_label_name(&attr.key))
459 }
460 AttributeType::Legacy => legacy_normalize_otlp_name(&attr.key),
461 };
462 match val {
463 any_value::Value::StringValue(s) => Some((key, s.clone())),
464 any_value::Value::IntValue(v) => Some((key, v.to_string())),
465 any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
466 _ => None, }
468 })
469 });
470 row_writer::write_tags(writer, tags, row)?;
471
472 Ok(())
473}
474
475fn write_timestamp(
476 table: &mut TableData,
477 row: &mut Vec<Value>,
478 time_nano: i64,
479 legacy_mode: bool,
480) -> Result<()> {
481 if legacy_mode {
482 row_writer::write_ts_to_nanos(
483 table,
484 GREPTIME_TIMESTAMP,
485 Some(time_nano),
486 Precision::Nanosecond,
487 row,
488 )
489 } else {
490 row_writer::write_ts_to_millis(
491 table,
492 GREPTIME_TIMESTAMP,
493 Some(time_nano / 1000000),
494 Precision::Millisecond,
495 row,
496 )
497 }
498}
499
500fn write_data_point_value(
501 table: &mut TableData,
502 row: &mut Vec<Value>,
503 field: &str,
504 value: &Option<number_data_point::Value>,
505) -> Result<()> {
506 match value {
507 Some(number_data_point::Value::AsInt(val)) => {
508 row_writer::write_f64(table, field, *val as f64, row)?;
510 }
511 Some(number_data_point::Value::AsDouble(val)) => {
512 row_writer::write_f64(table, field, *val, row)?;
513 }
514 _ => {}
515 }
516 Ok(())
517}
518
519fn write_tags_and_timestamp(
520 table: &mut TableData,
521 row: &mut Vec<Value>,
522 resource_attrs: Option<&Vec<KeyValue>>,
523 scope_attrs: Option<&Vec<KeyValue>>,
524 data_point_attrs: Option<&Vec<KeyValue>>,
525 timestamp_nanos: i64,
526 metric_ctx: &OtlpMetricCtx,
527) -> Result<()> {
528 if metric_ctx.is_legacy {
529 write_attributes(table, row, resource_attrs, AttributeType::Legacy)?;
530 write_attributes(table, row, scope_attrs, AttributeType::Legacy)?;
531 write_attributes(table, row, data_point_attrs, AttributeType::Legacy)?;
532 } else {
533 write_attributes(table, row, resource_attrs, AttributeType::Resource)?;
535 write_attributes(table, row, scope_attrs, AttributeType::Scope)?;
536 write_attributes(table, row, data_point_attrs, AttributeType::DataPoint)?;
537 }
538
539 write_timestamp(table, row, timestamp_nanos, metric_ctx.is_legacy)?;
540
541 Ok(())
542}
543
544fn encode_gauge(
549 table_writer: &mut MultiTableData,
550 name: &str,
551 gauge: &Gauge,
552 resource_attrs: Option<&Vec<KeyValue>>,
553 scope_attrs: Option<&Vec<KeyValue>>,
554 metric_ctx: &OtlpMetricCtx,
555) -> Result<()> {
556 let table = table_writer.get_or_default_table_data(
557 name,
558 APPROXIMATE_COLUMN_COUNT,
559 gauge.data_points.len(),
560 );
561
562 for data_point in &gauge.data_points {
563 let mut row = table.alloc_one_row();
564 write_tags_and_timestamp(
565 table,
566 &mut row,
567 resource_attrs,
568 scope_attrs,
569 Some(data_point.attributes.as_ref()),
570 data_point.time_unix_nano as i64,
571 metric_ctx,
572 )?;
573
574 write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
575 table.add_row(row);
576 }
577
578 Ok(())
579}
580
581fn encode_sum(
585 table_writer: &mut MultiTableData,
586 name: &str,
587 sum: &Sum,
588 resource_attrs: Option<&Vec<KeyValue>>,
589 scope_attrs: Option<&Vec<KeyValue>>,
590 metric_ctx: &OtlpMetricCtx,
591) -> Result<()> {
592 let table = table_writer.get_or_default_table_data(
593 name,
594 APPROXIMATE_COLUMN_COUNT,
595 sum.data_points.len(),
596 );
597
598 for data_point in &sum.data_points {
599 let mut row = table.alloc_one_row();
600 write_tags_and_timestamp(
601 table,
602 &mut row,
603 resource_attrs,
604 scope_attrs,
605 Some(data_point.attributes.as_ref()),
606 data_point.time_unix_nano as i64,
607 metric_ctx,
608 )?;
609 write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
610 table.add_row(row);
611 }
612
613 Ok(())
614}
615
// Tag name for the histogram bucket upper bound, matching the Prometheus
// `le` label convention.
const HISTOGRAM_LE_COLUMN: &str = "le";
617
/// Encodes an OTLP histogram into three Prometheus-style tables:
/// `<name>_bucket` (cumulative bucket counts tagged with `le`),
/// `<name>_sum`, and `<name>_count`.
fn encode_histogram(
    table_writer: &mut MultiTableData,
    name: &str,
    hist: &Histogram,
    resource_attrs: Option<&Vec<KeyValue>>,
    scope_attrs: Option<&Vec<KeyValue>>,
    metric_ctx: &OtlpMetricCtx,
) -> Result<()> {
    let normalized_name = name;

    let bucket_table_name = format!("{}_bucket", normalized_name);
    let sum_table_name = format!("{}_sum", normalized_name);
    let count_table_name = format!("{}_count", normalized_name);

    let data_points_len = hist.data_points.len();
    // Rough capacity guesses: each data point typically yields several bucket
    // rows but exactly one sum row and one count row.
    let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
    let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
    let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);

    for data_point in &hist.data_points {
        // Prometheus buckets are cumulative: each row carries the running
        // total of bucket counts up to and including this bound.
        let mut accumulated_count = 0;
        for (idx, count) in data_point.bucket_counts.iter().enumerate() {
            let mut bucket_row = bucket_table.alloc_one_row();
            write_tags_and_timestamp(
                &mut bucket_table,
                &mut bucket_row,
                resource_attrs,
                scope_attrs,
                Some(data_point.attributes.as_ref()),
                data_point.time_unix_nano as i64,
                metric_ctx,
            )?;

            // `le` is the bucket's upper bound; the bucket one past the
            // explicit bounds is the +Inf overflow bucket.
            if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
                row_writer::write_tag(
                    &mut bucket_table,
                    HISTOGRAM_LE_COLUMN,
                    upper_bounds,
                    &mut bucket_row,
                )?;
            } else if idx == data_point.explicit_bounds.len() {
                row_writer::write_tag(
                    &mut bucket_table,
                    HISTOGRAM_LE_COLUMN,
                    f64::INFINITY,
                    &mut bucket_row,
                )?;
            }

            accumulated_count += count;
            row_writer::write_f64(
                &mut bucket_table,
                GREPTIME_VALUE,
                accumulated_count as f64,
                &mut bucket_row,
            )?;

            bucket_table.add_row(bucket_row);
        }

        // The sum is optional in the OTLP data point; only emit when present.
        if let Some(sum) = data_point.sum {
            let mut sum_row = sum_table.alloc_one_row();
            write_tags_and_timestamp(
                &mut sum_table,
                &mut sum_row,
                resource_attrs,
                scope_attrs,
                Some(data_point.attributes.as_ref()),
                data_point.time_unix_nano as i64,
                metric_ctx,
            )?;

            row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?;
            sum_table.add_row(sum_row);
        }

        // The observation count is always present.
        let mut count_row = count_table.alloc_one_row();
        write_tags_and_timestamp(
            &mut count_table,
            &mut count_row,
            resource_attrs,
            scope_attrs,
            Some(data_point.attributes.as_ref()),
            data_point.time_unix_nano as i64,
            metric_ctx,
        )?;

        row_writer::write_f64(
            &mut count_table,
            GREPTIME_VALUE,
            data_point.count as f64,
            &mut count_row,
        )?;
        count_table.add_row(count_row);
    }

    table_writer.add_table_data(bucket_table_name, bucket_table);
    table_writer.add_table_data(sum_table_name, sum_table);
    table_writer.add_table_data(count_table_name, count_table);

    Ok(())
}
733
// Placeholder: exponential histograms are accepted but not yet encoded
// (encode_metrics treats them as a no-op).
#[allow(dead_code)]
fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
    Ok(())
}
739
/// Encodes an OTLP summary.
///
/// Legacy mode writes one table with a `greptime_pNN` field per quantile plus
/// a count field. Otherwise the summary is split Prometheus-style into
/// `<name>` (one row per quantile, tagged with `quantile`), `<name>_count`,
/// and `<name>_sum` tables.
fn encode_summary(
    table_writer: &mut MultiTableData,
    name: &str,
    summary: &Summary,
    resource_attrs: Option<&Vec<KeyValue>>,
    scope_attrs: Option<&Vec<KeyValue>>,
    metric_ctx: &OtlpMetricCtx,
) -> Result<()> {
    if metric_ctx.is_legacy {
        let table = table_writer.get_or_default_table_data(
            name,
            APPROXIMATE_COLUMN_COUNT,
            summary.data_points.len(),
        );

        for data_point in &summary.data_points {
            let mut row = table.alloc_one_row();
            write_tags_and_timestamp(
                table,
                &mut row,
                resource_attrs,
                scope_attrs,
                Some(data_point.attributes.as_ref()),
                data_point.time_unix_nano as i64,
                metric_ctx,
            )?;

            // One field per quantile, e.g. quantile 0.9 -> `greptime_p90`.
            for quantile in &data_point.quantile_values {
                row_writer::write_f64(
                    table,
                    format!("greptime_p{:02}", quantile.quantile * 100f64),
                    quantile.value,
                    &mut row,
                )?;
            }

            row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
            table.add_row(row);
        }
    } else {
        let metric_name = name;
        let count_name = format!("{}{}", metric_name, COUNT_TABLE_SUFFIX);
        let sum_name = format!("{}{}", metric_name, SUM_TABLE_SUFFIX);

        // Inner scopes re-borrow `table_writer` for one table at a time.
        for data_point in &summary.data_points {
            // Quantile rows: one row per quantile value, tagged by `quantile`.
            {
                let quantile_table = table_writer.get_or_default_table_data(
                    metric_name,
                    APPROXIMATE_COLUMN_COUNT,
                    summary.data_points.len(),
                );

                for quantile in &data_point.quantile_values {
                    let mut row = quantile_table.alloc_one_row();
                    write_tags_and_timestamp(
                        quantile_table,
                        &mut row,
                        resource_attrs,
                        scope_attrs,
                        Some(data_point.attributes.as_ref()),
                        data_point.time_unix_nano as i64,
                        metric_ctx,
                    )?;
                    row_writer::write_tag(quantile_table, "quantile", quantile.quantile, &mut row)?;
                    row_writer::write_f64(
                        quantile_table,
                        GREPTIME_VALUE,
                        quantile.value,
                        &mut row,
                    )?;
                    quantile_table.add_row(row);
                }
            }
            // Count row for this data point.
            {
                let count_table = table_writer.get_or_default_table_data(
                    &count_name,
                    APPROXIMATE_COLUMN_COUNT,
                    summary.data_points.len(),
                );
                let mut row = count_table.alloc_one_row();
                write_tags_and_timestamp(
                    count_table,
                    &mut row,
                    resource_attrs,
                    scope_attrs,
                    Some(data_point.attributes.as_ref()),
                    data_point.time_unix_nano as i64,
                    metric_ctx,
                )?;

                row_writer::write_f64(
                    count_table,
                    GREPTIME_VALUE,
                    data_point.count as f64,
                    &mut row,
                )?;

                count_table.add_row(row);
            }
            // Sum row for this data point.
            {
                let sum_table = table_writer.get_or_default_table_data(
                    &sum_name,
                    APPROXIMATE_COLUMN_COUNT,
                    summary.data_points.len(),
                );

                let mut row = sum_table.alloc_one_row();
                write_tags_and_timestamp(
                    sum_table,
                    &mut row,
                    resource_attrs,
                    scope_attrs,
                    Some(data_point.attributes.as_ref()),
                    data_point.time_unix_nano as i64,
                    metric_ctx,
                )?;

                row_writer::write_f64(sum_table, GREPTIME_VALUE, data_point.sum, &mut row)?;

                sum_table.add_row(row);
            }
        }
    }

    Ok(())
}
870
871#[cfg(test)]
872mod tests {
873 use otel_arrow_rust::proto::opentelemetry::common::v1::AnyValue;
874 use otel_arrow_rust::proto::opentelemetry::common::v1::any_value::Value as Val;
875 use otel_arrow_rust::proto::opentelemetry::metrics::v1::number_data_point::Value;
876 use otel_arrow_rust::proto::opentelemetry::metrics::v1::summary_data_point::ValueAtQuantile;
877 use otel_arrow_rust::proto::opentelemetry::metrics::v1::{
878 AggregationTemporality, HistogramDataPoint, NumberDataPoint, SummaryDataPoint,
879 };
880
881 use super::*;
882
883 #[test]
884 fn test_legacy_normalize_otlp_name() {
885 assert_eq!(
886 legacy_normalize_otlp_name("jvm.memory.free"),
887 "jvm_memory_free"
888 );
889 assert_eq!(
890 legacy_normalize_otlp_name("jvm-memory-free"),
891 "jvm_memory_free"
892 );
893 assert_eq!(
894 legacy_normalize_otlp_name("jvm_memory_free"),
895 "jvm_memory_free"
896 );
897 assert_eq!(
898 legacy_normalize_otlp_name("JVM_MEMORY_FREE"),
899 "jvm_memory_free"
900 );
901 assert_eq!(
902 legacy_normalize_otlp_name("JVM_memory_FREE"),
903 "jvm_memory_free"
904 );
905 }
906
    // Table-driven check of name normalization: unit suffixing, `_total` for
    // monotonic sums, `_ratio` for dimensionless gauges, and idempotence when
    // suffixes are already present.
    #[test]
    fn test_normalize_metric_name() {
        let test_cases = vec![
            // Empty metric stays empty.
            (Metric::default(), MetricType::Init, ""),
            (
                Metric {
                    name: "foo".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "foo",
            ),
            // Unit "s" appends `_seconds`, but not twice.
            (
                Metric {
                    name: "foo".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "foo_seconds",
            ),
            (
                Metric {
                    name: "foo_seconds".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "foo_seconds",
            ),
            // Monotonic sums always end in `_total`.
            (
                Metric {
                    name: "foo".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "foo_total",
            ),
            (
                Metric {
                    name: "foo_total".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "foo_total",
            ),
            (
                Metric {
                    name: "foo".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "foo_seconds_total",
            ),
            (
                Metric {
                    name: "foo_seconds".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "foo_seconds_total",
            ),
            (
                Metric {
                    name: "foo_total".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "foo_seconds_total",
            ),
            (
                Metric {
                    name: "foo_seconds_total".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "foo_seconds_total",
            ),
            // An embedded `total` token is moved to the end.
            (
                Metric {
                    name: "foo_total_seconds".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "foo_seconds_total",
            ),
            // Dimensionless gauge gets `_ratio`.
            (
                Metric {
                    name: "foo".to_string(),
                    unit: "1".to_string(),
                    ..Default::default()
                },
                MetricType::Gauge,
                "foo_ratio",
            ),
            // Compound unit "m/s" appends `meters_per_second`.
            (
                Metric {
                    name: "foo".to_string(),
                    unit: "m/s".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "foo_meters_per_second",
            ),
            // Existing `second` token suppresses the per-unit suffix.
            (
                Metric {
                    name: "foo_second".to_string(),
                    unit: "m/s".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "foo_second_meters",
            ),
            (
                Metric {
                    name: "foo_meters".to_string(),
                    unit: "m/s".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "foo_meters_per_second",
            ),
        ];

        for (metric, metric_type, expected) in test_cases {
            let result = normalize_metric_name(&metric, &metric_type);
            assert_eq!(
                result, expected,
                "Failed for metric name: '{}', unit: '{}', type: {:?}",
                metric.name, metric.unit, metric_type
            );
        }
    }
1060
    // Edge cases: repeated separators collapse, leading/trailing separators
    // are dropped, all-separator names become empty, and a leading digit is
    // prefixed with an underscore.
    #[test]
    fn test_normalize_metric_name_edge_cases() {
        let test_cases = vec![
            (
                Metric {
                    name: "foo--bar__baz".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "foo_bar_baz",
            ),
            (
                Metric {
                    name: "-foo_bar-".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "foo_bar",
            ),
            (
                Metric {
                    name: "--___--".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "",
            ),
            (
                Metric {
                    name: "2xx_requests".to_string(),
                    ..Default::default()
                },
                MetricType::Init,
                "_2xx_requests",
            ),
        ];

        for (metric, metric_type, expected) in test_cases {
            let result = normalize_metric_name(&metric, &metric_type);
            assert_eq!(
                result, expected,
                "Failed for metric name: '{}', unit: '{}', type: {:?}",
                metric.name, metric.unit, metric_type
            );
        }
    }
1111
1112 #[test]
1113 fn test_normalize_label_name() {
1114 let test_cases = vec![
1115 ("", ""),
1116 ("foo", "foo"),
1117 ("foo_bar/baz:abc", "foo_bar_baz_abc"),
1118 ("1foo", "key_1foo"),
1119 ("_foo", "key_foo"),
1120 ("__bar", "__bar"),
1121 ];
1122
1123 for (input, expected) in test_cases {
1124 let result = normalize_label_name(input);
1125 assert_eq!(
1126 result, expected,
1127 "unexpected result for input '{}'; got '{}'; want '{}'",
1128 input, result, expected
1129 );
1130 }
1131 }
1132
1133 #[test]
1134 fn test_clean_unit_name() {
1135 assert_eq!(clean_unit_name("faults"), "faults");
1137 assert_eq!(clean_unit_name("{faults}"), "faults"); assert_eq!(clean_unit_name("req/sec"), "req_sec");
1139 assert_eq!(clean_unit_name("m/s"), "m_s");
1140 assert_eq!(clean_unit_name("___test___"), "test");
1141 assert_eq!(
1142 clean_unit_name("multiple__underscores"),
1143 "multiple_underscores"
1144 );
1145 assert_eq!(clean_unit_name(""), "");
1146 assert_eq!(clean_unit_name("___"), "");
1147 assert_eq!(clean_unit_name("bytes.per.second"), "bytes_per_second");
1148 }
1149
    // Brace-annotated units (e.g. `{faults}`) contribute no suffix; only the
    // type-driven suffixes (`_total`) and plain units are appended.
    #[test]
    fn test_normalize_metric_name_braced_units() {
        let test_cases = vec![
            (
                Metric {
                    name: "test.metric".to_string(),
                    unit: "{faults}".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "test_metric_total",
            ),
            (
                Metric {
                    name: "test.metric".to_string(),
                    unit: "{operations}".to_string(),
                    ..Default::default()
                },
                MetricType::Gauge,
                "test_metric",
            ),
            (
                Metric {
                    name: "test.metric".to_string(),
                    unit: "{}".to_string(),
                    ..Default::default()
                },
                MetricType::Gauge,
                "test_metric",
            ),
            // Unbraced unit names are appended verbatim.
            (
                Metric {
                    name: "test.metric".to_string(),
                    unit: "faults".to_string(),
                    ..Default::default()
                },
                MetricType::Gauge,
                "test_metric_faults",
            ),
        ];

        for (metric, metric_type, expected) in test_cases {
            let result = normalize_metric_name(&metric, &metric_type);
            assert_eq!(
                result, expected,
                "Failed for metric name: '{}', unit: '{}', type: {:?}. Got: '{}', Expected: '{}'",
                metric.name, metric.unit, metric_type, result, expected
            );
        }
    }
1201
    // Realistic host-metrics fixtures exercising the full normalization
    // pipeline (unit mapping, braced units, `_total`/`_ratio` suffixes).
    #[test]
    fn test_normalize_metric_name_with_testdata() {
        let test_cases = vec![
            (
                Metric {
                    name: "system.paging.faults".to_string(),
                    unit: "{faults}".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "system_paging_faults_total",
            ),
            (
                Metric {
                    name: "system.paging.operations".to_string(),
                    unit: "{operations}".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "system_paging_operations_total",
            ),
            (
                Metric {
                    name: "system.paging.usage".to_string(),
                    unit: "By".to_string(),
                    ..Default::default()
                },
                MetricType::NonMonotonicSum,
                "system_paging_usage_bytes",
            ),
            (
                Metric {
                    name: "system.cpu.load_average.15m".to_string(),
                    unit: "{thread}".to_string(),
                    ..Default::default()
                },
                MetricType::Gauge,
                "system_cpu_load_average_15m",
            ),
            (
                Metric {
                    name: "system.cpu.load_average.1m".to_string(),
                    unit: "{thread}".to_string(),
                    ..Default::default()
                },
                MetricType::Gauge,
                "system_cpu_load_average_1m",
            ),
            (
                Metric {
                    name: "system.disk.io".to_string(),
                    unit: "By".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "system_disk_io_bytes_total",
            ),
            (
                Metric {
                    name: "system.disk.io_time".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "system_disk_io_time_seconds_total",
            ),
            (
                Metric {
                    name: "system.disk.operation_time".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "system_disk_operation_time_seconds_total",
            ),
            (
                Metric {
                    name: "system.cpu.time".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "system_cpu_time_seconds_total",
            ),
            (
                Metric {
                    name: "system.processes.count".to_string(),
                    unit: "{processes}".to_string(),
                    ..Default::default()
                },
                MetricType::NonMonotonicSum,
                "system_processes_count",
            ),
            (
                Metric {
                    name: "system.processes.created".to_string(),
                    unit: "{processes}".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "system_processes_created_total",
            ),
            (
                Metric {
                    name: "system.memory.usage".to_string(),
                    unit: "By".to_string(),
                    ..Default::default()
                },
                MetricType::NonMonotonicSum,
                "system_memory_usage_bytes",
            ),
            (
                Metric {
                    name: "system.uptime".to_string(),
                    unit: "s".to_string(),
                    ..Default::default()
                },
                MetricType::Gauge,
                "system_uptime_seconds",
            ),
            (
                Metric {
                    name: "system.network.connections".to_string(),
                    unit: "{connections}".to_string(),
                    ..Default::default()
                },
                MetricType::NonMonotonicSum,
                "system_network_connections",
            ),
            (
                Metric {
                    name: "system.network.dropped".to_string(),
                    unit: "{packets}".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "system_network_dropped_total",
            ),
            (
                Metric {
                    name: "system.network.errors".to_string(),
                    unit: "{errors}".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "system_network_errors_total",
            ),
            (
                Metric {
                    name: "system.network.io".to_string(),
                    unit: "By".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "system_network_io_bytes_total",
            ),
            (
                Metric {
                    name: "system.network.packets".to_string(),
                    unit: "{packets}".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "system_network_packets_total",
            ),
            (
                Metric {
                    name: "system.filesystem.inodes.usage".to_string(),
                    unit: "{inodes}".to_string(),
                    ..Default::default()
                },
                MetricType::NonMonotonicSum,
                "system_filesystem_inodes_usage",
            ),
            (
                Metric {
                    name: "system.filesystem.usage".to_string(),
                    unit: "By".to_string(),
                    ..Default::default()
                },
                MetricType::NonMonotonicSum,
                "system_filesystem_usage_bytes",
            ),
            // Unit "1" on a gauge: `_ratio` is appended after the digit token.
            (
                Metric {
                    name: "system.load.1".to_string(),
                    unit: "1".to_string(),
                    ..Default::default()
                },
                MetricType::Gauge,
                "system_load_1_ratio",
            ),
            (
                Metric {
                    name: "http.request.2xx".to_string(),
                    unit: "{requests}".to_string(),
                    ..Default::default()
                },
                MetricType::MonotonicSum,
                "http_request_2xx_total",
            ),
            (
                Metric {
                    name: "jvm.memory.heap_usage".to_string(),
                    unit: "By".to_string(),
                    ..Default::default()
                },
                MetricType::Gauge,
                "jvm_memory_heap_usage_bytes",
            ),
            // "1/s": main part maps to empty, per part yields `per_second`.
            (
                Metric {
                    name: "http.request.rate".to_string(),
                    unit: "1/s".to_string(),
                    ..Default::default()
                },
                MetricType::Gauge,
                "http_request_rate_per_second",
            ),
        ];

        for (metric, metric_type, expected) in test_cases {
            let result = normalize_metric_name(&metric, &metric_type);
            assert_eq!(
                result, expected,
                "Failed for metric name: '{}', unit: '{}', type: {:?}. Got: '{}', Expected: '{}'",
                metric.name, metric.unit, metric_type, result, expected
            );
        }
    }
1446
1447 fn keyvalue(key: &str, value: &str) -> KeyValue {
1448 KeyValue {
1449 key: key.into(),
1450 value: Some(AnyValue {
1451 value: Some(Val::StringValue(value.into())),
1452 }),
1453 }
1454 }
1455
1456 #[test]
1457 fn test_encode_gauge() {
1458 let mut tables = MultiTableData::default();
1459
1460 let data_points = vec![
1461 NumberDataPoint {
1462 attributes: vec![keyvalue("host", "testsevrer")],
1463 time_unix_nano: 100,
1464 value: Some(Value::AsInt(100)),
1465 ..Default::default()
1466 },
1467 NumberDataPoint {
1468 attributes: vec![keyvalue("host", "testserver")],
1469 time_unix_nano: 105,
1470 value: Some(Value::AsInt(105)),
1471 ..Default::default()
1472 },
1473 ];
1474 let gauge = Gauge { data_points };
1475 encode_gauge(
1476 &mut tables,
1477 "datamon",
1478 &gauge,
1479 Some(&vec![]),
1480 Some(&vec![keyvalue("scope", "otel")]),
1481 &OtlpMetricCtx::default(),
1482 )
1483 .unwrap();
1484
1485 let table = tables.get_or_default_table_data("datamon", 0, 0);
1486 assert_eq!(table.num_rows(), 2);
1487 assert_eq!(table.num_columns(), 4);
1488 assert_eq!(
1489 table
1490 .columns()
1491 .iter()
1492 .map(|c| &c.column_name)
1493 .collect::<Vec<&String>>(),
1494 vec![
1495 "otel_scope_scope",
1496 "host",
1497 "greptime_timestamp",
1498 "greptime_value"
1499 ]
1500 );
1501 }
1502
1503 #[test]
1504 fn test_encode_sum() {
1505 let mut tables = MultiTableData::default();
1506
1507 let data_points = vec![
1508 NumberDataPoint {
1509 attributes: vec![keyvalue("host", "testserver")],
1510 time_unix_nano: 100,
1511 value: Some(Value::AsInt(100)),
1512 ..Default::default()
1513 },
1514 NumberDataPoint {
1515 attributes: vec![keyvalue("host", "testserver")],
1516 time_unix_nano: 105,
1517 value: Some(Value::AsInt(0)),
1518 ..Default::default()
1519 },
1520 ];
1521 let sum = Sum {
1522 data_points,
1523 ..Default::default()
1524 };
1525 encode_sum(
1526 &mut tables,
1527 "datamon",
1528 &sum,
1529 Some(&vec![]),
1530 Some(&vec![keyvalue("scope", "otel")]),
1531 &OtlpMetricCtx::default(),
1532 )
1533 .unwrap();
1534
1535 let table = tables.get_or_default_table_data("datamon", 0, 0);
1536 assert_eq!(table.num_rows(), 2);
1537 assert_eq!(table.num_columns(), 4);
1538 assert_eq!(
1539 table
1540 .columns()
1541 .iter()
1542 .map(|c| &c.column_name)
1543 .collect::<Vec<&String>>(),
1544 vec![
1545 "otel_scope_scope",
1546 "host",
1547 "greptime_timestamp",
1548 "greptime_value"
1549 ]
1550 );
1551 }
1552
1553 #[test]
1554 fn test_encode_summary() {
1555 let mut tables = MultiTableData::default();
1556
1557 let data_points = vec![SummaryDataPoint {
1558 attributes: vec![keyvalue("host", "testserver")],
1559 time_unix_nano: 100,
1560 count: 25,
1561 sum: 5400.0,
1562 quantile_values: vec![
1563 ValueAtQuantile {
1564 quantile: 0.90,
1565 value: 1000.0,
1566 },
1567 ValueAtQuantile {
1568 quantile: 0.95,
1569 value: 3030.0,
1570 },
1571 ],
1572 ..Default::default()
1573 }];
1574 let summary = Summary { data_points };
1575 encode_summary(
1576 &mut tables,
1577 "datamon",
1578 &summary,
1579 Some(&vec![]),
1580 Some(&vec![keyvalue("scope", "otel")]),
1581 &OtlpMetricCtx::default(),
1582 )
1583 .unwrap();
1584
1585 let table = tables.get_or_default_table_data("datamon", 0, 0);
1586 assert_eq!(table.num_rows(), 2);
1587 assert_eq!(table.num_columns(), 5);
1588 assert_eq!(
1589 table
1590 .columns()
1591 .iter()
1592 .map(|c| &c.column_name)
1593 .collect::<Vec<&String>>(),
1594 vec![
1595 "otel_scope_scope",
1596 "host",
1597 "greptime_timestamp",
1598 "quantile",
1599 "greptime_value"
1600 ]
1601 );
1602
1603 let table = tables.get_or_default_table_data("datamon_count", 0, 0);
1604 assert_eq!(table.num_rows(), 1);
1605 assert_eq!(table.num_columns(), 4);
1606 assert_eq!(
1607 table
1608 .columns()
1609 .iter()
1610 .map(|c| &c.column_name)
1611 .collect::<Vec<&String>>(),
1612 vec![
1613 "otel_scope_scope",
1614 "host",
1615 "greptime_timestamp",
1616 "greptime_value"
1617 ]
1618 );
1619
1620 let table = tables.get_or_default_table_data("datamon_sum", 0, 0);
1621 assert_eq!(table.num_rows(), 1);
1622 assert_eq!(table.num_columns(), 4);
1623 assert_eq!(
1624 table
1625 .columns()
1626 .iter()
1627 .map(|c| &c.column_name)
1628 .collect::<Vec<&String>>(),
1629 vec![
1630 "otel_scope_scope",
1631 "host",
1632 "greptime_timestamp",
1633 "greptime_value"
1634 ]
1635 );
1636 }
1637
1638 #[test]
1639 fn test_encode_histogram() {
1640 let mut tables = MultiTableData::default();
1641
1642 let data_points = vec![HistogramDataPoint {
1643 attributes: vec![keyvalue("host", "testserver")],
1644 time_unix_nano: 100,
1645 start_time_unix_nano: 23,
1646 count: 25,
1647 sum: Some(100.),
1648 max: Some(200.),
1649 min: Some(0.03),
1650 bucket_counts: vec![2, 4, 6, 9, 4],
1651 explicit_bounds: vec![0.1, 1., 10., 100.],
1652 ..Default::default()
1653 }];
1654
1655 let histogram = Histogram {
1656 data_points,
1657 aggregation_temporality: AggregationTemporality::Delta.into(),
1658 };
1659 encode_histogram(
1660 &mut tables,
1661 "histo",
1662 &histogram,
1663 Some(&vec![]),
1664 Some(&vec![keyvalue("scope", "otel")]),
1665 &OtlpMetricCtx::default(),
1666 )
1667 .unwrap();
1668
1669 assert_eq!(3, tables.num_tables());
1670
1671 let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
1673 assert_eq!(bucket_table.num_rows(), 5);
1674 assert_eq!(bucket_table.num_columns(), 5);
1675 assert_eq!(
1676 bucket_table
1677 .columns()
1678 .iter()
1679 .map(|c| &c.column_name)
1680 .collect::<Vec<&String>>(),
1681 vec![
1682 "otel_scope_scope",
1683 "host",
1684 "greptime_timestamp",
1685 "le",
1686 "greptime_value",
1687 ]
1688 );
1689
1690 let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0);
1691 assert_eq!(sum_table.num_rows(), 1);
1692 assert_eq!(sum_table.num_columns(), 4);
1693 assert_eq!(
1694 sum_table
1695 .columns()
1696 .iter()
1697 .map(|c| &c.column_name)
1698 .collect::<Vec<&String>>(),
1699 vec![
1700 "otel_scope_scope",
1701 "host",
1702 "greptime_timestamp",
1703 "greptime_value"
1704 ]
1705 );
1706
1707 let count_table = tables.get_or_default_table_data("histo_count", 0, 0);
1708 assert_eq!(count_table.num_rows(), 1);
1709 assert_eq!(count_table.num_columns(), 4);
1710 assert_eq!(
1711 count_table
1712 .columns()
1713 .iter()
1714 .map(|c| &c.column_name)
1715 .collect::<Vec<&String>>(),
1716 vec![
1717 "otel_scope_scope",
1718 "host",
1719 "greptime_timestamp",
1720 "greptime_value"
1721 ]
1722 );
1723 }
1724}