servers/otlp/trace/
span.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16
17use common_time::timestamp::Timestamp;
18use itertools::Itertools;
19use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
20use opentelemetry_proto::tonic::common::v1::{any_value, InstrumentationScope, KeyValue};
21use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
22use opentelemetry_proto::tonic::trace::v1::{Span, Status};
23use serde::Serialize;
24
25use crate::otlp::trace::attributes::Attributes;
26use crate::otlp::trace::KEY_SERVICE_NAME;
27use crate::otlp::utils::bytes_to_hex_string;
28
29#[derive(Debug, Clone)]
30pub struct TraceSpan {
31    // the following are tags
32    pub service_name: Option<String>,
33    pub trace_id: String,
34    pub span_id: String,
35    pub parent_span_id: Option<String>,
36
37    // the following are fields
38    pub resource_attributes: Attributes,
39    pub scope_name: String,
40    pub scope_version: String,
41    pub scope_attributes: Attributes,
42    pub trace_state: String,
43    pub span_name: String,
44    pub span_kind: String,
45    pub span_status_code: String,
46    pub span_status_message: String,
47    pub span_attributes: Attributes,
48    pub span_events: SpanEvents,  // TODO(yuanbohan): List in the future
49    pub span_links: SpanLinks,    // TODO(yuanbohan): List in the future
50    pub start_in_nanosecond: u64, // this is also the Timestamp Index
51    pub end_in_nanosecond: u64,
52}
53
54pub type TraceSpans = Vec<TraceSpan>;
55
56#[derive(Debug, Clone, Serialize)]
57pub struct SpanLink {
58    pub trace_id: String,
59    pub span_id: String,
60    pub trace_state: String,
61    pub attributes: Attributes, // TODO(yuanbohan): Map in the future
62}
63
64impl From<Link> for SpanLink {
65    fn from(link: Link) -> Self {
66        Self {
67            trace_id: bytes_to_hex_string(&link.trace_id),
68            span_id: bytes_to_hex_string(&link.span_id),
69            trace_state: link.trace_state,
70            attributes: Attributes::from(link.attributes),
71        }
72    }
73}
74
75impl From<SpanLink> for jsonb::Value<'static> {
76    fn from(value: SpanLink) -> jsonb::Value<'static> {
77        jsonb::Value::Object(
78            vec![
79                (
80                    "trace_id".to_string(),
81                    jsonb::Value::String(value.trace_id.into()),
82                ),
83                (
84                    "span_id".to_string(),
85                    jsonb::Value::String(value.span_id.into()),
86                ),
87                (
88                    "trace_state".to_string(),
89                    jsonb::Value::String(value.trace_state.into()),
90                ),
91                ("attributes".to_string(), value.attributes.into()),
92            ]
93            .into_iter()
94            .collect(),
95        )
96    }
97}
98
99#[derive(Debug, Clone, Serialize)]
100pub struct SpanLinks(Vec<SpanLink>);
101
102impl From<Vec<Link>> for SpanLinks {
103    fn from(value: Vec<Link>) -> Self {
104        let links = value.into_iter().map(SpanLink::from).collect_vec();
105        Self(links)
106    }
107}
108
109impl From<SpanLinks> for jsonb::Value<'static> {
110    fn from(value: SpanLinks) -> jsonb::Value<'static> {
111        jsonb::Value::Array(value.0.into_iter().map(Into::into).collect())
112    }
113}
114
115impl Display for SpanLinks {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        write!(f, "{}", serde_json::to_string(self).unwrap_or_default())
118    }
119}
120
121impl SpanLinks {
122    pub fn get_ref(&self) -> &Vec<SpanLink> {
123        &self.0
124    }
125
126    pub fn get_mut(&mut self) -> &mut Vec<SpanLink> {
127        &mut self.0
128    }
129}
130
131#[derive(Debug, Clone, Serialize)]
132pub struct SpanEvent {
133    pub name: String,
134    pub time: String,
135    pub attributes: Attributes, // TODO(yuanbohan): Map in the future
136}
137
138impl From<Event> for SpanEvent {
139    fn from(event: Event) -> Self {
140        Self {
141            name: event.name,
142            time: Timestamp::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
143            attributes: Attributes::from(event.attributes),
144        }
145    }
146}
147
148impl From<SpanEvent> for jsonb::Value<'static> {
149    fn from(value: SpanEvent) -> jsonb::Value<'static> {
150        jsonb::Value::Object(
151            vec![
152                ("name".to_string(), jsonb::Value::String(value.name.into())),
153                ("time".to_string(), jsonb::Value::String(value.time.into())),
154                ("attributes".to_string(), value.attributes.into()),
155            ]
156            .into_iter()
157            .collect(),
158        )
159    }
160}
161
162#[derive(Debug, Clone, Serialize)]
163pub struct SpanEvents(Vec<SpanEvent>);
164
165impl From<Vec<Event>> for SpanEvents {
166    fn from(value: Vec<Event>) -> Self {
167        let events = value.into_iter().map(SpanEvent::from).collect_vec();
168        Self(events)
169    }
170}
171
172impl From<SpanEvents> for jsonb::Value<'static> {
173    fn from(value: SpanEvents) -> jsonb::Value<'static> {
174        jsonb::Value::Array(value.0.into_iter().map(Into::into).collect())
175    }
176}
177
178impl Display for SpanEvents {
179    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180        write!(f, "{}", serde_json::to_string(self).unwrap_or_default())
181    }
182}
183
184impl SpanEvents {
185    pub fn get_ref(&self) -> &Vec<SpanEvent> {
186        &self.0
187    }
188
189    pub fn get_mut(&mut self) -> &mut Vec<SpanEvent> {
190        &mut self.0
191    }
192}
193
194pub fn parse_span(
195    service_name: Option<String>,
196    resource_attrs: &[KeyValue],
197    scope: &InstrumentationScope,
198    span: Span,
199) -> TraceSpan {
200    let (span_status_code, span_status_message) = status_to_string(&span.status);
201    let span_kind = span.kind().as_str_name().into();
202    TraceSpan {
203        service_name,
204        trace_id: bytes_to_hex_string(&span.trace_id),
205        span_id: bytes_to_hex_string(&span.span_id),
206        parent_span_id: if span.parent_span_id.is_empty() {
207            None
208        } else {
209            Some(bytes_to_hex_string(&span.parent_span_id))
210        },
211
212        resource_attributes: Attributes::from(resource_attrs),
213        trace_state: span.trace_state,
214
215        scope_name: scope.name.clone(),
216        scope_version: scope.version.clone(),
217        scope_attributes: Attributes::from(scope.attributes.clone()),
218
219        span_name: span.name,
220        span_kind,
221        span_status_code,
222        span_status_message,
223        span_attributes: Attributes::from(span.attributes),
224        span_events: SpanEvents::from(span.events),
225        span_links: SpanLinks::from(span.links),
226
227        start_in_nanosecond: span.start_time_unix_nano,
228        end_in_nanosecond: span.end_time_unix_nano,
229    }
230}
231
232pub fn status_to_string(status: &Option<Status>) -> (String, String) {
233    match status {
234        Some(status) => (status.code().as_str_name().into(), status.message.clone()),
235        None => ("".into(), "".into()),
236    }
237}
238
239/// Convert OpenTelemetry traces to SpanTraces
240///
241/// See
242/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
243/// for data structure of OTLP traces.
244pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
245    let span_size = request
246        .resource_spans
247        .iter()
248        .flat_map(|res| res.scope_spans.iter())
249        .flat_map(|scope| scope.spans.iter())
250        .count();
251    let mut spans = Vec::with_capacity(span_size);
252    for resource_spans in request.resource_spans {
253        let resource_attrs = resource_spans
254            .resource
255            .map(|r| r.attributes)
256            .unwrap_or_default();
257        let service_name = resource_attrs
258            .iter()
259            .find_or_first(|kv| kv.key == KEY_SERVICE_NAME)
260            .and_then(|kv| kv.value.clone())
261            .and_then(|v| match v.value {
262                Some(any_value::Value::StringValue(s)) => Some(s),
263                Some(any_value::Value::BytesValue(b)) => {
264                    Some(String::from_utf8_lossy(&b).to_string())
265                }
266                _ => None,
267            });
268
269        for scope_spans in resource_spans.scope_spans {
270            let scope = scope_spans.scope.unwrap_or_default();
271            for span in scope_spans.spans {
272                spans.push(parse_span(
273                    service_name.clone(),
274                    &resource_attrs,
275                    &scope,
276                    span,
277                ));
278            }
279        }
280    }
281    spans
282}
283
284#[cfg(test)]
285mod tests {
286    use opentelemetry_proto::tonic::trace::v1::Status;
287
288    use crate::otlp::trace::span::{bytes_to_hex_string, status_to_string};
289
290    #[test]
291    fn test_bytes_to_hex_string() {
292        assert_eq!(
293            "24fe79948641b110a29bc27859307e8d",
294            bytes_to_hex_string(&[
295                36, 254, 121, 148, 134, 65, 177, 16, 162, 155, 194, 120, 89, 48, 126, 141,
296            ])
297        );
298
299        assert_eq!(
300            "baffeedd7b8debc0",
301            bytes_to_hex_string(&[186, 255, 238, 221, 123, 141, 235, 192,])
302        );
303    }
304
305    #[test]
306    fn test_status_to_string() {
307        let message = String::from("status message");
308        let status = Status {
309            code: 1,
310            message: message.clone(),
311        };
312
313        assert_eq!(
314            ("STATUS_CODE_OK".into(), message),
315            status_to_string(&Some(status)),
316        );
317    }
318}