1use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests};
19use common_catalog::consts::trace_services_table_name;
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
22use pipeline::{GreptimePipelineParams, PipelineWay};
23use session::context::QueryContextRef;
24
25use crate::error::Result;
26use crate::otlp::trace::span::{parse, TraceSpan};
27use crate::otlp::trace::{
28 DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
29 SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
30 TRACE_ID_COLUMN,
31};
32use crate::otlp::utils::{make_column_data, make_string_column_data};
33use crate::query_handler::PipelineHandlerRef;
34use crate::row_writer::{self, MultiTableData, TableData};
35
36const APPROXIMATE_COLUMN_COUNT: usize = 24;
37
38pub fn v0_to_grpc_insert_requests(
41 request: ExportTraceServiceRequest,
42 _pipeline: PipelineWay,
43 _pipeline_params: GreptimePipelineParams,
44 table_name: String,
45 _query_ctx: &QueryContextRef,
46 _pipeline_handler: PipelineHandlerRef,
47) -> Result<(RowInsertRequests, usize)> {
48 let spans = parse(request);
49 let mut multi_table_writer = MultiTableData::default();
50 let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
51 let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
52
53 let mut services = HashSet::new();
54 for span in spans {
55 if let Some(service_name) = &span.service_name {
56 if !services.contains(service_name) {
58 services.insert(service_name.clone());
59 }
60 }
61 write_span_to_row(&mut trace_writer, span)?;
62 }
63 write_trace_services_to_row(&mut trace_services_writer, services)?;
64
65 multi_table_writer.add_table_data(
66 trace_services_table_name(&table_name),
67 trace_services_writer,
68 );
69 multi_table_writer.add_table_data(table_name, trace_writer);
70
71 Ok(multi_table_writer.into_row_insert_requests())
72}
73
74pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
75 let mut row = writer.alloc_one_row();
76
77 row_writer::write_ts_to_nanos(
79 writer,
80 TIMESTAMP_COLUMN,
81 Some(span.start_in_nanosecond as i64),
82 Precision::Nanosecond,
83 &mut row,
84 )?;
85
86 let fields = vec![
88 make_column_data(
89 "timestamp_end",
90 ColumnDataType::TimestampNanosecond,
91 Some(ValueData::TimestampNanosecondValue(
92 span.end_in_nanosecond as i64,
93 )),
94 ),
95 make_column_data(
96 DURATION_NANO_COLUMN,
97 ColumnDataType::Uint64,
98 Some(ValueData::U64Value(
99 span.end_in_nanosecond - span.start_in_nanosecond,
100 )),
101 ),
102 make_string_column_data(TRACE_ID_COLUMN, Some(span.trace_id)),
103 make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
104 make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
105 make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
106 make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
107 make_string_column_data("span_status_code", Some(span.span_status_code)),
108 make_string_column_data("span_status_message", Some(span.span_status_message)),
109 make_string_column_data("trace_state", Some(span.trace_state)),
110 ];
111 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
112
113 if let Some(service_name) = span.service_name {
114 row_writer::write_tag(writer, SERVICE_NAME_COLUMN, service_name, &mut row)?;
115 }
116
117 row_writer::write_json(
118 writer,
119 SPAN_ATTRIBUTES_COLUMN,
120 span.span_attributes.into(),
121 &mut row,
122 )?;
123 row_writer::write_json(
124 writer,
125 SPAN_EVENTS_COLUMN,
126 span.span_events.into(),
127 &mut row,
128 )?;
129 row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
130
131 let fields = vec![
133 make_string_column_data("scope_name", Some(span.scope_name)),
134 make_string_column_data("scope_version", Some(span.scope_version)),
135 ];
136 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
137
138 row_writer::write_json(
139 writer,
140 "scope_attributes",
141 span.scope_attributes.into(),
142 &mut row,
143 )?;
144
145 row_writer::write_json(
146 writer,
147 "resource_attributes",
148 span.resource_attributes.into(),
149 &mut row,
150 )?;
151
152 writer.add_row(row);
153
154 Ok(())
155}
156
157fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
158 for service_name in services {
159 let mut row = writer.alloc_one_row();
160 row_writer::write_ts_to_nanos(
162 writer,
163 TIMESTAMP_COLUMN,
164 Some(4102444800000000000), Precision::Nanosecond,
166 &mut row,
167 )?;
168
169 row_writer::write_fields(
171 writer,
172 std::iter::once(make_string_column_data(
173 SERVICE_NAME_COLUMN,
174 Some(service_name),
175 )),
176 &mut row,
177 )?;
178 writer.add_row(row);
179 }
180
181 Ok(())
182}