Skip to main content

servers/otlp/trace/
span.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16
17use common_time::timestamp::Timestamp;
18use itertools::Itertools;
19use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
20use opentelemetry_proto::tonic::common::v1::{InstrumentationScope, KeyValue, any_value};
21use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
22use opentelemetry_proto::tonic::trace::v1::{Span, Status};
23use serde::Serialize;
24
25use crate::otlp::trace::KEY_SERVICE_NAME;
26use crate::otlp::trace::attributes::Attributes;
27use crate::otlp::utils::bytes_to_hex_string;
28
29#[derive(Debug, Clone)]
30pub struct TraceSpan {
31    // the following are tags
32    pub service_name: Option<String>,
33    pub trace_id: String,
34    pub span_id: String,
35    pub parent_span_id: Option<String>,
36
37    // the following are fields
38    pub resource_attributes: Attributes,
39    pub scope_name: String,
40    pub scope_version: String,
41    pub scope_attributes: Attributes,
42    pub trace_state: String,
43    pub span_name: String,
44    pub span_kind: String,
45    pub span_status_code: String,
46    pub span_status_message: String,
47    pub span_attributes: Attributes,
48    pub span_events: SpanEvents,  // TODO(yuanbohan): List in the future
49    pub span_links: SpanLinks,    // TODO(yuanbohan): List in the future
50    pub start_in_nanosecond: u64, // this is also the Timestamp Index
51    pub end_in_nanosecond: u64,
52}
53
54pub type TraceSpans = Vec<TraceSpan>;
55
56#[derive(Debug, Clone)]
57pub struct TraceSpanGroup {
58    pub service_name: Option<String>,
59    pub resource_attributes: Attributes,
60    pub scope_name: String,
61    pub scope_version: String,
62    pub scope_attributes: Attributes,
63    pub spans: TraceSpans,
64}
65
66pub type TraceSpanGroups = Vec<TraceSpanGroup>;
67
68#[derive(Debug, Clone, Serialize)]
69pub struct SpanLink {
70    pub trace_id: String,
71    pub span_id: String,
72    pub trace_state: String,
73    pub attributes: Attributes, // TODO(yuanbohan): Map in the future
74}
75
76impl From<Link> for SpanLink {
77    fn from(link: Link) -> Self {
78        Self {
79            trace_id: bytes_to_hex_string(&link.trace_id),
80            span_id: bytes_to_hex_string(&link.span_id),
81            trace_state: link.trace_state,
82            attributes: Attributes::from(link.attributes),
83        }
84    }
85}
86
87impl From<SpanLink> for jsonb::Value<'static> {
88    fn from(value: SpanLink) -> jsonb::Value<'static> {
89        jsonb::Value::Object(
90            vec![
91                (
92                    "trace_id".to_string(),
93                    jsonb::Value::String(value.trace_id.into()),
94                ),
95                (
96                    "span_id".to_string(),
97                    jsonb::Value::String(value.span_id.into()),
98                ),
99                (
100                    "trace_state".to_string(),
101                    jsonb::Value::String(value.trace_state.into()),
102                ),
103                ("attributes".to_string(), value.attributes.into()),
104            ]
105            .into_iter()
106            .collect(),
107        )
108    }
109}
110
111#[derive(Debug, Clone, Serialize)]
112pub struct SpanLinks(Vec<SpanLink>);
113
114impl From<Vec<Link>> for SpanLinks {
115    fn from(value: Vec<Link>) -> Self {
116        let links = value.into_iter().map(SpanLink::from).collect_vec();
117        Self(links)
118    }
119}
120
121impl From<SpanLinks> for jsonb::Value<'static> {
122    fn from(value: SpanLinks) -> jsonb::Value<'static> {
123        jsonb::Value::Array(value.0.into_iter().map(Into::into).collect())
124    }
125}
126
127impl Display for SpanLinks {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        write!(f, "{}", serde_json::to_string(self).unwrap_or_default())
130    }
131}
132
133impl SpanLinks {
134    pub fn get_ref(&self) -> &Vec<SpanLink> {
135        &self.0
136    }
137
138    pub fn get_mut(&mut self) -> &mut Vec<SpanLink> {
139        &mut self.0
140    }
141}
142
143#[derive(Debug, Clone, Serialize)]
144pub struct SpanEvent {
145    pub name: String,
146    pub time: String,
147    pub attributes: Attributes, // TODO(yuanbohan): Map in the future
148}
149
150impl From<Event> for SpanEvent {
151    fn from(event: Event) -> Self {
152        Self {
153            name: event.name,
154            time: Timestamp::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
155            attributes: Attributes::from(event.attributes),
156        }
157    }
158}
159
160impl From<SpanEvent> for jsonb::Value<'static> {
161    fn from(value: SpanEvent) -> jsonb::Value<'static> {
162        jsonb::Value::Object(
163            vec![
164                ("name".to_string(), jsonb::Value::String(value.name.into())),
165                ("time".to_string(), jsonb::Value::String(value.time.into())),
166                ("attributes".to_string(), value.attributes.into()),
167            ]
168            .into_iter()
169            .collect(),
170        )
171    }
172}
173
174#[derive(Debug, Clone, Serialize)]
175pub struct SpanEvents(Vec<SpanEvent>);
176
177impl From<Vec<Event>> for SpanEvents {
178    fn from(value: Vec<Event>) -> Self {
179        let events = value.into_iter().map(SpanEvent::from).collect_vec();
180        Self(events)
181    }
182}
183
184impl From<SpanEvents> for jsonb::Value<'static> {
185    fn from(value: SpanEvents) -> jsonb::Value<'static> {
186        jsonb::Value::Array(value.0.into_iter().map(Into::into).collect())
187    }
188}
189
190impl Display for SpanEvents {
191    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192        write!(f, "{}", serde_json::to_string(self).unwrap_or_default())
193    }
194}
195
196impl SpanEvents {
197    pub fn get_ref(&self) -> &Vec<SpanEvent> {
198        &self.0
199    }
200
201    pub fn get_mut(&mut self) -> &mut Vec<SpanEvent> {
202        &mut self.0
203    }
204}
205
206pub fn parse_span(
207    service_name: Option<String>,
208    resource_attrs: &[KeyValue],
209    scope: &InstrumentationScope,
210    span: Span,
211) -> TraceSpan {
212    let (span_status_code, span_status_message) = status_to_string(&span.status);
213    let span_kind = span.kind().as_str_name().into();
214    TraceSpan {
215        service_name,
216        trace_id: bytes_to_hex_string(&span.trace_id),
217        span_id: bytes_to_hex_string(&span.span_id),
218        parent_span_id: if span.parent_span_id.is_empty() {
219            None
220        } else {
221            Some(bytes_to_hex_string(&span.parent_span_id))
222        },
223
224        resource_attributes: Attributes::from(resource_attrs),
225        trace_state: span.trace_state,
226
227        scope_name: scope.name.clone(),
228        scope_version: scope.version.clone(),
229        scope_attributes: Attributes::from(scope.attributes.clone()),
230
231        span_name: span.name,
232        span_kind,
233        span_status_code,
234        span_status_message,
235        span_attributes: Attributes::from(span.attributes),
236        span_events: SpanEvents::from(span.events),
237        span_links: SpanLinks::from(span.links),
238
239        start_in_nanosecond: span.start_time_unix_nano,
240        end_in_nanosecond: span.end_time_unix_nano,
241    }
242}
243
244pub fn status_to_string(status: &Option<Status>) -> (String, String) {
245    match status {
246        Some(status) => (status.code().as_str_name().into(), status.message.clone()),
247        None => ("".into(), "".into()),
248    }
249}
250
251/// Convert OpenTelemetry traces to SpanTraces
252///
253/// See
254/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
255/// for data structure of OTLP traces.
256pub fn parse(request: ExportTraceServiceRequest) -> TraceSpanGroups {
257    let group_size = request
258        .resource_spans
259        .iter()
260        .flat_map(|res| res.scope_spans.iter())
261        .count();
262    let mut groups = Vec::with_capacity(group_size);
263    for resource_spans in request.resource_spans {
264        let resource_attrs = resource_spans
265            .resource
266            .map(|r| r.attributes)
267            .unwrap_or_default();
268        let service_name = resource_attrs
269            .iter()
270            .find_or_first(|kv| kv.key == KEY_SERVICE_NAME)
271            .and_then(|kv| kv.value.clone())
272            .and_then(|v| match v.value {
273                Some(any_value::Value::StringValue(s)) => Some(s),
274                Some(any_value::Value::BytesValue(b)) => {
275                    Some(String::from_utf8_lossy(&b).to_string())
276                }
277                _ => None,
278            });
279
280        for scope_spans in resource_spans.scope_spans {
281            let scope = scope_spans.scope.unwrap_or_default();
282            let mut spans = Vec::with_capacity(scope_spans.spans.len());
283            for span in scope_spans.spans {
284                spans.push(parse_span(
285                    service_name.clone(),
286                    &resource_attrs,
287                    &scope,
288                    span,
289                ));
290            }
291            groups.push(TraceSpanGroup {
292                service_name: service_name.clone(),
293                resource_attributes: Attributes::from(&resource_attrs[..]),
294                scope_name: scope.name,
295                scope_version: scope.version,
296                scope_attributes: Attributes::from(scope.attributes),
297                spans,
298            });
299        }
300    }
301    groups
302}
303
304#[cfg(test)]
305mod tests {
306    use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
307    use opentelemetry_proto::tonic::common::v1::{
308        AnyValue, InstrumentationScope, KeyValue, any_value,
309    };
310    use opentelemetry_proto::tonic::resource::v1::Resource;
311    use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status};
312
313    use crate::otlp::trace::KEY_SERVICE_NAME;
314    use crate::otlp::trace::span::{bytes_to_hex_string, parse, status_to_string};
315
316    fn make_kv(key: &str, value: &str) -> KeyValue {
317        KeyValue {
318            key: key.to_string(),
319            value: Some(AnyValue {
320                value: Some(any_value::Value::StringValue(value.to_string())),
321            }),
322        }
323    }
324
325    fn make_span(trace_id: u8, span_id: u8) -> Span {
326        Span {
327            trace_id: vec![trace_id; 16],
328            span_id: vec![span_id; 8],
329            ..Default::default()
330        }
331    }
332
333    #[test]
334    fn test_bytes_to_hex_string() {
335        assert_eq!(
336            "24fe79948641b110a29bc27859307e8d",
337            bytes_to_hex_string(&[
338                36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141,
339            ])
340        );
341
342        assert_eq!(
343            "baffeedd7b8debc0",
344            bytes_to_hex_string(&[186, 255, 238, 221, 123, 141, 235, 192,])
345        );
346    }
347
348    #[test]
349    fn test_status_to_string() {
350        let message = String::from("status message");
351        let status = Status {
352            code: 1,
353            message: message.clone(),
354        };
355
356        assert_eq!(
357            ("STATUS_CODE_OK".into(), message),
358            status_to_string(&Some(status)),
359        );
360    }
361
362    #[test]
363    fn test_parse_preserves_resource_scope_groups() {
364        let request = ExportTraceServiceRequest {
365            resource_spans: vec![
366                ResourceSpans {
367                    resource: Some(Resource {
368                        attributes: vec![make_kv(KEY_SERVICE_NAME, "svc-a")],
369                        ..Default::default()
370                    }),
371                    scope_spans: vec![
372                        ScopeSpans {
373                            scope: Some(InstrumentationScope {
374                                name: "scope-1".to_string(),
375                                ..Default::default()
376                            }),
377                            spans: vec![make_span(0x11, 0x21), make_span(0x12, 0x22)],
378                            ..Default::default()
379                        },
380                        ScopeSpans {
381                            scope: Some(InstrumentationScope {
382                                name: "scope-2".to_string(),
383                                ..Default::default()
384                            }),
385                            spans: vec![make_span(0x13, 0x23)],
386                            ..Default::default()
387                        },
388                    ],
389                    ..Default::default()
390                },
391                ResourceSpans {
392                    resource: Some(Resource {
393                        attributes: vec![make_kv(KEY_SERVICE_NAME, "svc-b")],
394                        ..Default::default()
395                    }),
396                    scope_spans: vec![ScopeSpans {
397                        scope: Some(InstrumentationScope {
398                            name: "scope-3".to_string(),
399                            ..Default::default()
400                        }),
401                        spans: vec![make_span(0x14, 0x24)],
402                        ..Default::default()
403                    }],
404                    ..Default::default()
405                },
406            ],
407        };
408
409        let groups = parse(request);
410        assert_eq!(groups.len(), 3);
411        assert_eq!(groups[0].service_name.as_deref(), Some("svc-a"));
412        assert_eq!(groups[0].scope_name, "scope-1");
413        assert_eq!(groups[0].spans.len(), 2);
414        assert_eq!(groups[1].scope_name, "scope-2");
415        assert_eq!(groups[1].spans.len(), 1);
416        assert_eq!(groups[2].service_name.as_deref(), Some("svc-b"));
417        assert_eq!(groups[2].scope_name, "scope-3");
418    }
419}