1use api::v1::{RowInsertRequests, Value};
16use common_grpc::precision::Precision;
17use common_query::prelude::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
18use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
19use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue};
20use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point, *};
21
22use crate::error::Result;
23use crate::row_writer::{self, MultiTableData, TableData};
24
/// Column count passed whenever a metric's `TableData` is created or looked up.
/// NOTE(review): presumably an initial-capacity hint rather than a hard limit —
/// confirm against `TableData::new` / `get_or_default_table_data`.
const APPROXIMATE_COLUMN_COUNT: usize = 8;
27
/// Turns an OTLP name into the form used for table and column names.
///
/// The name is lowercased first, then every `.` and `-` is replaced by `_`.
/// All other characters are kept unchanged.
fn normalize_otlp_name(name: &str) -> String {
    name.to_lowercase()
        .chars()
        .map(|ch| match ch {
            '.' | '-' => '_',
            other => other,
        })
        .collect()
}
37
38pub fn to_grpc_insert_requests(
46 request: ExportMetricsServiceRequest,
47) -> Result<(RowInsertRequests, usize)> {
48 let mut table_writer = MultiTableData::default();
49
50 for resource in &request.resource_metrics {
51 let resource_attrs = resource.resource.as_ref().map(|r| &r.attributes);
52 for scope in &resource.scope_metrics {
53 let scope_attrs = scope.scope.as_ref().map(|s| &s.attributes);
54 for metric in &scope.metrics {
55 encode_metrics(&mut table_writer, metric, resource_attrs, scope_attrs)?;
56 }
57 }
58 }
59
60 Ok(table_writer.into_row_insert_requests())
61}
62
63fn encode_metrics(
64 table_writer: &mut MultiTableData,
65 metric: &Metric,
66 resource_attrs: Option<&Vec<KeyValue>>,
67 scope_attrs: Option<&Vec<KeyValue>>,
68) -> Result<()> {
69 let name = &metric.name;
70 if let Some(data) = &metric.data {
73 match data {
74 metric::Data::Gauge(gauge) => {
75 encode_gauge(table_writer, name, gauge, resource_attrs, scope_attrs)?;
76 }
77 metric::Data::Sum(sum) => {
78 encode_sum(table_writer, name, sum, resource_attrs, scope_attrs)?;
79 }
80 metric::Data::Summary(summary) => {
81 encode_summary(table_writer, name, summary, resource_attrs, scope_attrs)?;
82 }
83 metric::Data::Histogram(hist) => {
84 encode_histogram(table_writer, name, hist, resource_attrs, scope_attrs)?;
85 }
86 metric::Data::ExponentialHistogram(_hist) => {}
88 }
89 }
90
91 Ok(())
92}
93
94fn write_attributes(
95 writer: &mut TableData,
96 row: &mut Vec<Value>,
97 attrs: Option<&Vec<KeyValue>>,
98) -> Result<()> {
99 if let Some(attrs) = attrs {
100 let table_tags = attrs.iter().filter_map(|attr| {
101 if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) {
102 let key = normalize_otlp_name(&attr.key);
103 match val {
104 any_value::Value::StringValue(s) => Some((key, s.to_string())),
105 any_value::Value::IntValue(v) => Some((key, v.to_string())),
106 any_value::Value::DoubleValue(v) => Some((key, v.to_string())),
107 _ => None, }
109 } else {
110 None
111 }
112 });
113
114 row_writer::write_tags(writer, table_tags, row)?;
115 }
116 Ok(())
117}
118
119fn write_timestamp(table: &mut TableData, row: &mut Vec<Value>, time_nano: i64) -> Result<()> {
120 row_writer::write_ts_to_nanos(
121 table,
122 GREPTIME_TIMESTAMP,
123 Some(time_nano),
124 Precision::Nanosecond,
125 row,
126 )
127}
128
129fn write_data_point_value(
130 table: &mut TableData,
131 row: &mut Vec<Value>,
132 field: &str,
133 value: &Option<number_data_point::Value>,
134) -> Result<()> {
135 match value {
136 Some(number_data_point::Value::AsInt(val)) => {
137 row_writer::write_f64(table, field, *val as f64, row)?;
139 }
140 Some(number_data_point::Value::AsDouble(val)) => {
141 row_writer::write_f64(table, field, *val, row)?;
142 }
143 _ => {}
144 }
145 Ok(())
146}
147
148fn write_tags_and_timestamp(
149 table: &mut TableData,
150 row: &mut Vec<Value>,
151 resource_attrs: Option<&Vec<KeyValue>>,
152 scope_attrs: Option<&Vec<KeyValue>>,
153 data_point_attrs: Option<&Vec<KeyValue>>,
154 timestamp_nanos: i64,
155) -> Result<()> {
156 write_attributes(table, row, resource_attrs)?;
157 write_attributes(table, row, scope_attrs)?;
158 write_attributes(table, row, data_point_attrs)?;
159
160 write_timestamp(table, row, timestamp_nanos)?;
161
162 Ok(())
163}
164
165fn encode_gauge(
170 table_writer: &mut MultiTableData,
171 name: &str,
172 gauge: &Gauge,
173 resource_attrs: Option<&Vec<KeyValue>>,
174 scope_attrs: Option<&Vec<KeyValue>>,
175) -> Result<()> {
176 let table = table_writer.get_or_default_table_data(
177 normalize_otlp_name(name),
178 APPROXIMATE_COLUMN_COUNT,
179 gauge.data_points.len(),
180 );
181
182 for data_point in &gauge.data_points {
183 let mut row = table.alloc_one_row();
184 write_tags_and_timestamp(
185 table,
186 &mut row,
187 resource_attrs,
188 scope_attrs,
189 Some(data_point.attributes.as_ref()),
190 data_point.time_unix_nano as i64,
191 )?;
192
193 write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
194 table.add_row(row);
195 }
196
197 Ok(())
198}
199
200fn encode_sum(
204 table_writer: &mut MultiTableData,
205 name: &str,
206 sum: &Sum,
207 resource_attrs: Option<&Vec<KeyValue>>,
208 scope_attrs: Option<&Vec<KeyValue>>,
209) -> Result<()> {
210 let table = table_writer.get_or_default_table_data(
211 normalize_otlp_name(name),
212 APPROXIMATE_COLUMN_COUNT,
213 sum.data_points.len(),
214 );
215
216 for data_point in &sum.data_points {
217 let mut row = table.alloc_one_row();
218 write_tags_and_timestamp(
219 table,
220 &mut row,
221 resource_attrs,
222 scope_attrs,
223 Some(data_point.attributes.as_ref()),
224 data_point.time_unix_nano as i64,
225 )?;
226 write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?;
227 table.add_row(row);
228 }
229
230 Ok(())
231}
232
/// Name of the tag column that carries the upper bound of a histogram bucket.
const HISTOGRAM_LE_COLUMN: &str = "le";
234
235fn encode_histogram(
247 table_writer: &mut MultiTableData,
248 name: &str,
249 hist: &Histogram,
250 resource_attrs: Option<&Vec<KeyValue>>,
251 scope_attrs: Option<&Vec<KeyValue>>,
252) -> Result<()> {
253 let normalized_name = normalize_otlp_name(name);
254
255 let bucket_table_name = format!("{}_bucket", normalized_name);
256 let sum_table_name = format!("{}_sum", normalized_name);
257 let count_table_name = format!("{}_count", normalized_name);
258
259 let data_points_len = hist.data_points.len();
260 let mut bucket_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len * 3);
262 let mut sum_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
263 let mut count_table = TableData::new(APPROXIMATE_COLUMN_COUNT, data_points_len);
264
265 for data_point in &hist.data_points {
266 let mut accumulated_count = 0;
267 for (idx, count) in data_point.bucket_counts.iter().enumerate() {
268 let mut bucket_row = bucket_table.alloc_one_row();
269 write_tags_and_timestamp(
270 &mut bucket_table,
271 &mut bucket_row,
272 resource_attrs,
273 scope_attrs,
274 Some(data_point.attributes.as_ref()),
275 data_point.time_unix_nano as i64,
276 )?;
277
278 if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) {
279 row_writer::write_tag(
280 &mut bucket_table,
281 HISTOGRAM_LE_COLUMN,
282 upper_bounds,
283 &mut bucket_row,
284 )?;
285 } else if idx == data_point.explicit_bounds.len() {
286 row_writer::write_tag(
288 &mut bucket_table,
289 HISTOGRAM_LE_COLUMN,
290 f64::INFINITY,
291 &mut bucket_row,
292 )?;
293 }
294
295 accumulated_count += count;
296 row_writer::write_f64(
297 &mut bucket_table,
298 GREPTIME_VALUE,
299 accumulated_count as f64,
300 &mut bucket_row,
301 )?;
302
303 bucket_table.add_row(bucket_row);
304 }
305
306 if let Some(sum) = data_point.sum {
307 let mut sum_row = sum_table.alloc_one_row();
308 write_tags_and_timestamp(
309 &mut sum_table,
310 &mut sum_row,
311 resource_attrs,
312 scope_attrs,
313 Some(data_point.attributes.as_ref()),
314 data_point.time_unix_nano as i64,
315 )?;
316
317 row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?;
318 sum_table.add_row(sum_row);
319 }
320
321 let mut count_row = count_table.alloc_one_row();
322 write_tags_and_timestamp(
323 &mut count_table,
324 &mut count_row,
325 resource_attrs,
326 scope_attrs,
327 Some(data_point.attributes.as_ref()),
328 data_point.time_unix_nano as i64,
329 )?;
330
331 row_writer::write_f64(
332 &mut count_table,
333 GREPTIME_VALUE,
334 data_point.count as f64,
335 &mut count_row,
336 )?;
337 count_table.add_row(count_row);
338 }
339
340 table_writer.add_table_data(bucket_table_name, bucket_table);
341 table_writer.add_table_data(sum_table_name, sum_table);
342 table_writer.add_table_data(count_table_name, count_table);
343
344 Ok(())
345}
346
/// Placeholder for exponential histogram encoding: writes nothing and always
/// returns `Ok(())`.
///
/// Not called from the code visible here (`encode_metrics` skips exponential
/// histograms without calling it), hence the `dead_code` allowance.
#[allow(dead_code)]
fn encode_exponential_histogram(_name: &str, _hist: &ExponentialHistogram) -> Result<()> {
    Ok(())
}
352
353fn encode_summary(
354 table_writer: &mut MultiTableData,
355 name: &str,
356 summary: &Summary,
357 resource_attrs: Option<&Vec<KeyValue>>,
358 scope_attrs: Option<&Vec<KeyValue>>,
359) -> Result<()> {
360 let table = table_writer.get_or_default_table_data(
361 normalize_otlp_name(name),
362 APPROXIMATE_COLUMN_COUNT,
363 summary.data_points.len(),
364 );
365
366 for data_point in &summary.data_points {
367 let mut row = table.alloc_one_row();
368 write_tags_and_timestamp(
369 table,
370 &mut row,
371 resource_attrs,
372 scope_attrs,
373 Some(data_point.attributes.as_ref()),
374 data_point.time_unix_nano as i64,
375 )?;
376
377 for quantile in &data_point.quantile_values {
378 row_writer::write_f64(
379 table,
380 format!("greptime_p{:02}", quantile.quantile * 100f64),
381 quantile.value,
382 &mut row,
383 )?;
384 }
385
386 row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?;
387 table.add_row(row);
388 }
389
390 Ok(())
391}
392
#[cfg(test)]
mod tests {
    use opentelemetry_proto::tonic::common::v1::any_value::Value as Val;
    use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
    use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value;
    use opentelemetry_proto::tonic::metrics::v1::summary_data_point::ValueAtQuantile;
    use opentelemetry_proto::tonic::metrics::v1::{HistogramDataPoint, NumberDataPoint};

