1use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests};
19use common_catalog::consts::{trace_operations_table_name, trace_services_table_name};
20use common_grpc::precision::Precision;
21use pipeline::{GreptimePipelineParams, PipelineWay};
22use session::context::QueryContextRef;
23
24use crate::error::Result;
25use crate::otlp::trace::span::TraceSpan;
26use crate::otlp::trace::{
27 DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
28 SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
29 SPAN_STATUS_MESSAGE_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_STATE_COLUMN,
30 TraceAuxData,
31};
32use crate::otlp::utils::{make_column_data, make_string_column_data};
33use crate::query_handler::PipelineHandlerRef;
34use crate::row_writer::{self, MultiTableData, TableData};
35
/// Approximate number of columns per table; passed as the first argument to
/// `TableData::new` for every table built in this module.
const APPROXIMATE_COLUMN_COUNT: usize = 24;

/// Fixed timestamp, in nanoseconds, written to every row of the auxiliary
/// services and operations tables.
///
/// Equals 4_102_444_800 seconds, i.e. 2100-01-01T00:00:00Z when read as Unix time.
const MAX_TIMESTAMP: i64 = 4_102_444_800_000_000_000;
40
41pub fn v0_to_grpc_main_insert_requests(
46 spans: &[TraceSpan],
47 _pipeline: &PipelineWay,
48 _pipeline_params: &GreptimePipelineParams,
49 table_name: &str,
50 _query_ctx: &QueryContextRef,
51 _pipeline_handler: PipelineHandlerRef,
52) -> Result<(RowInsertRequests, usize)> {
53 let mut multi_table_writer = MultiTableData::default();
54 let trace_writer = build_trace_table_data(spans)?;
55 multi_table_writer.add_table_data(table_name, trace_writer);
56
57 Ok(multi_table_writer.into_row_insert_requests())
58}
59
60pub fn build_trace_table_data(spans: &[TraceSpan]) -> Result<TableData> {
62 let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
63 for span in spans.iter().cloned() {
64 write_span_to_row(&mut trace_writer, span)?;
65 }
66
67 Ok(trace_writer)
68}
69
70pub fn build_aux_table_requests(
72 aux_data: TraceAuxData,
73 table_name: &str,
74) -> Result<(RowInsertRequests, usize)> {
75 let mut multi_table_writer = MultiTableData::default();
76 let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
77 let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
78
79 write_trace_services_to_row(&mut trace_services_writer, aux_data.services)?;
80 write_trace_operations_to_row(&mut trace_operations_writer, aux_data.operations)?;
81
82 multi_table_writer.add_table_data(trace_services_table_name(table_name), trace_services_writer);
83 multi_table_writer.add_table_data(
84 trace_operations_table_name(table_name),
85 trace_operations_writer,
86 );
87 Ok(multi_table_writer.into_row_insert_requests())
88}
89
90pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
91 let mut row = writer.alloc_one_row();
92
93 row_writer::write_ts_to_nanos(
95 writer,
96 TIMESTAMP_COLUMN,
97 Some(span.start_in_nanosecond as i64),
98 Precision::Nanosecond,
99 &mut row,
100 )?;
101
102 let fields = vec![
104 make_column_data(
105 "timestamp_end",
106 ColumnDataType::TimestampNanosecond,
107 Some(ValueData::TimestampNanosecondValue(
108 span.end_in_nanosecond as i64,
109 )),
110 ),
111 make_column_data(
112 DURATION_NANO_COLUMN,
113 ColumnDataType::Uint64,
114 Some(ValueData::U64Value(
115 span.end_in_nanosecond - span.start_in_nanosecond,
116 )),
117 ),
118 make_string_column_data(TRACE_ID_COLUMN, Some(span.trace_id)),
119 make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
120 make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
121 make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
122 make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
123 make_string_column_data(SPAN_STATUS_CODE, Some(span.span_status_code)),
124 make_string_column_data(SPAN_STATUS_MESSAGE_COLUMN, Some(span.span_status_message)),
125 make_string_column_data(TRACE_STATE_COLUMN, Some(span.trace_state)),
126 ];
127 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
128
129 if let Some(service_name) = span.service_name {
130 row_writer::write_tag(writer, SERVICE_NAME_COLUMN, service_name, &mut row)?;
131 }
132
133 row_writer::write_json(
134 writer,
135 SPAN_ATTRIBUTES_COLUMN,
136 span.span_attributes.into(),
137 &mut row,
138 )?;
139 row_writer::write_json(
140 writer,
141 SPAN_EVENTS_COLUMN,
142 span.span_events.into(),
143 &mut row,
144 )?;
145 row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
146
147 let fields = vec![
149 make_string_column_data("scope_name", Some(span.scope_name)),
150 make_string_column_data("scope_version", Some(span.scope_version)),
151 ];
152 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
153
154 row_writer::write_json(
155 writer,
156 "scope_attributes",
157 span.scope_attributes.into(),
158 &mut row,
159 )?;
160
161 row_writer::write_json(
162 writer,
163 "resource_attributes",
164 span.resource_attributes.into(),
165 &mut row,
166 )?;
167
168 writer.add_row(row);
169
170 Ok(())
171}
172
173fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
174 for service_name in services {
175 let mut row = writer.alloc_one_row();
176 row_writer::write_ts_to_nanos(
178 writer,
179 TIMESTAMP_COLUMN,
180 Some(MAX_TIMESTAMP),
181 Precision::Nanosecond,
182 &mut row,
183 )?;
184
185 row_writer::write_fields(
187 writer,
188 std::iter::once(make_string_column_data(
189 SERVICE_NAME_COLUMN,
190 Some(service_name),
191 )),
192 &mut row,
193 )?;
194 writer.add_row(row);
195 }
196
197 Ok(())
198}
199
200fn write_trace_operations_to_row(
201 writer: &mut TableData,
202 operations: HashSet<(String, String, String)>,
203) -> Result<()> {
204 for (service_name, span_name, span_kind) in operations {
205 let mut row = writer.alloc_one_row();
206 row_writer::write_ts_to_nanos(
208 writer,
209 TIMESTAMP_COLUMN,
210 Some(MAX_TIMESTAMP),
211 Precision::Nanosecond,
212 &mut row,
213 )?;
214
215 row_writer::write_fields(
217 writer,
218 vec![
219 make_string_column_data(SERVICE_NAME_COLUMN, Some(service_name)),
220 make_string_column_data(SPAN_NAME_COLUMN, Some(span_name)),
221 make_string_column_data(SPAN_KIND_COLUMN, Some(span_kind)),
222 ]
223 .into_iter(),
224 &mut row,
225 )?;
226 writer.add_row(row);
227 }
228
229 Ok(())
230}
231
#[cfg(test)]
mod tests {
    use super::{build_aux_table_requests, build_trace_table_data};
    use crate::otlp::trace::TraceAuxData;
    use crate::otlp::trace::attributes::Attributes;
    use crate::otlp::trace::span::{SpanEvents, SpanLinks, TraceSpan};

