1use std::fmt::Display;
16
17use common_time::timestamp::Timestamp;
18use itertools::Itertools;
19use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
20use opentelemetry_proto::tonic::common::v1::{InstrumentationScope, KeyValue, any_value};
21use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
22use opentelemetry_proto::tonic::trace::v1::{Span, Status};
23use serde::Serialize;
24
25use crate::otlp::trace::KEY_SERVICE_NAME;
26use crate::otlp::trace::attributes::Attributes;
27use crate::otlp::utils::bytes_to_hex_string;
28
/// A single OTLP span flattened together with the resource and scope it was reported under.
///
/// Instances are produced by [`parse_span`]. Binary identifiers are converted to strings with
/// `bytes_to_hex_string`, and the span kind / status code are stored as the string names
/// returned by the protobuf enums' `as_str_name()`.
#[derive(Debug, Clone)]
pub struct TraceSpan {
    /// Service name handed to [`parse_span`] by the caller; `None` when none was resolved.
    pub service_name: Option<String>,
    /// Trace id rendered by `bytes_to_hex_string`.
    pub trace_id: String,
    /// Span id rendered by `bytes_to_hex_string`.
    pub span_id: String,
    /// Parent span id rendered by `bytes_to_hex_string`; `None` when the span carries an
    /// empty parent id.
    pub parent_span_id: Option<String>,

