Skip to main content

servers/otlp/
trace.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod attributes;
16pub mod span;
17pub mod v0;
18pub mod v1;
19
20use std::collections::HashSet;
21
22use api::v1::RowInsertRequests;
23pub use common_catalog::consts::{
24    PARENT_SPAN_ID_COLUMN, SPAN_ID_COLUMN, SPAN_NAME_COLUMN, TRACE_ID_COLUMN,
25};
26use pipeline::{GreptimePipelineParams, PipelineWay};
27use session::context::QueryContextRef;
28
29use crate::error::{NotSupportedSnafu, Result};
30use crate::otlp::trace::span::TraceSpan;
31use crate::query_handler::PipelineHandlerRef;
32
33// column names
34pub const SERVICE_NAME_COLUMN: &str = "service_name";
35pub const TIMESTAMP_COLUMN: &str = "timestamp";
36pub const DURATION_NANO_COLUMN: &str = "duration_nano";
37pub const SPAN_KIND_COLUMN: &str = "span_kind";
38pub const SPAN_STATUS_CODE: &str = "span_status_code";
39pub const SPAN_STATUS_MESSAGE_COLUMN: &str = "span_status_message";
40pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
41pub const SPAN_EVENTS_COLUMN: &str = "span_events";
42pub const SCOPE_NAME_COLUMN: &str = "scope_name";
43pub const SCOPE_VERSION_COLUMN: &str = "scope_version";
44pub const RESOURCE_ATTRIBUTES_COLUMN: &str = "resource_attributes";
45pub const TRACE_STATE_COLUMN: &str = "trace_state";
46
47// const keys
48pub const KEY_SERVICE_NAME: &str = "service.name";
49pub const KEY_SERVICE_INSTANCE_ID: &str = "service.instance.id";
50pub const KEY_SPAN_KIND: &str = "span.kind";
51
52// jaeger const keys, not sure if they are general
53pub const KEY_OTEL_SCOPE_NAME: &str = "otel.scope.name";
54pub const KEY_OTEL_SCOPE_VERSION: &str = "otel.scope.version";
55pub const KEY_OTEL_STATUS_CODE: &str = "otel.status_code";
56pub const KEY_OTEL_STATUS_MESSAGE: &str = "otel.status_description";
57pub const KEY_OTEL_STATUS_ERROR_KEY: &str = "error";
58pub const KEY_OTEL_TRACE_STATE: &str = "w3c.tracestate";
59
60/// The span kind prefix in the database.
61/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
62pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
63
64// The span status code prefix in the database.
65pub const SPAN_STATUS_PREFIX: &str = "STATUS_CODE_";
66pub const SPAN_STATUS_UNSET: &str = "STATUS_CODE_UNSET";
67pub const SPAN_STATUS_ERROR: &str = "STATUS_CODE_ERROR";
68
69/// Deduplicated auxiliary trace entities derived from successfully ingested
70/// spans.
71///
72/// The main trace table is written first. Once a span is confirmed accepted, we
73/// record the service and operation tuples here so the auxiliary tables can be
74/// updated separately without affecting span acceptance accounting.
75#[derive(Debug, Default)]
76pub struct TraceAuxData {
77    pub services: HashSet<String>,
78    pub operations: HashSet<(String, String, String)>,
79}
80
81impl TraceAuxData {
82    /// Records the auxiliary service and operation rows implied by one accepted
83    /// span.
84    pub fn observe_span(&mut self, span: &TraceSpan) {
85        if let Some(service_name) = &span.service_name {
86            self.services.insert(service_name.clone());
87            self.operations.insert((
88                service_name.clone(),
89                span.span_name.clone(),
90                span.span_kind.clone(),
91            ));
92        }
93    }
94
95    /// Returns true when no auxiliary table updates are needed.
96    pub fn is_empty(&self) -> bool {
97        self.services.is_empty() && self.operations.is_empty()
98    }
99}
100
101/// Convert a subset of trace spans to GreptimeDB row insert requests.
102pub fn to_grpc_insert_requests_from_spans(
103    spans: &[TraceSpan],
104    pipeline: &PipelineWay,
105    pipeline_params: &GreptimePipelineParams,
106    table_name: &str,
107    query_ctx: &QueryContextRef,
108    pipeline_handler: PipelineHandlerRef,
109) -> Result<(RowInsertRequests, usize)> {
110    match pipeline {
111        PipelineWay::OtlpTraceDirectV0 => v0::v0_to_grpc_main_insert_requests(
112            spans,
113            pipeline,
114            pipeline_params,
115            table_name,
116            query_ctx,
117            pipeline_handler,
118        ),
119        PipelineWay::OtlpTraceDirectV1 => v1::v1_to_grpc_main_insert_requests(
120            spans,
121            pipeline,
122            pipeline_params,
123            table_name,
124            query_ctx,
125            pipeline_handler,
126        ),
127        _ => NotSupportedSnafu {
128            feat: "Unsupported pipeline for trace",
129        }
130        .fail(),
131    }
132}
133
134/// Build insert requests for the auxiliary trace tables derived from accepted
135/// spans.
136///
137/// "Aux" here refers to the trace service and trace operation tables, not the
138/// main trace span table itself.
139pub fn to_grpc_insert_requests_for_aux_tables(
140    aux_data: TraceAuxData,
141    pipeline: &PipelineWay,
142    table_name: &str,
143) -> Result<(RowInsertRequests, usize)> {
144    match pipeline {
145        PipelineWay::OtlpTraceDirectV0 => v0::build_aux_table_requests(aux_data, table_name),
146        PipelineWay::OtlpTraceDirectV1 => v1::build_aux_table_requests(aux_data, table_name),
147        _ => NotSupportedSnafu {
148            feat: "Unsupported pipeline for trace",
149        }
150        .fail(),
151    }
152}