    /// Builds a minimal span whose identity comes from the arguments; every other
    /// field is a fixed placeholder (no attributes, events or links, 1 ns duration).
    fn make_span(service_name: &str, trace_id: &str, span_id: &str) -> TraceSpan {
        TraceSpan {
            // Caller-supplied identity.
            service_name: Some(service_name.to_owned()),
            trace_id: trace_id.to_owned(),
            span_id: span_id.to_owned(),
            parent_span_id: None,
            // Fixed placeholder values.
            resource_attributes: Attributes::from(Vec::new()),
            scope_name: String::from("scope"),
            scope_version: String::from("v1"),
            scope_attributes: Attributes::from(Vec::new()),
            trace_state: String::new(),
            span_name: String::from("op"),
            span_kind: String::from("SPAN_KIND_SERVER"),
            span_status_code: String::from("STATUS_CODE_UNSET"),
            span_status_message: String::new(),
            span_attributes: Attributes::from(Vec::new()),
            span_events: SpanEvents::from(Vec::new()),
            span_links: SpanLinks::from(Vec::new()),
            start_in_nanosecond: 1,
            end_in_nanosecond: 2,
        }
    }

    /// Only the spans in the slice passed in should produce rows.
    #[test]
    fn test_build_trace_table_data_from_span_subset() {
        let all_spans = [
            make_span("svc-a", "trace-a", "span-a"),
            make_span("svc-b", "trace-b", "span-b"),
        ];
        let first_only = &all_spans[..1];

        let (_schema, rows) = build_trace_table_data(first_only)
            .unwrap()
            .into_schema_and_rows();

        assert_eq!(rows.len(), 1);
    }

    /// Two spans sharing service, name and kind collapse into one service row and
    /// one operation row, spread across the two auxiliary tables.
    #[test]
    fn test_build_aux_table_requests_deduplicates_services_and_operations() {
        let spans = vec![
            make_span("svc-a", "trace-a", "span-a"),
            make_span("svc-a", "trace-b", "span-b"),
        ];

        let mut aux_data = TraceAuxData::default();
        spans.iter().for_each(|span| aux_data.observe_span(span));

        let (requests, total_rows) =
            build_aux_table_requests(aux_data, "opentelemetry_traces").unwrap();

        assert_eq!(requests.inserts.len(), 2);
        assert_eq!(total_rows, 2);
    }
}