1use std::collections::HashSet;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, RowInsertRequests};
19use common_catalog::consts::{trace_operations_table_name, trace_services_table_name};
20use common_grpc::precision::Precision;
21use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
22use pipeline::{GreptimePipelineParams, PipelineWay};
23use session::context::QueryContextRef;
24
25use crate::error::Result;
26use crate::otlp::trace::span::{TraceSpan, parse};
27use crate::otlp::trace::{
28 DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
29 SPAN_EVENTS_COLUMN, SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN,
30 TRACE_ID_COLUMN,
31};
32use crate::otlp::utils::{make_column_data, make_string_column_data};
33use crate::query_handler::PipelineHandlerRef;
34use crate::row_writer::{self, MultiTableData, TableData};
35
36const APPROXIMATE_COLUMN_COUNT: usize = 24;
37
38const MAX_TIMESTAMP: i64 = 4102444800000000000;
40
41pub fn v0_to_grpc_insert_requests(
44 request: ExportTraceServiceRequest,
45 _pipeline: PipelineWay,
46 _pipeline_params: GreptimePipelineParams,
47 table_name: String,
48 _query_ctx: &QueryContextRef,
49 _pipeline_handler: PipelineHandlerRef,
50) -> Result<(RowInsertRequests, usize)> {
51 let spans = parse(request);
52 let mut multi_table_writer = MultiTableData::default();
53 let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
54 let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
55 let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
56
57 let mut services = HashSet::new();
58 let mut operations = HashSet::new();
59 for span in spans {
60 if let Some(service_name) = &span.service_name {
61 if !services.contains(service_name) {
63 services.insert(service_name.clone());
64 }
65
66 let operation = (
68 service_name.clone(),
69 span.span_name.clone(),
70 span.span_kind.clone(),
71 );
72 if !operations.contains(&operation) {
73 operations.insert(operation);
74 }
75 }
76 write_span_to_row(&mut trace_writer, span)?;
77 }
78 write_trace_services_to_row(&mut trace_services_writer, services)?;
79 write_trace_operations_to_row(&mut trace_operations_writer, operations)?;
80
81 multi_table_writer.add_table_data(
82 trace_services_table_name(&table_name),
83 trace_services_writer,
84 );
85 multi_table_writer.add_table_data(
86 trace_operations_table_name(&table_name),
87 trace_operations_writer,
88 );
89 multi_table_writer.add_table_data(table_name, trace_writer);
90
91 Ok(multi_table_writer.into_row_insert_requests())
92}
93
94pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
95 let mut row = writer.alloc_one_row();
96
97 row_writer::write_ts_to_nanos(
99 writer,
100 TIMESTAMP_COLUMN,
101 Some(span.start_in_nanosecond as i64),
102 Precision::Nanosecond,
103 &mut row,
104 )?;
105
106 let fields = vec![
108 make_column_data(
109 "timestamp_end",
110 ColumnDataType::TimestampNanosecond,
111 Some(ValueData::TimestampNanosecondValue(
112 span.end_in_nanosecond as i64,
113 )),
114 ),
115 make_column_data(
116 DURATION_NANO_COLUMN,
117 ColumnDataType::Uint64,
118 Some(ValueData::U64Value(
119 span.end_in_nanosecond - span.start_in_nanosecond,
120 )),
121 ),
122 make_string_column_data(TRACE_ID_COLUMN, Some(span.trace_id)),
123 make_string_column_data(SPAN_ID_COLUMN, Some(span.span_id)),
124 make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
125 make_string_column_data(SPAN_KIND_COLUMN, Some(span.span_kind)),
126 make_string_column_data(SPAN_NAME_COLUMN, Some(span.span_name)),
127 make_string_column_data("span_status_code", Some(span.span_status_code)),
128 make_string_column_data("span_status_message", Some(span.span_status_message)),
129 make_string_column_data("trace_state", Some(span.trace_state)),
130 ];
131 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
132
133 if let Some(service_name) = span.service_name {
134 row_writer::write_tag(writer, SERVICE_NAME_COLUMN, service_name, &mut row)?;
135 }
136
137 row_writer::write_json(
138 writer,
139 SPAN_ATTRIBUTES_COLUMN,
140 span.span_attributes.into(),
141 &mut row,
142 )?;
143 row_writer::write_json(
144 writer,
145 SPAN_EVENTS_COLUMN,
146 span.span_events.into(),
147 &mut row,
148 )?;
149 row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
150
151 let fields = vec![
153 make_string_column_data("scope_name", Some(span.scope_name)),
154 make_string_column_data("scope_version", Some(span.scope_version)),
155 ];
156 row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
157
158 row_writer::write_json(
159 writer,
160 "scope_attributes",
161 span.scope_attributes.into(),
162 &mut row,
163 )?;
164
165 row_writer::write_json(
166 writer,
167 "resource_attributes",
168 span.resource_attributes.into(),
169 &mut row,
170 )?;
171
172 writer.add_row(row);
173
174 Ok(())
175}
176
177fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
178 for service_name in services {
179 let mut row = writer.alloc_one_row();
180 row_writer::write_ts_to_nanos(
182 writer,
183 TIMESTAMP_COLUMN,
184 Some(MAX_TIMESTAMP),
185 Precision::Nanosecond,
186 &mut row,
187 )?;
188
189 row_writer::write_fields(
191 writer,
192 std::iter::once(make_string_column_data(
193 SERVICE_NAME_COLUMN,
194 Some(service_name),
195 )),
196 &mut row,
197 )?;
198 writer.add_row(row);
199 }
200
201 Ok(())
202}
203
204fn write_trace_operations_to_row(
205 writer: &mut TableData,
206 operations: HashSet<(String, String, String)>,
207) -> Result<()> {
208 for (service_name, span_name, span_kind) in operations {
209 let mut row = writer.alloc_one_row();
210 row_writer::write_ts_to_nanos(
212 writer,
213 TIMESTAMP_COLUMN,
214 Some(MAX_TIMESTAMP),
215 Precision::Nanosecond,
216 &mut row,
217 )?;
218
219 row_writer::write_fields(
221 writer,
222 vec![
223 make_string_column_data(SERVICE_NAME_COLUMN, Some(service_name)),
224 make_string_column_data(SPAN_NAME_COLUMN, Some(span_name)),
225 make_string_column_data(SPAN_KIND_COLUMN, Some(span_kind)),
226 ]
227 .into_iter(),
228 &mut row,
229 )?;
230 writer.add_row(row);
231 }
232
233 Ok(())
234}