    /// Attributes of the resource that emitted the span.
    pub resource_attributes: Attributes,
    /// Name of the instrumentation scope.
    pub scope_name: String,
    /// Version of the instrumentation scope.
    pub scope_version: String,
    /// Attributes of the instrumentation scope.
    pub scope_attributes: Attributes,
    /// The span's `trace_state` string, copied verbatim.
    pub trace_state: String,
    /// The span's name.
    pub span_name: String,
    /// String name of the span kind enum.
    pub span_kind: String,
    /// String name of the status code enum; empty when the span has no status.
    pub span_status_code: String,
    /// Status message; empty when the span has no status.
    pub span_status_message: String,
    /// Attributes attached to the span itself.
    pub span_attributes: Attributes,
    /// Events recorded on the span.
    pub span_events: SpanEvents,
    /// Links from this span to other spans.
    pub span_links: SpanLinks,
    /// Span start time (`start_time_unix_nano` of the OTLP span).
    pub start_in_nanosecond: u64,
    /// Span end time (`end_time_unix_nano` of the OTLP span).
    pub end_in_nanosecond: u64,
}
53
/// A list of flattened spans.
pub type TraceSpans = Vec<TraceSpan>;
55
/// The spans reported under one (resource, instrumentation scope) pair of an export request.
///
/// [`parse`] emits one group per `ScopeSpans` entry, in request order.
#[derive(Debug, Clone)]
pub struct TraceSpanGroup {
    /// Service name resolved from the resource attributes, if any.
    pub service_name: Option<String>,
    /// Attributes of the resource shared by every span in the group.
    pub resource_attributes: Attributes,
    /// Name of the instrumentation scope shared by every span in the group.
    pub scope_name: String,
    /// Version of the instrumentation scope.
    pub scope_version: String,
    /// Attributes of the instrumentation scope.
    pub scope_attributes: Attributes,
    /// The parsed spans belonging to this group.
    pub spans: TraceSpans,
}
65
/// A list of span groups, as returned by [`parse`].
pub type TraceSpanGroups = Vec<TraceSpanGroup>;
67
/// A link from a span to another span, converted from an OTLP `Link`.
#[derive(Debug, Clone, Serialize)]
pub struct SpanLink {
    /// Trace id of the linked span, rendered by `bytes_to_hex_string`.
    pub trace_id: String,
    /// Span id of the linked span, rendered by `bytes_to_hex_string`.
    pub span_id: String,
    /// The link's `trace_state` string, copied verbatim.
    pub trace_state: String,
    /// Attributes attached to the link.
    pub attributes: Attributes,
}
75
76impl From<Link> for SpanLink {
77 fn from(link: Link) -> Self {
78 Self {
79 trace_id: bytes_to_hex_string(&link.trace_id),
80 span_id: bytes_to_hex_string(&link.span_id),
81 trace_state: link.trace_state,
82 attributes: Attributes::from(link.attributes),
83 }
84 }
85}
86
87impl From<SpanLink> for jsonb::Value<'static> {
88 fn from(value: SpanLink) -> jsonb::Value<'static> {
89 jsonb::Value::Object(
90 vec![
91 (
92 "trace_id".to_string(),
93 jsonb::Value::String(value.trace_id.into()),
94 ),
95 (
96 "span_id".to_string(),
97 jsonb::Value::String(value.span_id.into()),
98 ),
99 (
100 "trace_state".to_string(),
101 jsonb::Value::String(value.trace_state.into()),
102 ),
103 ("attributes".to_string(), value.attributes.into()),
104 ]
105 .into_iter()
106 .collect(),
107 )
108 }
109}
110
/// Newtype around the list of [`SpanLink`]s attached to a span.
///
/// Its `Display` impl (in this file) prints the `serde_json` serialization of the value.
#[derive(Debug, Clone, Serialize)]
pub struct SpanLinks(Vec<SpanLink>);
113
114impl From<Vec<Link>> for SpanLinks {
115 fn from(value: Vec<Link>) -> Self {
116 let links = value.into_iter().map(SpanLink::from).collect_vec();
117 Self(links)
118 }
119}
120
121impl From<SpanLinks> for jsonb::Value<'static> {
122 fn from(value: SpanLinks) -> jsonb::Value<'static> {
123 jsonb::Value::Array(value.0.into_iter().map(Into::into).collect())
124 }
125}
126
127impl Display for SpanLinks {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 write!(f, "{}", serde_json::to_string(self).unwrap_or_default())
130 }
131}
132
133impl SpanLinks {
134 pub fn get_ref(&self) -> &Vec<SpanLink> {
135 &self.0
136 }
137
138 pub fn get_mut(&mut self) -> &mut Vec<SpanLink> {
139 &mut self.0
140 }
141}
142
/// An event recorded on a span, converted from an OTLP `Event`.
#[derive(Debug, Clone, Serialize)]
pub struct SpanEvent {
    /// The event's name.
    pub name: String,
    /// Event time, formatted by `Timestamp::to_iso8601_string` from the event's
    /// `time_unix_nano`.
    pub time: String,
    /// Attributes attached to the event.
    pub attributes: Attributes,
}
149
150impl From<Event> for SpanEvent {
151 fn from(event: Event) -> Self {
152 Self {
153 name: event.name,
154 time: Timestamp::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
155 attributes: Attributes::from(event.attributes),
156 }
157 }
158}
159
160impl From<SpanEvent> for jsonb::Value<'static> {
161 fn from(value: SpanEvent) -> jsonb::Value<'static> {
162 jsonb::Value::Object(
163 vec![
164 ("name".to_string(), jsonb::Value::String(value.name.into())),
165 ("time".to_string(), jsonb::Value::String(value.time.into())),
166 ("attributes".to_string(), value.attributes.into()),
167 ]
168 .into_iter()
169 .collect(),
170 )
171 }
172}
173
/// Newtype around the list of [`SpanEvent`]s recorded on a span.
///
/// Its `Display` impl (in this file) prints the `serde_json` serialization of the value.
#[derive(Debug, Clone, Serialize)]
pub struct SpanEvents(Vec<SpanEvent>);
176
177impl From<Vec<Event>> for SpanEvents {
178 fn from(value: Vec<Event>) -> Self {
179 let events = value.into_iter().map(SpanEvent::from).collect_vec();
180 Self(events)
181 }
182}
183
184impl From<SpanEvents> for jsonb::Value<'static> {
185 fn from(value: SpanEvents) -> jsonb::Value<'static> {
186 jsonb::Value::Array(value.0.into_iter().map(Into::into).collect())
187 }
188}
189
190impl Display for SpanEvents {
191 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192 write!(f, "{}", serde_json::to_string(self).unwrap_or_default())
193 }
194}
195
196impl SpanEvents {
197 pub fn get_ref(&self) -> &Vec<SpanEvent> {
198 &self.0
199 }
200
201 pub fn get_mut(&mut self) -> &mut Vec<SpanEvent> {
202 &mut self.0
203 }
204}
205
206pub fn parse_span(
207 service_name: Option<String>,
208 resource_attrs: &[KeyValue],
209 scope: &InstrumentationScope,
210 span: Span,
211) -> TraceSpan {
212 let (span_status_code, span_status_message) = status_to_string(&span.status);
213 let span_kind = span.kind().as_str_name().into();
214 TraceSpan {
215 service_name,
216 trace_id: bytes_to_hex_string(&span.trace_id),
217 span_id: bytes_to_hex_string(&span.span_id),
218 parent_span_id: if span.parent_span_id.is_empty() {
219 None
220 } else {
221 Some(bytes_to_hex_string(&span.parent_span_id))
222 },
223
224 resource_attributes: Attributes::from(resource_attrs),
225 trace_state: span.trace_state,
226
227 scope_name: scope.name.clone(),
228 scope_version: scope.version.clone(),
229 scope_attributes: Attributes::from(scope.attributes.clone()),
230
231 span_name: span.name,
232 span_kind,
233 span_status_code,
234 span_status_message,
235 span_attributes: Attributes::from(span.attributes),
236 span_events: SpanEvents::from(span.events),
237 span_links: SpanLinks::from(span.links),
238
239 start_in_nanosecond: span.start_time_unix_nano,
240 end_in_nanosecond: span.end_time_unix_nano,
241 }
242}
243
244pub fn status_to_string(status: &Option<Status>) -> (String, String) {
245 match status {
246 Some(status) => (status.code().as_str_name().into(), status.message.clone()),
247 None => ("".into(), "".into()),
248 }
249}
250
251pub fn parse(request: ExportTraceServiceRequest) -> TraceSpanGroups {
257 let group_size = request
258 .resource_spans
259 .iter()
260 .flat_map(|res| res.scope_spans.iter())
261 .count();
262 let mut groups = Vec::with_capacity(group_size);
263 for resource_spans in request.resource_spans {
264 let resource_attrs = resource_spans
265 .resource
266 .map(|r| r.attributes)
267 .unwrap_or_default();
268 let service_name = resource_attrs
269 .iter()
270 .find_or_first(|kv| kv.key == KEY_SERVICE_NAME)
271 .and_then(|kv| kv.value.clone())
272 .and_then(|v| match v.value {
273 Some(any_value::Value::StringValue(s)) => Some(s),
274 Some(any_value::Value::BytesValue(b)) => {
275 Some(String::from_utf8_lossy(&b).to_string())
276 }
277 _ => None,
278 });
279
280 for scope_spans in resource_spans.scope_spans {
281 let scope = scope_spans.scope.unwrap_or_default();
282 let mut spans = Vec::with_capacity(scope_spans.spans.len());
283 for span in scope_spans.spans {
284 spans.push(parse_span(
285 service_name.clone(),
286 &resource_attrs,
287 &scope,
288 span,
289 ));
290 }
291 groups.push(TraceSpanGroup {
292 service_name: service_name.clone(),
293 resource_attributes: Attributes::from(&resource_attrs[..]),
294 scope_name: scope.name,
295 scope_version: scope.version,
296 scope_attributes: Attributes::from(scope.attributes),
297 spans,
298 });
299 }
300 }
301 groups
302}
303
#[cfg(test)]
mod tests {
    use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
    use opentelemetry_proto::tonic::common::v1::{
        AnyValue, InstrumentationScope, KeyValue, any_value,
    };
    use opentelemetry_proto::tonic::resource::v1::Resource;
    use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status};

    use crate::otlp::trace::KEY_SERVICE_NAME;
    use crate::otlp::trace::span::{bytes_to_hex_string, parse, status_to_string};

    /// Builds a string-valued attribute.
    fn make_kv(key: &str, value: &str) -> KeyValue {
        let value = AnyValue {
            value: Some(any_value::Value::StringValue(value.to_string())),
        };
        KeyValue {
            key: key.to_string(),
            value: Some(value),
        }
    }

    /// Builds a span whose 16-byte trace id and 8-byte span id each repeat a single byte.
    fn make_span(trace_id: u8, span_id: u8) -> Span {
        Span {
            trace_id: vec![trace_id; 16],
            span_id: vec![span_id; 8],
            ..Default::default()
        }
    }

    /// Builds a `ScopeSpans` entry with the given scope name and spans.
    fn make_scope_spans(scope_name: &str, spans: Vec<Span>) -> ScopeSpans {
        ScopeSpans {
            scope: Some(InstrumentationScope {
                name: scope_name.to_string(),
                ..Default::default()
            }),
            spans,
            ..Default::default()
        }
    }

    /// Builds a `ResourceSpans` entry whose resource only carries the service name.
    fn make_resource_spans(service: &str, scope_spans: Vec<ScopeSpans>) -> ResourceSpans {
        ResourceSpans {
            resource: Some(Resource {
                attributes: vec![make_kv(KEY_SERVICE_NAME, service)],
                ..Default::default()
            }),
            scope_spans,
            ..Default::default()
        }
    }

    #[test]
    fn test_bytes_to_hex_string() {
        let trace_id = bytes_to_hex_string(&[
            36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141,
        ]);
        assert_eq!("24fe79948641b110a29bc27859307e8d", trace_id);

        let span_id = bytes_to_hex_string(&[186, 255, 238, 221, 123, 141, 235, 192]);
        assert_eq!("baffeedd7b8debc0", span_id);
    }

    #[test]
    fn test_status_to_string() {
        let status = Status {
            code: 1,
            message: "status message".to_string(),
        };

        let (code, message) = status_to_string(&Some(status));
        assert_eq!(code, "STATUS_CODE_OK");
        assert_eq!(message, "status message");
    }

    #[test]
    fn test_parse_preserves_resource_scope_groups() {
        // Two resources: the first with two scopes, the second with one.
        let request = ExportTraceServiceRequest {
            resource_spans: vec![
                make_resource_spans(
                    "svc-a",
                    vec![
                        make_scope_spans(
                            "scope-1",
                            vec![make_span(0x11, 0x21), make_span(0x12, 0x22)],
                        ),
                        make_scope_spans("scope-2", vec![make_span(0x13, 0x23)]),
                    ],
                ),
                make_resource_spans(
                    "svc-b",
                    vec![make_scope_spans("scope-3", vec![make_span(0x14, 0x24)])],
                ),
            ],
        };

        let groups = parse(request);

        // One group per (resource, scope) pair, in request order.
        assert_eq!(groups.len(), 3);
        assert_eq!(groups[0].service_name.as_deref(), Some("svc-a"));
        assert_eq!(groups[0].scope_name, "scope-1");
        assert_eq!(groups[0].spans.len(), 2);
        assert_eq!(groups[1].scope_name, "scope-2");
        assert_eq!(groups[1].spans.len(), 1);
        assert_eq!(groups[2].service_name.as_deref(), Some("svc-b"));
        assert_eq!(groups[2].scope_name, "scope-3");
    }
}