1use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests, Value};
19use common_catalog::consts::{trace_operations_table_name, trace_services_table_name};
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
22use pipeline::{GreptimePipelineParams, PipelineWay};
23use session::context::QueryContextRef;
24
25use crate::error::Result;
26use crate::otlp::trace::attributes::Attributes;
27use crate::otlp::trace::span::TraceSpan;
28use crate::otlp::trace::{
29 DURATION_NANO_COLUMN, KEY_SERVICE_NAME, PARENT_SPAN_ID_COLUMN, SCOPE_NAME_COLUMN,
30 SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN,
31 SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_MESSAGE_COLUMN,
32 TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN, TraceAuxData,
33};
34use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
35use crate::query_handler::PipelineHandlerRef;
36use crate::row_writer::{self, MultiTableData, TableData};
37
/// Approximate number of columns per table, passed as the first argument of
/// `TableData::new` for every table built in this file.
const APPROXIMATE_COLUMN_COUNT: usize = 30;

/// Fixed timestamp, in nanoseconds since the Unix epoch, written to every row of
/// the trace services and trace operations tables.
///
/// The value corresponds to 2100-01-01 00:00:00 UTC.
const MAX_TIMESTAMP: i64 = 4_102_444_800_000_000_000;
42
43pub fn v1_to_grpc_main_insert_requests(
48 spans: &[TraceSpan],
49 _pipeline: &PipelineWay,
50 _pipeline_params: &GreptimePipelineParams,
51 table_name: &str,
52 _query_ctx: &QueryContextRef,
53 _pipeline_handler: PipelineHandlerRef,
54) -> Result<(RowInsertRequests, usize)> {
55 let mut multi_table_writer = MultiTableData::default();
56 let trace_writer = build_trace_table_data(spans)?;
57 multi_table_writer.add_table_data(table_name, trace_writer);
58
59 Ok(multi_table_writer.into_row_insert_requests())
60}
61
62pub fn build_trace_table_data(spans: &[TraceSpan]) -> Result<TableData> {
64 let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
65 for span in spans.iter().cloned() {
66 write_span_to_row(&mut trace_writer, span)?;
67 }
68
69 Ok(trace_writer)
70}
71
72pub fn build_aux_table_requests(
74 aux_data: TraceAuxData,
75 table_name: &str,
76) -> Result<(RowInsertRequests, usize)> {
77 let mut multi_table_writer = MultiTableData::default();
78 let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
79 let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
80
81 write_trace_services_to_row(&mut trace_services_writer, aux_data.services)?;
82 write_trace_operations_to_row(&mut trace_operations_writer, aux_data.operations)?;
83
84 multi_table_writer.add_table_data(trace_services_table_name(table_name), trace_services_writer);
85 multi_table_writer.add_table_data(
86 trace_operations_table_name(table_name),
87 trace_operations_writer,
88 );
89
90 Ok(multi_table_writer.into_row_insert_requests())
91}
92
93pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
94 let mut row = writer.alloc_one_row();
95
96 row_writer::write_ts_to_nanos(
98 writer,
99 TIMESTAMP_COLUMN,
100 Some(span.start_in_nanosecond as i64),
101 Precision::Nanosecond,
102 &mut row,
103 )?;
104
105 let fields = vec![
107 make_column_data(
108 "timestamp_end",
109 ColumnDataType::TimestampNanosecond,
110 Some(ValueData::TimestampNanosecondValue(
111 span.end_in_nanosecond as i64,
112 )),
113 ),
114 make_column_data(
115 DURATION_NANO_COLUMN,
116 ColumnDataType::Uint64,
117 Some(ValueData::U64Value(
118 span.end_in_nanosecond - span.start_in_nanosecond,
119 )),
120 ),
121 make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
122 make_string_column_data(TRACE_ID_COLUMN, Some(span.trace_id)),
123 make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
124 make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
125 make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
126 make_string_column_data(SPAN_STATUS_CODE, Some(span.span_status_code)),
127 make_string_column_data(SPAN_STATUS_MESSAGE_COLUMN, Some(span.span_status_message)),
128 make_string_column_data(TRACE_STATE_COLUMN, Some(span.trace_state)),
129 make_string_column_data(SCOPE_NAME_COLUMN, Some(span.scope_name)),
130 make_string_column_data(SCOPE_VERSION_COLUMN, Some(span.scope_version)),
131 ];
132 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
133
134 if let Some(service_name) = span.service_name {
135 row_writer::write_tags(
136 writer,
137 std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
138 &mut row,
139 )?;
140 }
141
142 write_attributes(writer, "span_attributes", span.span_attributes, &mut row)?;
143 write_attributes(writer, "scope_attributes", span.scope_attributes, &mut row)?;
144 write_attributes(
145 writer,
146 "resource_attributes",
147 span.resource_attributes,
148 &mut row,
149 )?;
150
151 row_writer::write_json(
152 writer,
153 SPAN_EVENTS_COLUMN,
154 span.span_events.into(),
155 &mut row,
156 )?;
157 row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
158
159 writer.add_row(row);
160
161 Ok(())
162}
163
164fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
165 for service_name in services {
166 let mut row = writer.alloc_one_row();
167 row_writer::write_ts_to_nanos(
169 writer,
170 TIMESTAMP_COLUMN,
171 Some(MAX_TIMESTAMP),
172 Precision::Nanosecond,
173 &mut row,
174 )?;
175
176 row_writer::write_tags(
178 writer,
179 std::iter::once((SERVICE_NAME_COLUMN.to_string(), service_name)),
180 &mut row,
181 )?;
182 writer.add_row(row);
183 }
184
185 Ok(())
186}
187
188fn write_trace_operations_to_row(
189 writer: &mut TableData,
190 operations: HashSet<(String, String, String)>,
191) -> Result<()> {
192 for (service_name, span_name, span_kind) in operations {
193 let mut row = writer.alloc_one_row();
194 row_writer::write_ts_to_nanos(
196 writer,
197 TIMESTAMP_COLUMN,
198 Some(MAX_TIMESTAMP),
199 Precision::Nanosecond,
200 &mut row,
201 )?;
202
203 row_writer::write_tags(
205 writer,
206 vec![
207 (SERVICE_NAME_COLUMN.to_string(), service_name),
208 (SPAN_NAME_COLUMN.to_string(), span_name),
209 (SPAN_KIND_COLUMN.to_string(), span_kind),
210 ]
211 .into_iter(),
212 &mut row,
213 )?;
214 writer.add_row(row);
215 }
216
217 Ok(())
218}
219
220pub(crate) fn write_attributes(
221 writer: &mut TableData,
222 prefix: &str,
223 attributes: Attributes,
224 row: &mut Vec<Value>,
225) -> Result<()> {
226 for attr in attributes.take().into_iter() {
227 let key_suffix = attr.key;
228 if prefix == "resource_attributes" && key_suffix == KEY_SERVICE_NAME {
231 continue;
232 }
233
234 let key = format!("{}.{}", prefix, key_suffix);
235 match attr.value.and_then(|v| v.value) {
236 Some(OtlpValue::StringValue(v)) => {
237 writer.write_field_unchecked(
240 &key,
241 ColumnDataType::String,
242 Some(ValueData::StringValue(v)),
243 row,
244 );
245 }
246 Some(OtlpValue::BoolValue(v)) => {
247 writer.write_field_unchecked(
249 &key,
250 ColumnDataType::Boolean,
251 Some(ValueData::BoolValue(v)),
252 row,
253 );
254 }
255 Some(OtlpValue::IntValue(v)) => {
256 writer.write_field_unchecked(
258 &key,
259 ColumnDataType::Int64,
260 Some(ValueData::I64Value(v)),
261 row,
262 );
263 }
264 Some(OtlpValue::DoubleValue(v)) => {
265 writer.write_field_unchecked(
266 &key,
267 ColumnDataType::Float64,
268 Some(ValueData::F64Value(v)),
269 row,
270 );
271 }
272 Some(OtlpValue::ArrayValue(v)) => row_writer::write_json(
273 writer,
274 key,
275 any_value_to_jsonb(OtlpValue::ArrayValue(v)),
276 row,
277 )?,
278 Some(OtlpValue::KvlistValue(v)) => row_writer::write_json(
279 writer,
280 key,
281 any_value_to_jsonb(OtlpValue::KvlistValue(v)),
282 row,
283 )?,
284 Some(OtlpValue::BytesValue(v)) => {
285 row_writer::write_fields(
286 writer,
287 std::iter::once(make_column_data(
288 &key,
289 ColumnDataType::Binary,
290 Some(ValueData::BinaryValue(v)),
291 )),
292 row,
293 )?;
294 }
295 None => {}
296 }
297 }
298
299 Ok(())
300}
301
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
    use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};

    use super::*;
    use crate::otlp::trace::TraceAuxData;
    use crate::otlp::trace::attributes::Attributes;
    use crate::otlp::trace::span::{SpanEvents, SpanLinks};
    use crate::row_writer::TableData;

    /// Wraps `value` in an OTLP `KeyValue` stored under `key`.
    fn make_kv(key: &str, value: OtlpValue) -> KeyValue {
        let value = Some(AnyValue { value: Some(value) });
        KeyValue {
            key: key.to_string(),
            value,
        }
    }

    /// Builds a minimal server span; only the service name and the ids vary.
    fn make_span(service_name: &str, trace_id: &str, span_id: &str) -> TraceSpan {
        TraceSpan {
            service_name: Some(service_name.to_string()),
            trace_id: trace_id.to_string(),
            span_id: span_id.to_string(),
            parent_span_id: None,
            resource_attributes: Attributes::from(vec![]),
            scope_name: "scope".to_string(),
            scope_version: "v1".to_string(),
            scope_attributes: Attributes::from(vec![]),
            trace_state: String::new(),
            span_name: "op".to_string(),
            span_kind: "SPAN_KIND_SERVER".to_string(),
            span_status_code: "STATUS_CODE_UNSET".to_string(),
            span_status_message: String::new(),
            span_attributes: Attributes::from(vec![]),
            span_events: SpanEvents::from(vec![]),
            span_links: SpanLinks::from(vec![]),
            start_in_nanosecond: 1,
            end_in_nanosecond: 2,
        }
    }

    /// Writes `first` and then `second` as attribute `val` under prefix `attr`, each
    /// into its own row of a fresh writer, and returns that writer.
    fn write_two_attr_rows(first: OtlpValue, second: OtlpValue) -> TableData {
        let mut writer = TableData::new(4, 2);
        for value in [first, second] {
            let attrs = Attributes::from(vec![make_kv("val", value)]);
            let mut row = writer.alloc_one_row();
            write_attributes(&mut writer, "attr", attrs, &mut row).unwrap();
            writer.add_row(row);
        }
        writer
    }

    #[test]
    fn test_keep_mixed_numeric_values_until_frontend_reconciliation() {
        let (schema, rows) =
            write_two_attr_rows(OtlpValue::DoubleValue(1.5), OtlpValue::IntValue(42))
                .into_schema_and_rows();

        let idx = schema
            .iter()
            .position(|col| col.column_name == "attr.val")
            .unwrap();
        // The column keeps the first-seen type; each row keeps its own value type.
        assert_eq!(schema[idx].datatype, ColumnDataType::Float64 as i32);
        assert_eq!(
            rows[0].values[idx].value_data,
            Some(ValueData::F64Value(1.5))
        );
        assert_eq!(
            rows[1].values[idx].value_data,
            Some(ValueData::I64Value(42))
        );
    }

    #[test]
    fn test_keep_mixed_string_and_int_values_until_frontend_reconciliation() {
        let (schema, rows) = write_two_attr_rows(
            OtlpValue::IntValue(10),
            OtlpValue::StringValue("20".to_string()),
        )
        .into_schema_and_rows();

        let idx = schema
            .iter()
            .position(|col| col.column_name == "attr.val")
            .unwrap();
        assert_eq!(schema[idx].datatype, ColumnDataType::Int64 as i32);
        assert_eq!(
            rows[1].values[idx].value_data,
            Some(ValueData::StringValue("20".to_string()))
        );
    }

    #[test]
    fn test_keep_first_seen_schema_until_frontend_reconciliation() {
        let (schema, rows) = write_two_attr_rows(
            OtlpValue::StringValue("10".to_string()),
            OtlpValue::IntValue(20),
        )
        .into_schema_and_rows();

        let idx = schema
            .iter()
            .position(|col| col.column_name == "attr.val")
            .unwrap();
        assert_eq!(schema[idx].datatype, ColumnDataType::String as i32);
        assert_eq!(
            rows[0].values[idx].value_data,
            Some(ValueData::StringValue("10".to_string()))
        );
        assert_eq!(
            rows[1].values[idx].value_data,
            Some(ValueData::I64Value(20))
        );
    }

    #[test]
    fn test_keep_mixed_string_and_float_values_until_frontend_reconciliation() {
        let (schema, rows) = write_two_attr_rows(
            OtlpValue::DoubleValue(1.5),
            OtlpValue::StringValue("1.5".to_string()),
        )
        .into_schema_and_rows();

        let idx = schema
            .iter()
            .position(|col| col.column_name == "attr.val")
            .unwrap();
        assert_eq!(schema[idx].datatype, ColumnDataType::Float64 as i32);
        assert_eq!(
            rows[1].values[idx].value_data,
            Some(ValueData::StringValue("1.5".to_string()))
        );
    }

    #[test]
    fn test_keep_mixed_string_and_bool_values_until_frontend_reconciliation() {
        let (schema, rows) = write_two_attr_rows(
            OtlpValue::StringValue("true".to_string()),
            OtlpValue::BoolValue(false),
        )
        .into_schema_and_rows();

        let idx = schema
            .iter()
            .position(|col| col.column_name == "attr.val")
            .unwrap();
        assert_eq!(schema[idx].datatype, ColumnDataType::String as i32);
        assert_eq!(
            rows[0].values[idx].value_data,
            Some(ValueData::StringValue("true".to_string()))
        );
        assert_eq!(
            rows[1].values[idx].value_data,
            Some(ValueData::BoolValue(false))
        );
    }

    #[test]
    fn test_keep_mixed_binary_and_string_values_until_frontend_reconciliation() {
        let (schema, rows) = write_two_attr_rows(
            OtlpValue::BytesValue(vec![1_u8, 2, 3]),
            OtlpValue::StringValue("false".to_string()),
        )
        .into_schema_and_rows();

        let idx = schema
            .iter()
            .position(|col| col.column_name == "attr.val")
            .unwrap();
        assert_eq!(schema[idx].datatype, ColumnDataType::Binary as i32);
        assert_eq!(
            rows[0].values[idx].value_data,
            Some(ValueData::BinaryValue(vec![1_u8, 2, 3]))
        );
        assert_eq!(
            rows[1].values[idx].value_data,
            Some(ValueData::StringValue("false".to_string()))
        );
    }

    #[test]
    fn test_build_aux_table_requests_deduplicates_services_and_operations() {
        // Two spans that share service, span name and span kind.
        let spans = vec![
            make_span("svc-a", "trace-a", "span-a"),
            make_span("svc-a", "trace-b", "span-b"),
        ];
        let mut aux_data = TraceAuxData::default();
        for span in &spans {
            aux_data.observe_span(span);
        }

        let (requests, total_rows) =
            build_aux_table_requests(aux_data, "opentelemetry_traces").unwrap();
        // One request per auxiliary table, each holding a single deduplicated row.
        assert_eq!(requests.inserts.len(), 2);
        assert_eq!(total_rows, 2);
    }
}