1use std::fmt::Display;
16
17use common_time::timestamp::Timestamp;
18use itertools::Itertools;
19use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
20use opentelemetry_proto::tonic::common::v1::{any_value, InstrumentationScope, KeyValue};
21use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
22use opentelemetry_proto::tonic::trace::v1::{Span, Status};
23use serde::Serialize;
24
25use crate::otlp::trace::attributes::Attributes;
26use crate::otlp::trace::KEY_SERVICE_NAME;
27use crate::otlp::utils::bytes_to_hex_string;
28
29#[derive(Debug, Clone)]
30pub struct TraceSpan {
31 pub service_name: Option<String>,
33 pub trace_id: String,
34 pub span_id: String,
35 pub parent_span_id: Option<String>,
36
37 pub resource_attributes: Attributes,
39 pub scope_name: String,
40 pub scope_version: String,
41 pub scope_attributes: Attributes,
42 pub trace_state: String,
43 pub span_name: String,
44 pub span_kind: String,
45 pub span_status_code: String,
46 pub span_status_message: String,
47 pub span_attributes: Attributes,
48 pub span_events: SpanEvents, pub span_links: SpanLinks, pub start_in_nanosecond: u64, pub end_in_nanosecond: u64,
52}
53
54pub type TraceSpans = Vec<TraceSpan>;
55
56#[derive(Debug, Clone, Serialize)]
57pub struct SpanLink {
58 pub trace_id: String,
59 pub span_id: String,
60 pub trace_state: String,
61 pub attributes: Attributes, }
63
64impl From<Link> for SpanLink {
65 fn from(link: Link) -> Self {
66 Self {
67 trace_id: bytes_to_hex_string(&link.trace_id),
68 span_id: bytes_to_hex_string(&link.span_id),
69 trace_state: link.trace_state,
70 attributes: Attributes::from(link.attributes),
71 }
72 }
73}
74
75impl From<SpanLink> for jsonb::Value<'static> {
76 fn from(value: SpanLink) -> jsonb::Value<'static> {
77 jsonb::Value::Object(
78 vec![
79 (
80 "trace_id".to_string(),
81 jsonb::Value::String(value.trace_id.into()),
82 ),
83 (
84 "span_id".to_string(),
85 jsonb::Value::String(value.span_id.into()),
86 ),
87 (
88 "trace_state".to_string(),
89 jsonb::Value::String(value.trace_state.into()),
90 ),
91 ("attributes".to_string(), value.attributes.into()),
92 ]
93 .into_iter()
94 .collect(),
95 )
96 }
97}
98
99#[derive(Debug, Clone, Serialize)]
100pub struct SpanLinks(Vec<SpanLink>);
101
102impl From<Vec<Link>> for SpanLinks {
103 fn from(value: Vec<Link>) -> Self {
104 let links = value.into_iter().map(SpanLink::from).collect_vec();
105 Self(links)
106 }
107}
108
109impl From<SpanLinks> for jsonb::Value<'static> {
110 fn from(value: SpanLinks) -> jsonb::Value<'static> {
111 jsonb::Value::Array(value.0.into_iter().map(Into::into).collect())
112 }
113}
114
115impl Display for SpanLinks {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 write!(f, "{}", serde_json::to_string(self).unwrap_or_default())
118 }
119}
120
121impl SpanLinks {
122 pub fn get_ref(&self) -> &Vec<SpanLink> {
123 &self.0
124 }
125
126 pub fn get_mut(&mut self) -> &mut Vec<SpanLink> {
127 &mut self.0
128 }
129}
130
131#[derive(Debug, Clone, Serialize)]
132pub struct SpanEvent {
133 pub name: String,
134 pub time: String,
135 pub attributes: Attributes, }
137
138impl From<Event> for SpanEvent {
139 fn from(event: Event) -> Self {
140 Self {
141 name: event.name,
142 time: Timestamp::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
143 attributes: Attributes::from(event.attributes),
144 }
145 }
146}
147
148impl From<SpanEvent> for jsonb::Value<'static> {
149 fn from(value: SpanEvent) -> jsonb::Value<'static> {
150 jsonb::Value::Object(
151 vec![
152 ("name".to_string(), jsonb::Value::String(value.name.into())),
153 ("time".to_string(), jsonb::Value::String(value.time.into())),
154 ("attributes".to_string(), value.attributes.into()),
155 ]
156 .into_iter()
157 .collect(),
158 )
159 }
160}
161
162#[derive(Debug, Clone, Serialize)]
163pub struct SpanEvents(Vec<SpanEvent>);
164
165impl From<Vec<Event>> for SpanEvents {
166 fn from(value: Vec<Event>) -> Self {
167 let events = value.into_iter().map(SpanEvent::from).collect_vec();
168 Self(events)
169 }
170}
171
172impl From<SpanEvents> for jsonb::Value<'static> {
173 fn from(value: SpanEvents) -> jsonb::Value<'static> {
174 jsonb::Value::Array(value.0.into_iter().map(Into::into).collect())
175 }
176}
177
178impl Display for SpanEvents {
179 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180 write!(f, "{}", serde_json::to_string(self).unwrap_or_default())
181 }
182}
183
184impl SpanEvents {
185 pub fn get_ref(&self) -> &Vec<SpanEvent> {
186 &self.0
187 }
188
189 pub fn get_mut(&mut self) -> &mut Vec<SpanEvent> {
190 &mut self.0
191 }
192}
193
194pub fn parse_span(
195 service_name: Option<String>,
196 resource_attrs: &[KeyValue],
197 scope: &InstrumentationScope,
198 span: Span,
199) -> TraceSpan {
200 let (span_status_code, span_status_message) = status_to_string(&span.status);
201 let span_kind = span.kind().as_str_name().into();
202 TraceSpan {
203 service_name,
204 trace_id: bytes_to_hex_string(&span.trace_id),
205 span_id: bytes_to_hex_string(&span.span_id),
206 parent_span_id: if span.parent_span_id.is_empty() {
207 None
208 } else {
209 Some(bytes_to_hex_string(&span.parent_span_id))
210 },
211
212 resource_attributes: Attributes::from(resource_attrs),
213 trace_state: span.trace_state,
214
215 scope_name: scope.name.clone(),
216 scope_version: scope.version.clone(),
217 scope_attributes: Attributes::from(scope.attributes.clone()),
218
219 span_name: span.name,
220 span_kind,
221 span_status_code,
222 span_status_message,
223 span_attributes: Attributes::from(span.attributes),
224 span_events: SpanEvents::from(span.events),
225 span_links: SpanLinks::from(span.links),
226
227 start_in_nanosecond: span.start_time_unix_nano,
228 end_in_nanosecond: span.end_time_unix_nano,
229 }
230}
231
232pub fn status_to_string(status: &Option<Status>) -> (String, String) {
233 match status {
234 Some(status) => (status.code().as_str_name().into(), status.message.clone()),
235 None => ("".into(), "".into()),
236 }
237}
238
239pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
245 let span_size = request
246 .resource_spans
247 .iter()
248 .flat_map(|res| res.scope_spans.iter())
249 .flat_map(|scope| scope.spans.iter())
250 .count();
251 let mut spans = Vec::with_capacity(span_size);
252 for resource_spans in request.resource_spans {
253 let resource_attrs = resource_spans
254 .resource
255 .map(|r| r.attributes)
256 .unwrap_or_default();
257 let service_name = resource_attrs
258 .iter()
259 .find_or_first(|kv| kv.key == KEY_SERVICE_NAME)
260 .and_then(|kv| kv.value.clone())
261 .and_then(|v| match v.value {
262 Some(any_value::Value::StringValue(s)) => Some(s),
263 Some(any_value::Value::BytesValue(b)) => {
264 Some(String::from_utf8_lossy(&b).to_string())
265 }
266 _ => None,
267 });
268
269 for scope_spans in resource_spans.scope_spans {
270 let scope = scope_spans.scope.unwrap_or_default();
271 for span in scope_spans.spans {
272 spans.push(parse_span(
273 service_name.clone(),
274 &resource_attrs,
275 &scope,
276 span,
277 ));
278 }
279 }
280 }
281 spans
282}
283
#[cfg(test)]
mod tests {
    use opentelemetry_proto::tonic::trace::v1::Status;

    use crate::otlp::trace::span::{bytes_to_hex_string, status_to_string};

    /// Byte slices are rendered as lowercase hex without separators.
    #[test]
    fn test_bytes_to_hex_string() {
        let cases: [(&str, &[u8]); 2] = [
            (
                "24fe79948641b110a29bc27859307e8d",
                &[
                    36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141,
                ],
            ),
            (
                "baffeedd7b8debc0",
                &[186, 255, 238, 221, 123, 141, 235, 192],
            ),
        ];

        for (expected, bytes) in cases {
            assert_eq!(expected, bytes_to_hex_string(bytes));
        }
    }

    /// A present status yields the enum's string name and the original message.
    #[test]
    fn test_status_to_string() {
        let message = String::from("status message");
        let status = Status {
            code: 1,
            message: message.clone(),
        };

        let (code, text) = status_to_string(&Some(status));

        assert_eq!("STATUS_CODE_OK", code);
        assert_eq!(message, text);
    }
}