1use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests};
19use common_catalog::consts::trace_services_table_name;
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
22use pipeline::{GreptimePipelineParams, PipelineWay};
23use session::context::QueryContextRef;
24
25use crate::error::Result;
26use crate::otlp::trace::span::{parse, TraceSpan};
27use crate::otlp::trace::{
28 DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
29 SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
30 TRACE_ID_COLUMN,
31};
32use crate::otlp::utils::{make_column_data, make_string_column_data};
33use crate::query_handler::PipelineHandlerRef;
34use crate::row_writer::{self, MultiTableData, TableData};
35
/// Column-count value passed as the first argument to `TableData::new` for
/// both the span table and the trace-services table built in this module.
// NOTE(review): presumably a pre-allocation hint rather than a hard limit —
// confirm against `TableData::new`.
const APPROXIMATE_COLUMN_COUNT: usize = 24;
37
38pub fn v0_to_grpc_insert_requests(
41 request: ExportTraceServiceRequest,
42 _pipeline: PipelineWay,
43 _pipeline_params: GreptimePipelineParams,
44 table_name: String,
45 _query_ctx: &QueryContextRef,
46 _pipeline_handler: PipelineHandlerRef,
47) -> Result<(RowInsertRequests, usize)> {
48 let spans = parse(request);
49 let mut multi_table_writer = MultiTableData::default();
50 let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
51 let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
52
53 let mut services = HashSet::new();
54 for span in spans {
55 if let Some(service_name) = &span.service_name {
56 if !services.contains(service_name) {
58 services.insert(service_name.clone());
59 }
60 }
61 write_span_to_row(&mut trace_writer, span)?;
62 }
63 write_trace_services_to_row(&mut trace_services_writer, services)?;
64
65 multi_table_writer.add_table_data(
66 trace_services_table_name(&table_name),
67 trace_services_writer,
68 );
69 multi_table_writer.add_table_data(table_name, trace_writer);
70
71 Ok(multi_table_writer.into_row_insert_requests())
72}
73
74pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
75 let mut row = writer.alloc_one_row();
76
77 row_writer::write_ts_to_nanos(
79 writer,
80 TIMESTAMP_COLUMN,
81 Some(span.start_in_nanosecond as i64),
82 Precision::Nanosecond,
83 &mut row,
84 )?;
85 let fields = vec![
87 make_column_data(
88 "timestamp_end",
89 ColumnDataType::TimestampNanosecond,
90 ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64),
91 ),
92 make_column_data(
93 DURATION_NANO_COLUMN,
94 ColumnDataType::Uint64,
95 ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond),
96 ),
97 ];
98 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
99
100 if let Some(service_name) = span.service_name {
101 row_writer::write_tag(writer, SERVICE_NAME_COLUMN, service_name, &mut row)?;
102 }
103
104 let fields = vec![
106 make_string_column_data(TRACE_ID_COLUMN, span.trace_id),
107 make_string_column_data(SPAN_ID_COLUMN, span.span_id),
108 make_string_column_data(
109 PARENT_SPAN_ID_COLUMN,
110 span.parent_span_id.unwrap_or_default(),
111 ),
112 make_string_column_data(SPAN_KIND_COLUMN, span.span_kind),
113 make_string_column_data(SPAN_NAME_COLUMN, span.span_name),
114 make_string_column_data("span_status_code", span.span_status_code),
115 make_string_column_data("span_status_message", span.span_status_message),
116 make_string_column_data("trace_state", span.trace_state),
117 ];
118 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
119
120 row_writer::write_json(
121 writer,
122 SPAN_ATTRIBUTES_COLUMN,
123 span.span_attributes.into(),
124 &mut row,
125 )?;
126 row_writer::write_json(
127 writer,
128 SPAN_EVENTS_COLUMN,
129 span.span_events.into(),
130 &mut row,
131 )?;
132 row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
133
134 let fields = vec![
136 make_string_column_data("scope_name", span.scope_name),
137 make_string_column_data("scope_version", span.scope_version),
138 ];
139 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
140
141 row_writer::write_json(
142 writer,
143 "scope_attributes",
144 span.scope_attributes.into(),
145 &mut row,
146 )?;
147
148 row_writer::write_json(
149 writer,
150 "resource_attributes",
151 span.resource_attributes.into(),
152 &mut row,
153 )?;
154
155 writer.add_row(row);
156
157 Ok(())
158}
159
160fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
161 for service_name in services {
162 let mut row = writer.alloc_one_row();
163 row_writer::write_ts_to_nanos(
165 writer,
166 TIMESTAMP_COLUMN,
167 Some(4102444800000000000), Precision::Nanosecond,
169 &mut row,
170 )?;
171
172 row_writer::write_fields(
174 writer,
175 std::iter::once(make_string_column_data(SERVICE_NAME_COLUMN, service_name)),
176 &mut row,
177 )?;
178 writer.add_row(row);
179 }
180
181 Ok(())
182}