1use ahash::HashSet;
16use api::v1::{RowInsertRequests, Value};
17use common_grpc::precision::Precision;
18use common_query::prelude::{GREPTIME_COUNT, greptime_timestamp, greptime_value};
19use lazy_static::lazy_static;
20use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
21use otel_arrow_rust::proto::opentelemetry::common::v1::{AnyValue, KeyValue, any_value};
22use otel_arrow_rust::proto::opentelemetry::metrics::v1::{metric, number_data_point, *};
23use session::protocol_ctx::{MetricType, OtlpMetricCtx};
24use table::requests::{
25 METADATA_QUALITY_DECLARED, SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_METRIC_ORIGINAL_NAME,
26 SEMANTIC_METRIC_TEMPORALITY, SEMANTIC_METRIC_TYPE, SEMANTIC_METRIC_UNIT,
27};
28
29use crate::error::Result;
30use crate::otlp::trace::{KEY_SERVICE_INSTANCE_ID, KEY_SERVICE_NAME};
31use crate::row_writer::{self, MultiTableData, TableData};
32
33mod semantic;
34mod translator;
35
36pub use semantic::SemanticIndex;
37pub use translator::legacy_normalize_otlp_name;
38use translator::{translate_label_name, translate_metric_name};
39
/// Column-count argument passed whenever table data is created or fetched in this module.
/// NOTE(review): presumably a capacity hint rather than a hard limit — confirm in `row_writer`.
const APPROXIMATE_COLUMN_COUNT: usize = 8;

/// Suffix appended to a metric name to form the table that stores its count series.
const COUNT_TABLE_SUFFIX: &str = "_count";
/// Suffix appended to a metric name to form the table that stores its sum series.
const SUM_TABLE_SUFFIX: &str = "_sum";
/// Suffix appended to a histogram name to form the table that stores its buckets.
const BUCKET_TABLE_SUFFIX: &str = "_bucket";

/// Semantic metric-type value recorded for monotonic sums and for `_sum`/`_count` tables.
const METRIC_TYPE_COUNTER: &str = "counter";
/// Semantic metric-type value recorded for non-monotonic sums.
const METRIC_TYPE_UPDOWN_COUNTER: &str = "updown_counter";
/// Semantic metric-type value recorded for gauges.
const METRIC_TYPE_GAUGE: &str = "gauge";
/// Semantic metric-type value recorded for histogram bucket tables.
const METRIC_TYPE_HISTOGRAM: &str = "histogram";
/// Semantic metric-type value recorded for summary tables.
const METRIC_TYPE_SUMMARY: &str = "summary";

/// Key of the extra attribute copied from the `KEY_SERVICE_NAME` resource attribute.
const JOB_KEY: &str = "job";
/// Key of the extra attribute copied from the `KEY_SERVICE_INSTANCE_ID` resource attribute.
const INSTANCE_KEY: &str = "instance";