    use super::*;

    /// Builds a string-valued OTLP attribute.
    fn keyvalue(key: &str, value: &str) -> KeyValue {
        let value = AnyValue {
            value: Some(Val::StringValue(value.into())),
        };
        KeyValue {
            key: key.into(),
            value: Some(value),
        }
    }

    /// Builds an integer data point tagged with a `host` attribute.
    fn int_point(host: &str, time_unix_nano: u64, value: i64) -> NumberDataPoint {
        NumberDataPoint {
            attributes: vec![keyvalue("host", host)],
            time_unix_nano,
            value: Some(Value::AsInt(value)),
            ..Default::default()
        }
    }

    /// Collects the column names of `table` in schema order.
    fn column_names(table: &TableData) -> Vec<&String> {
        table
            .columns()
            .iter()
            .map(|column| &column.column_name)
            .collect::<Vec<&String>>()
    }

    #[test]
    fn test_normalize_otlp_name() {
        let inputs = [
            "jvm.memory.free",
            "jvm-memory-free",
            "jvm_memory_free",
            "JVM_MEMORY_FREE",
            "JVM_memory_FREE",
        ];

        for input in inputs {
            assert_eq!(normalize_otlp_name(input), "jvm_memory_free");
        }
    }

    #[test]
    fn test_encode_gauge() {
        let mut tables = MultiTableData::default();

        let gauge = Gauge {
            data_points: vec![
                int_point("testsevrer", 100, 100),
                int_point("testserver", 105, 105),
            ],
        };
        let resource = vec![keyvalue("resource", "app")];
        let scope = vec![keyvalue("scope", "otel")];

        encode_gauge(&mut tables, "datamon", &gauge, Some(&resource), Some(&scope)).unwrap();

        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 5);
        assert_eq!(
            column_names(table),
            vec![
                "resource",
                "scope",
                "host",
                "greptime_timestamp",
                "greptime_value"
            ]
        );
    }

    #[test]
    fn test_encode_sum() {
        let mut tables = MultiTableData::default();

        let sum = Sum {
            data_points: vec![
                int_point("testserver", 100, 100),
                int_point("testserver", 105, 0),
            ],
            ..Default::default()
        };
        let resource = vec![keyvalue("resource", "app")];
        let scope = vec![keyvalue("scope", "otel")];

        encode_sum(&mut tables, "datamon", &sum, Some(&resource), Some(&scope)).unwrap();

        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 2);
        assert_eq!(table.num_columns(), 5);
        assert_eq!(
            column_names(table),
            vec![
                "resource",
                "scope",
                "host",
                "greptime_timestamp",
                "greptime_value"
            ]
        );
    }

    #[test]
    fn test_encode_summary() {
        let mut tables = MultiTableData::default();

        let p90 = ValueAtQuantile {
            quantile: 0.90,
            value: 1000.0,
        };
        let p95 = ValueAtQuantile {
            quantile: 0.95,
            value: 3030.0,
        };
        let point = SummaryDataPoint {
            attributes: vec![keyvalue("host", "testserver")],
            time_unix_nano: 100,
            count: 25,
            sum: 5400.0,
            quantile_values: vec![p90, p95],
            ..Default::default()
        };
        let summary = Summary {
            data_points: vec![point],
        };
        let resource = vec![keyvalue("resource", "app")];
        let scope = vec![keyvalue("scope", "otel")];

        encode_summary(&mut tables, "datamon", &summary, Some(&resource), Some(&scope)).unwrap();

        let table = tables.get_or_default_table_data("datamon", 0, 0);
        assert_eq!(table.num_rows(), 1);
        assert_eq!(table.num_columns(), 7);
        assert_eq!(
            column_names(table),
            vec![
                "resource",
                "scope",
                "host",
                "greptime_timestamp",
                "greptime_p90",
                "greptime_p95",
                "greptime_count"
            ]
        );
    }

    #[test]
    fn test_encode_histogram() {
        let mut tables = MultiTableData::default();

        let point = HistogramDataPoint {
            attributes: vec![keyvalue("host", "testserver")],
            time_unix_nano: 100,
            start_time_unix_nano: 23,
            count: 25,
            sum: Some(100.),
            max: Some(200.),
            min: Some(0.03),
            bucket_counts: vec![2, 4, 6, 9, 4],
            explicit_bounds: vec![0.1, 1., 10., 100.],
            ..Default::default()
        };
        let histogram = Histogram {
            data_points: vec![point],
            aggregation_temporality: AggregationTemporality::Delta.into(),
        };
        let resource = vec![keyvalue("resource", "app")];
        let scope = vec![keyvalue("scope", "otel")];

        encode_histogram(&mut tables, "histo", &histogram, Some(&resource), Some(&scope)).unwrap();

        assert_eq!(3, tables.num_tables());

        // One row per bucket count, with the extra `le` tag column.
        let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0);
        assert_eq!(bucket_table.num_rows(), 5);
        assert_eq!(bucket_table.num_columns(), 6);
        assert_eq!(
            column_names(bucket_table),
            vec![
                "resource",
                "scope",
                "host",
                "greptime_timestamp",
                "le",
                "greptime_value",
            ]
        );

        let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0);
        assert_eq!(sum_table.num_rows(), 1);
        assert_eq!(sum_table.num_columns(), 5);
        assert_eq!(
            column_names(sum_table),
            vec![
                "resource",
                "scope",
                "host",
                "greptime_timestamp",
                "greptime_value",
            ]
        );

        let count_table = tables.get_or_default_table_data("histo_count", 0, 0);
        assert_eq!(count_table.num_rows(), 1);
        assert_eq!(count_table.num_columns(), 5);
        assert_eq!(
            column_names(count_table),
            vec![
                "resource",
                "scope",
                "host",
                "greptime_timestamp",
                "greptime_value",
            ]
        );
    }
}