/// Resource attribute keys that are always kept when not every resource attribute is
/// promoted (see `process_resource_attrs`).
const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [
    "service.instance.id",
    "service.name",
    "service.namespace",
    "service.version",
    "cloud.availability_zone",
    "cloud.region",
    "container.name",
    "deployment.environment",
    "deployment.environment.name",
    "k8s.cluster.name",
    "k8s.container.name",
    "k8s.cronjob.name",
    "k8s.daemonset.name",
    "k8s.deployment.name",
    "k8s.job.name",
    "k8s.namespace.name",
    "k8s.pod.name",
    "k8s.replicaset.name",
    "k8s.statefulset.name",
];
81
82lazy_static! {
83 static ref DEFAULT_PROMOTE_ATTRS_SET: HashSet<String> =
84 HashSet::from_iter(DEFAULT_PROMOTE_ATTRS.iter().map(|s| s.to_string()));
85}
86
/// Key of the synthetic scope attribute that carries the instrumentation scope's name.
const OTEL_SCOPE_NAME: &str = "name";
/// Key of the synthetic scope attribute that carries the instrumentation scope's version.
const OTEL_SCOPE_VERSION: &str = "version";
/// Key of the synthetic scope attribute that carries the `ScopeMetrics` schema URL.
const OTEL_SCOPE_SCHEMA_URL: &str = "schema_url";
90
91pub fn to_grpc_insert_requests(
100 request: ExportMetricsServiceRequest,
101 metric_ctx: &mut OtlpMetricCtx,
102) -> Result<(RowInsertRequests, usize, SemanticIndex)> {
103 let mut table_writer = MultiTableData::default();
104 let mut semantic_index = SemanticIndex::default();
105
106 for resource in &request.resource_metrics {
107 let resource_attrs = resource.resource.as_ref().map(|r| {
108 let mut attrs = r.attributes.clone();
109 process_resource_attrs(&mut attrs, metric_ctx);
110 attrs
111 });
112
113 for scope in &resource.scope_metrics {
114 let scope_attrs = process_scope_attrs(scope, metric_ctx);
115
116 for metric in &scope.metrics {
117 if metric.data.is_none() {
118 continue;
119 }
120 if let Some(t) = metric.data.as_ref().map(from_metric_type) {
121 metric_ctx.set_metric_type(t);
122 }
123
124 encode_metrics(
125 &mut table_writer,
126 metric,
127 resource_attrs.as_ref(),
128 scope_attrs.as_ref(),
129 metric_ctx,
130 &mut semantic_index,
131 )?;
132 }
133 }
134 }
135
136 let (requests, rows) = table_writer.into_row_insert_requests();
137 Ok((requests, rows, semantic_index))
138}
139
140fn emitted_semantic_tables(
145 metric_type: &MetricType,
146 is_legacy: bool,
147 base: &str,
148) -> Vec<(String, &'static str)> {
149 match metric_type {
150 MetricType::Gauge => vec![(base.to_string(), METRIC_TYPE_GAUGE)],
151 MetricType::MonotonicSum => vec![(base.to_string(), METRIC_TYPE_COUNTER)],
152 MetricType::NonMonotonicSum => vec![(base.to_string(), METRIC_TYPE_UPDOWN_COUNTER)],
153 MetricType::Histogram => vec![
154 (
155 format!("{base}{BUCKET_TABLE_SUFFIX}"),
156 METRIC_TYPE_HISTOGRAM,
157 ),
158 (format!("{base}{SUM_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER),
159 (format!("{base}{COUNT_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER),
160 ],
161 MetricType::Summary if is_legacy => vec![(base.to_string(), METRIC_TYPE_SUMMARY)],
162 MetricType::Summary => vec![
163 (base.to_string(), METRIC_TYPE_SUMMARY),
164 (format!("{base}{COUNT_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER),
165 (format!("{base}{SUM_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER),
166 ],
167 MetricType::ExponentialHistogram | MetricType::Init => vec![],
169 }
170}
171
172fn temporality_value(data: &metric::Data) -> Option<&'static str> {
175 let raw = match data {
176 metric::Data::Sum(sum) => sum.aggregation_temporality,
177 metric::Data::Histogram(hist) => hist.aggregation_temporality,
178 _ => return None,
179 };
180 match AggregationTemporality::try_from(raw) {
181 Ok(AggregationTemporality::Delta) => Some("delta"),
182 Ok(AggregationTemporality::Cumulative) => Some("cumulative"),
183 _ => None,
184 }
185}
186
187fn record_metric_semantics(
190 index: &mut SemanticIndex,
191 metric: &Metric,
192 name: &str,
193 metric_ctx: &OtlpMetricCtx,
194) {
195 let emitted = emitted_semantic_tables(&metric_ctx.metric_type, metric_ctx.is_legacy, name);
196 if emitted.is_empty() {
197 return;
198 }
199
200 let temporality = metric.data.as_ref().and_then(temporality_value);
201 let unit = metric.unit.trim();
202 let original_name = (name != metric.name.as_str()).then_some(metric.name.as_str());
204
205 for (table, metric_type) in &emitted {
206 index.record_scalar(table, SEMANTIC_METRIC_TYPE, metric_type);
207 index.record_scalar(
208 table,
209 SEMANTIC_METRIC_METADATA_QUALITY,
210 METADATA_QUALITY_DECLARED,
211 );
212 if let Some(temporality) = temporality {
213 index.record_scalar(table, SEMANTIC_METRIC_TEMPORALITY, temporality);
214 }
215 if !unit.is_empty() {
216 index.record_scalar(table, SEMANTIC_METRIC_UNIT, unit);
217 }
218 if let Some(original_name) = original_name {
219 index.record_scalar(table, SEMANTIC_METRIC_ORIGINAL_NAME, original_name);
220 }
221 }
222}
223
224fn from_metric_type(data: &metric::Data) -> MetricType {
225 match data {
226 metric::Data::Gauge(_) => MetricType::Gauge,
227 metric::Data::Sum(s) => {
228 if s.is_monotonic {
229 MetricType::MonotonicSum
230 } else {
231 MetricType::NonMonotonicSum
232 }
233 }
234 metric::Data::Histogram(_) => MetricType::Histogram,
235 metric::Data::ExponentialHistogram(_) => MetricType::ExponentialHistogram,
236 metric::Data::Summary(_) => MetricType::Summary,
237 }
238}
239
240fn process_resource_attrs(attrs: &mut Vec<KeyValue>, metric_ctx: &OtlpMetricCtx) {
241 if metric_ctx.is_legacy {
242 return;
243 }
244
245 let mut tmp = Vec::with_capacity(2);
247 for kv in attrs.iter() {
248 match &kv.key as &str {
249 KEY_SERVICE_NAME => {
250 tmp.push(KeyValue {
251 key: JOB_KEY.to_string(),
252 value: kv.value.clone(),
253 });
254 }
255 KEY_SERVICE_INSTANCE_ID => {
256 tmp.push(KeyValue {
257 key: INSTANCE_KEY.to_string(),
258 value: kv.value.clone(),
259 });
260 }
261 _ => {}
262 }
263 }
264
265 if metric_ctx.promote_all_resource_attrs {
267 attrs.retain(|kv| !metric_ctx.resource_attrs.contains(&kv.key));
268 } else {
269 attrs.retain(|kv| {
270 metric_ctx.resource_attrs.contains(&kv.key)
271 || DEFAULT_PROMOTE_ATTRS_SET.contains(&kv.key)
272 });
273 }
274
275 attrs.extend(tmp);
276}
277
278fn process_scope_attrs(scope: &ScopeMetrics, metric_ctx: &OtlpMetricCtx) -> Option<Vec<KeyValue>> {
279 if metric_ctx.is_legacy {
280 return scope.scope.as_ref().map(|s| s.attributes.clone());
281 };
282
283 if !metric_ctx.promote_scope_attrs {
284 return None;
285 }
286
287 scope.scope.as_ref().map(|s| {
289 let mut attrs = s.attributes.clone();
290 attrs.push(KeyValue {
291 key: OTEL_SCOPE_NAME.to_string(),
292 value: Some(AnyValue {
293 value: Some(any_value::Value::StringValue(s.name.clone())),
294 }),
295 });
296 attrs.push(KeyValue {
297 key: OTEL_SCOPE_VERSION.to_string(),
298 value: Some(AnyValue {
299 value: Some(any_value::Value::StringValue(s.version.clone())),
300 }),
301 });
302 attrs.push(KeyValue {
303 key: OTEL_SCOPE_SCHEMA_URL.to_string(),
304 value: Some(AnyValue {
305 value: Some(any_value::Value::StringValue(scope.schema_url.clone())),
306 }),
307 });
308 attrs
309 })
310}
311
312fn encode_metrics(
313 table_writer: &mut MultiTableData,
314 metric: &Metric,
315 resource_attrs: Option<&Vec<KeyValue>>,
316 scope_attrs: Option<&Vec<KeyValue>>,
317 metric_ctx: &OtlpMetricCtx,
318 semantic_index: &mut SemanticIndex,
319) -> Result<()> {
320 let name = if metric_ctx.is_legacy {
321 legacy_normalize_otlp_name(&metric.name)
322 } else {
323 translate_metric_name(
324 metric,
325 &metric_ctx.metric_type,
326 metric_ctx.metric_translation_strategy,
327 )
328 };
329
330 record_metric_semantics(semantic_index, metric, &name, metric_ctx);
334
335 if let Some(data) = &metric.data {
336 match data {
337 metric::Data::Gauge(gauge) => {
338 encode_gauge(
339 table_writer,
340 &name,
341 gauge,
342 resource_attrs,
343 scope_attrs,
344 metric_ctx,
345 )?;
346 }
347 metric::Data::Sum(sum) => {
348 encode_sum(
349 table_writer,
350 &name,
351 sum,
352 resource_attrs,
353 scope_attrs,
354 metric_ctx,
355 )?;
356 }
357 metric::Data::Summary(summary) => {
358 encode_summary(
359 table_writer,
360 &name,
361 summary,
362 resource_attrs,
363 scope_attrs,
364 metric_ctx,
365 )?;
366 }
367 metric::Data::Histogram(hist) => {
368 encode_histogram(
369 table_writer,
370 &name,
371 hist,
372 resource_attrs,
373 scope_attrs,
374 metric_ctx,
375 )?;
376 }
377 metric::Data::ExponentialHistogram(_hist) => {}
379 }
380 }
381
382 Ok(())
383}
384
/// Origin of an attribute list; selects how `write_attributes` derives tag column names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AttributeType {
    /// Resource-level attributes; keys go through `translate_label_name`.
    Resource,
    /// Scope-level attributes; translated keys are prefixed with `otel_scope_`.
    Scope,
    /// Data-point attributes; keys go through `translate_label_name`.
    DataPoint,
    /// Any attribute list in legacy mode; keys go through `legacy_normalize_otlp_name`.
    Legacy,
}
392
393fn write_attributes(
394 writer: &mut TableData,
395 row: &mut Vec<Value>,
396 attrs: Option<&Vec<KeyValue>>,
397 attribute_type: AttributeType,
398 metric_ctx: &OtlpMetricCtx,
399) -> Result<()> {
400 let Some(attrs) = attrs else {
401 return Ok(());
402 };
403
404 let tags = attrs.iter().filter_map(|attr| {
405 attr.value
406 .as_ref()
407 .and_then(|v| v.value.as_ref())
408 .and_then(|val| {
409 let key = match attribute_type {
410 AttributeType::Resource | AttributeType::DataPoint => {
411 translate_label_name(&attr.key, metric_ctx.metric_translation_strategy)
412 }
413 AttributeType::Scope => {
414 format!(
415 "otel_scope_{}",
416 translate_label_name(&attr.key, metric_ctx.metric_translation_strategy)
417 )
418 }
419 AttributeType::Legacy => legacy_normalize_otlp_name(&attr.key),
420 };
421 match val {
422 any_value::Value::StringValue(s) => Some((key, s.clone())),
423 any_value::Value::IntValue(v) => Some((key, v.to_string())),
424 any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
425 _ => None, }
427 })
428 });
429 row_writer::write_tags(writer, tags, row)?;
430
431 Ok(())
432}
433
434fn write_timestamp(
435 table: &mut TableData,
436 row: &mut Vec<Value>,
437 time_nano: i64,
438 legacy_mode: bool,
439) -> Result<()> {
440 if legacy_mode {
441 row_writer::write_ts_to_nanos(
442 table,
443 greptime_timestamp(),
444 Some(time_nano),
445 Precision::Nanosecond,
446 row,
447 )
448 } else {
449 row_writer::write_ts_to_millis(
450 table,
451 greptime_timestamp(),
452 Some(time_nano / 1000000),
453 Precision::Millisecond,
454 row,
455 )
456 }
457}
458
459fn write_data_point_value(
460 table: &mut TableData,
461 row: &mut Vec<Value>,
462 field: &str,
463 value: &Option<number_data_point::Value>,
464) -> Result<()> {
465 match value {
466 Some(number_data_point::Value::AsInt(val)) => {
467 row_writer::write_f64(table, field, *val as f64, row)?;
469 }
470 Some(number_data_point::Value::AsDouble(val)) => {
471 row_writer::write_f64(table, field, *val, row)?;
472 }
473 _ => {}
474 }
475 Ok(())
476}
477
478fn write_tags_and_timestamp(
479 table: &mut TableData,
480 row: &mut Vec<Value>,
481 resource_attrs: Option<&Vec<KeyValue>>,
482 scope_attrs: Option<&Vec<KeyValue>>,
483 data_point_attrs: Option<&Vec<KeyValue>>,
484 timestamp_nanos: i64,
485 metric_ctx: &OtlpMetricCtx,
486) -> Result<()> {
487 if metric_ctx.is_legacy {
488 write_attributes(
489 table,
490 row,
491 resource_attrs,
492 AttributeType::Legacy,
493 metric_ctx,
494 )?;
495 write_attributes(table, row, scope_attrs, AttributeType::Legacy, metric_ctx)?;
496 write_attributes(
497 table,
498 row,
499 data_point_attrs,
500 AttributeType::Legacy,
501 metric_ctx,
502 )?;
503 } else {
504 write_attributes(
506 table,
507 row,
508 resource_attrs,
509 AttributeType::Resource,
510 metric_ctx,
511 )?;
512 write_attributes(table, row, scope_attrs, AttributeType::Scope, metric_ctx)?;
513 write_attributes(
514 table,
515 row,
516 data_point_attrs,
517 AttributeType::DataPoint,
518 metric_ctx,
519 )?;
520 }
521
522 write_timestamp(table, row, timestamp_nanos, metric_ctx.is_legacy)?;
523
524 Ok(())
525}
526
527fn encode_gauge(
532 table_writer: &mut MultiTableData,
533 name: &str,
534 gauge: &Gauge,
535 resource_attrs: Option<&Vec<KeyValue>>,
536 scope_attrs: Option<&Vec<KeyValue>>,
537 metric_ctx: &OtlpMetricCtx,
538) -> Result<()> {
539 let table = table_writer.get_or_default_table_data(
540 name,
541 APPROXIMATE_COLUMN_COUNT,
542 gauge.data_points.len(),
543 );
544
545 for data_point in &gauge.data_points {
546 let mut row = table.alloc_one_row();
547 write_tags_and_timestamp(
548 table,
549 &mut row,
550 resource_attrs,
551 scope_attrs,
552 Some(data_point.attributes.as_ref()),
553 data_point.time_unix_nano as i64,
554 metric_ctx,
555 )?;
556
557 write_data_point_value(table, &mut row, greptime_value(), &data_point.value)?;
558 table.add_row(row);
559 }
560
561 Ok(())
562}
563
564fn encode_sum(
568 table_writer: &mut MultiTableData,
569 name: &str,
570 sum: &Sum,
571 resource_attrs: Option<&Vec<KeyValue>>,
572 scope_attrs: Option<&Vec<KeyValue>>,
573 metric_ctx: &OtlpMetricCtx,
574) -> Result<()> {
575 let table = table_writer.get_or_default_table_data(
576 name,
577 APPROXIMATE_COLUMN_COUNT,
578 sum.data_points.len(),
579 );
580
581 for data_point in &sum.data_points {
582 let mut row = table.alloc_one_row();
583 write_tags_and_timestamp(
584 table,
585 &mut row,
586 resource_attrs,
587 scope_attrs,
588 Some(data_point.attributes.as_ref()),
589 data_point.time_unix_nano as i64,
590 metric_ctx,
591 )?;
592 write_data_point_value(table, &mut row, greptime_value(), &data_point.value)?;
593 table.add_row(row);
594 }
595
596 Ok(())
597}
598
599const HISTOGRAM_LE_COLUMN: &str = "le";
600
601fn encode_histogram(
613 table_writer: &mut MultiTableData,
614 name: &str,
615 hist: &Histogram,
616 resource_attrs: Option<&Vec<KeyValue>>,
617 scope_attrs: Option<&Vec<KeyValue>>,
618 metric_ctx: &OtlpMetricCtx,
619) -> Result<()> {
620 let normalized_name = name;
621
622 let bucket_table_name = format!("{}{}", normalized_name, BUCKET_TABLE_SUFFIX);
623 let sum_table_name = format!("{}{}", normalized_name, SUM_TABLE_SUFFIX);
624 let count_table_name = format!("{}{}", normalized_name, COUNT_TABLE_SUFFIX);
625
626 let data_points_len = hist.data_points.len();
627 let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
629 let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
630 let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
631
632 for data_point in &hist.data_points {
633 let mut accumulated_count = 0;
634 for (idx, count) in data_point.bucket_counts.iter().enumerate() {
635 let mut bucket_row = bucket_table.alloc_one_row();
636 write_tags_and_timestamp(
637 &mut bucket_table,
638 &mut bucket_row,
639 resource_attrs,
640 scope_attrs,
641 Some(data_point.attributes.as_ref()),
642 data_point.time_unix_nano as i64,
643 metric_ctx,
644 )?;
645
646 if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
647 row_writer::write_tag(
648 &mut bucket_table,
649 HISTOGRAM_LE_COLUMN,
650 upper_bounds,
651 &mut bucket_row,
652 )?;
653 } else if idx == data_point.explicit_bounds.len() {
654 row_writer::write_tag(
656 &mut bucket_table,
657 HISTOGRAM_LE_COLUMN,
658 f64::INFINITY,
659 &mut bucket_row,
660 )?;
661 }
662
663 accumulated_count += count;
664 row_writer::write_f64(
665 &mut bucket_table,
666 greptime_value(),
667 accumulated_count as f64,
668 &mut bucket_row,
669 )?;
670
671 bucket_table.add_row(bucket_row);
672 }
673
674 if let Some(sum) = data_point.sum {
675 let mut sum_row = sum_table.alloc_one_row();
676 write_tags_and_timestamp(
677 &mut sum_table,
678 &mut sum_row,
679 resource_attrs,
680 scope_attrs,
681 Some(data_point.attributes.as_ref()),
682 data_point.time_unix_nano as i64,
683 metric_ctx,
684 )?;
685
686 row_writer::write_f64(&mut sum_table, greptime_value(), sum, &mut sum_row)?;
687 sum_table.add_row(sum_row);
688 }
689
690 let mut count_row = count_table.alloc_one_row();
691 write_tags_and_timestamp(
692 &mut count_table,
693 &mut count_row,
694 resource_attrs,
695 scope_attrs,
696 Some(data_point.attributes.as_ref()),
697 data_point.time_unix_nano as i64,
698 metric_ctx,
699 )?;
700
701 row_writer::write_f64(
702 &mut count_table,
703 greptime_value(),
704 data_point.count as f64,
705 &mut count_row,
706 )?;
707 count_table.add_row(count_row);
708 }
709
710 table_writer.add_table_data(bucket_table_name, bucket_table);
711 table_writer.add_table_data(sum_table_name, sum_table);
712 table_writer.add_table_data(count_table_name, count_table);
713
714 Ok(())
715}
716
/// Placeholder for exponential-histogram encoding: ignores both arguments and returns
/// `Ok(())` without writing anything.
///
/// NOTE(review): no caller is visible in this file (hence `allow(dead_code)`);
/// `encode_metrics` handles exponential histograms itself by writing no rows.
#[allow(dead_code)]
fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
    Ok(())
}
722
723fn encode_summary(
724 table_writer: &mut MultiTableData,
725 name: &str,
726 summary: &Summary,
727 resource_attrs: Option<&Vec<KeyValue>>,
728 scope_attrs: Option<&Vec<KeyValue>>,
729 metric_ctx: &OtlpMetricCtx,
730) -> Result<()> {
731 if metric_ctx.is_legacy {
732 let table = table_writer.get_or_default_table_data(
733 name,
734 APPROXIMATE_COLUMN_COUNT,
735 summary.data_points.len(),
736 );
737
738 for data_point in &summary.data_points {
739 let mut row = table.alloc_one_row();
740 write_tags_and_timestamp(
741 table,
742 &mut row,
743 resource_attrs,
744 scope_attrs,
745 Some(data_point.attributes.as_ref()),
746 data_point.time_unix_nano as i64,
747 metric_ctx,
748 )?;
749
750 for quantile in &data_point.quantile_values {
751 row_writer::write_f64(
752 table,
753 format!("greptime_p{:02}", quantile.quantile * 100f64),
754 quantile.value,
755 &mut row,
756 )?;
757 }
758
759 row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
760 table.add_row(row);
761 }
762 } else {
763 let metric_name = name;
768 let count_name = format!("{}{}", metric_name, COUNT_TABLE_SUFFIX);
769 let sum_name = format!("{}{}", metric_name, SUM_TABLE_SUFFIX);
770
771 for data_point in &summary.data_points {
772 {
773 let quantile_table = table_writer.get_or_default_table_data(
774 metric_name,
775 APPROXIMATE_COLUMN_COUNT,
776 summary.data_points.len(),
777 );
778
779 for quantile in &data_point.quantile_values {
780 let mut row = quantile_table.alloc_one_row();
781 write_tags_and_timestamp(
782 quantile_table,
783 &mut row,
784 resource_attrs,
785 scope_attrs,
786 Some(data_point.attributes.as_ref()),
787 data_point.time_unix_nano as i64,
788 metric_ctx,
789 )?;
790 row_writer::write_tag(quantile_table, "quantile", quantile.quantile, &mut row)?;
791 row_writer::write_f64(
792 quantile_table,
793 greptime_value(),
794 quantile.value,
795 &mut row,
796 )?;
797 quantile_table.add_row(row);
798 }
799 }
800 {
801 let count_table = table_writer.get_or_default_table_data(
802 &count_name,
803 APPROXIMATE_COLUMN_COUNT,
804 summary.data_points.len(),
805 );
806 let mut row = count_table.alloc_one_row();
807 write_tags_and_timestamp(
808 count_table,
809 &mut row,
810 resource_attrs,
811 scope_attrs,
812 Some(data_point.attributes.as_ref()),
813 data_point.time_unix_nano as i64,
814 metric_ctx,
815 )?;
816
817 row_writer::write_f64(
818 count_table,
819 greptime_value(),
820 data_point.count as f64,
821 &mut row,
822 )?;
823
824 count_table.add_row(row);
825 }
826 {
827 let sum_table = table_writer.get_or_default_table_data(
828 &sum_name,
829 APPROXIMATE_COLUMN_COUNT,
830 summary.data_points.len(),
831 );
832
833 let mut row = sum_table.alloc_one_row();
834 write_tags_and_timestamp(
835 sum_table,
836 &mut row,
837 resource_attrs,
838 scope_attrs,
839 Some(data_point.attributes.as_ref()),
840 data_point.time_unix_nano as i64,
841 metric_ctx,
842 )?;
843
844 row_writer::write_f64(sum_table, greptime_value(), data_point.sum, &mut row)?;
845
846 sum_table.add_row(row);
847 }
848 }
849 }
850
851 Ok(())
852}
853
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use otel_arrow_rust::proto::opentelemetry::common::v1::AnyValue;
    use otel_arrow_rust::proto::opentelemetry::common::v1::any_value::Value as Val;
    use otel_arrow_rust::proto::opentelemetry::metrics::v1::number_data_point::Value;
    use otel_arrow_rust::proto::opentelemetry::metrics::v1::summary_data_point::ValueAtQuantile;
    use otel_arrow_rust::proto::opentelemetry::metrics::v1::{
        AggregationTemporality, HistogramDataPoint, NumberDataPoint, SummaryDataPoint,
    };
    use table::requests::validate_semantic_option;

    use super::*;

    /// Builds a string-valued attribute.
    fn keyvalue(key: &str, value: &str) -> KeyValue {
        KeyValue {
            key: key.into(),
            value: Some(AnyValue {
                value: Some(Val::StringValue(value.into())),
            }),
        }
    }

    /// Builds an integer data point carrying a single `host` attribute.
    fn int_point(host: &str, time_unix_nano: u64, value: i64) -> NumberDataPoint {
        NumberDataPoint {
            attributes: vec![keyvalue("host", host)],
            time_unix_nano,
            value: Some(Value::AsInt(value)),
            ..Default::default()
        }
    }

    /// Asserts that the table's column names are exactly `expected`, in order.
    fn assert_column_names(data: &TableData, expected: &[&str]) {
        let names: Vec<&str> = data
            .columns()
            .iter()
            .map(|column| column.column_name.as_str())
            .collect();
        assert_eq!(names, expected);
    }

    /// Decodes the encoded semantic index into `table -> key -> value` maps.
    fn decode(index: &SemanticIndex) -> BTreeMap<String, BTreeMap<String, String>> {
        serde_json::from_str(&index.encode().expect("non-empty index")).unwrap()
    }

    /// Records the semantics of `metric` under `name` into a fresh index.
    fn record(metric: &Metric, metric_type: MetricType, name: &str) -> SemanticIndex {
        let ctx = OtlpMetricCtx {
            metric_type,
            ..Default::default()
        };
        let mut index = SemanticIndex::default();
        record_metric_semantics(&mut index, metric, name, &ctx);
        index
    }

    /// Looks up one semantic option of a decoded table entry.
    fn field<'a>(entry: &'a BTreeMap<String, String>, key: &str) -> Option<&'a str> {
        entry.get(key).map(String::as_str)
    }

    #[test]
    fn test_encode_gauge() {
        let mut tables = MultiTableData::default();
        let resource_attrs = vec![];
        let scope_attrs = vec![keyvalue("scope", "otel")];

        let gauge = Gauge {
            data_points: vec![
                int_point("testsevrer", 100, 100),
                int_point("testserver", 105, 105),
            ],
        };
        encode_gauge(
            &mut tables,
            "datamon",
            &gauge,
            Some(&resource_attrs),
            Some(&scope_attrs),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 4);
        assert_column_names(
            table,
            &[
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                greptime_value(),
            ],
        );
    }

    #[test]
    fn test_encode_sum() {
        let mut tables = MultiTableData::default();
        let resource_attrs = vec![];
        let scope_attrs = vec![keyvalue("scope", "otel")];

        let sum = Sum {
            data_points: vec![
                int_point("testserver", 100, 100),
                int_point("testserver", 105, 0),
            ],
            ..Default::default()
        };
        encode_sum(
            &mut tables,
            "datamon",
            &sum,
            Some(&resource_attrs),
            Some(&scope_attrs),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 4);
        assert_column_names(
            table,
            &[
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                greptime_value(),
            ],
        );
    }

    #[test]
    fn test_encode_summary() {
        let mut tables = MultiTableData::default();
        let resource_attrs = vec![];
        let scope_attrs = vec![keyvalue("scope", "otel")];

        let summary = Summary {
            data_points: vec![SummaryDataPoint {
                attributes: vec![keyvalue("host", "testserver")],
                time_unix_nano: 100,
                count: 25,
                sum: 5400.0,
                quantile_values: vec![
                    ValueAtQuantile {
                        quantile: 0.90,
                        value: 1000.0,
                    },
                    ValueAtQuantile {
                        quantile: 0.95,
                        value: 3030.0,
                    },
                ],
                ..Default::default()
            }],
        };
        encode_summary(
            &mut tables,
            "datamon",
            &summary,
            Some(&resource_attrs),
            Some(&scope_attrs),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        // One row per quantile in the base table.
        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 5);
        assert_column_names(
            table,
            &[
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                "quantile",
                greptime_value(),
            ],
        );

        // One row per data point in each companion table.
        for companion in ["datamon_count", "datamon_sum"] {
            let table = tables.get_or_default_table_data(companion, 0, 0);
            assert_eq!(table.num_rows(), 1);
            assert_eq!(table.num_columns(), 4);
            assert_column_names(
                table,
                &[
                    "otel_scope_scope",
                    "host",
                    greptime_timestamp(),
                    greptime_value(),
                ],
            );
        }
    }

    #[test]
    fn test_encode_histogram() {
        let mut tables = MultiTableData::default();
        let resource_attrs = vec![];
        let scope_attrs = vec![keyvalue("scope", "otel")];

        let histogram = Histogram {
            data_points: vec![HistogramDataPoint {
                attributes: vec![keyvalue("host", "testserver")],
                time_unix_nano: 100,
                start_time_unix_nano: 23,
                count: 25,
                sum: Some(100.),
                max: Some(200.),
                min: Some(0.03),
                bucket_counts: vec![2, 4, 6, 9, 4],
                explicit_bounds: vec![0.1, 1., 10., 100.],
                ..Default::default()
            }],
            aggregation_temporality: AggregationTemporality::Delta.into(),
        };
        encode_histogram(
            &mut tables,
            "histo",
            &histogram,
            Some(&resource_attrs),
            Some(&scope_attrs),
            &OtlpMetricCtx::default(),
        )
        .unwrap();

        assert_eq!(3, tables.num_tables());

        let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
        assert_eq!(bucket_table.num_rows(), 5);
        assert_eq!(bucket_table.num_columns(), 5);
        assert_column_names(
            bucket_table,
            &[
                "otel_scope_scope",
                "host",
                greptime_timestamp(),
                "le",
                greptime_value(),
            ],
        );

        for companion in ["histo_sum", "histo_count"] {
            let table = tables.get_or_default_table_data(companion, 0, 0);
            assert_eq!(table.num_rows(), 1);
            assert_eq!(table.num_columns(), 4);
            assert_column_names(
                table,
                &[
                    "otel_scope_scope",
                    "host",
                    greptime_timestamp(),
                    greptime_value(),
                ],
            );
        }
    }

    #[test]
    fn test_metric_type_constants_validate() {
        let metric_types = [
            METRIC_TYPE_COUNTER,
            METRIC_TYPE_UPDOWN_COUNTER,
            METRIC_TYPE_GAUGE,
            METRIC_TYPE_HISTOGRAM,
            METRIC_TYPE_SUMMARY,
        ];
        for value in metric_types {
            assert!(
                validate_semantic_option(SEMANTIC_METRIC_TYPE, value),
                "metric.type value `{value}` must be in the vocabulary domain"
            );
        }

        for value in ["delta", "cumulative"] {
            assert!(validate_semantic_option(SEMANTIC_METRIC_TEMPORALITY, value));
        }
    }

    #[test]
    fn test_record_monotonic_sum() {
        let metric = Metric {
            name: "claude_code.cost.usage".to_string(),
            unit: "USD".to_string(),
            data: Some(metric::Data::Sum(Sum {
                aggregation_temporality: AggregationTemporality::Delta as i32,
                is_monotonic: true,
                ..Default::default()
            })),
            ..Default::default()
        };

        let decoded = decode(&record(
            &metric,
            MetricType::MonotonicSum,
            "claude_code_cost_usage_USD_total",
        ));
        let entry = &decoded["claude_code_cost_usage_USD_total"];

        assert_eq!(field(entry, SEMANTIC_METRIC_TYPE), Some("counter"));
        assert_eq!(field(entry, SEMANTIC_METRIC_TEMPORALITY), Some("delta"));
        assert_eq!(field(entry, SEMANTIC_METRIC_UNIT), Some("USD"));
        assert_eq!(
            field(entry, SEMANTIC_METRIC_ORIGINAL_NAME),
            Some("claude_code.cost.usage")
        );
        assert_eq!(
            field(entry, SEMANTIC_METRIC_METADATA_QUALITY),
            Some("declared")
        );
    }

    #[test]
    fn test_record_non_monotonic_sum() {
        let metric = Metric {
            name: "queue_size".to_string(),
            data: Some(metric::Data::Sum(Sum {
                aggregation_temporality: AggregationTemporality::Cumulative as i32,
                is_monotonic: false,
                ..Default::default()
            })),
            ..Default::default()
        };

        let decoded = decode(&record(&metric, MetricType::NonMonotonicSum, "queue_size"));
        let entry = &decoded["queue_size"];

        assert_eq!(field(entry, SEMANTIC_METRIC_TYPE), Some("updown_counter"));
        assert_eq!(
            field(entry, SEMANTIC_METRIC_TEMPORALITY),
            Some("cumulative")
        );
        // The translated name equals the OTLP name, so no original name is recorded.
        assert_eq!(field(entry, SEMANTIC_METRIC_ORIGINAL_NAME), None);
    }

    #[test]
    fn test_record_gauge_has_no_temporality() {
        let metric = Metric {
            name: "temperature".to_string(),
            data: Some(metric::Data::Gauge(Gauge::default())),
            ..Default::default()
        };

        let decoded = decode(&record(&metric, MetricType::Gauge, "temperature"));
        let entry = &decoded["temperature"];

        assert_eq!(field(entry, SEMANTIC_METRIC_TYPE), Some("gauge"));
        assert_eq!(field(entry, SEMANTIC_METRIC_TEMPORALITY), None);
    }

    #[test]
    fn test_record_histogram_fans_out_with_distinct_types() {
        let metric = Metric {
            name: "request.duration".to_string(),
            unit: "s".to_string(),
            data: Some(metric::Data::Histogram(Histogram {
                aggregation_temporality: AggregationTemporality::Cumulative as i32,
                ..Default::default()
            })),
            ..Default::default()
        };

        let decoded = decode(&record(&metric, MetricType::Histogram, "request_duration"));

        let bucket = &decoded["request_duration_bucket"];
        assert_eq!(field(bucket, SEMANTIC_METRIC_TYPE), Some("histogram"));
        assert_eq!(field(bucket, SEMANTIC_METRIC_UNIT), Some("s"));

        for companion in ["request_duration_sum", "request_duration_count"] {
            let entry = &decoded[companion];
            assert_eq!(field(entry, SEMANTIC_METRIC_TYPE), Some("counter"));
            assert_eq!(
                field(entry, SEMANTIC_METRIC_TEMPORALITY),
                Some("cumulative")
            );
        }
    }

    #[test]
    fn test_record_summary_fans_out() {
        let metric = Metric {
            name: "rpc.latency".to_string(),
            data: Some(metric::Data::Summary(Summary::default())),
            ..Default::default()
        };

        let decoded = decode(&record(&metric, MetricType::Summary, "rpc_latency"));

        let base = &decoded["rpc_latency"];
        assert_eq!(field(base, SEMANTIC_METRIC_TYPE), Some("summary"));
        assert_eq!(field(base, SEMANTIC_METRIC_TEMPORALITY), None);

        for companion in ["rpc_latency_count", "rpc_latency_sum"] {
            assert_eq!(
                field(&decoded[companion], SEMANTIC_METRIC_TYPE),
                Some("counter")
            );
        }
    }
}