Skip to main content

servers/otlp/
trace.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod attributes;
16pub mod coerce;
17pub mod span;
18pub mod v0;
19pub mod v1;
20
21use std::collections::HashSet;
22
23use api::v1::RowInsertRequests;
24pub use common_catalog::consts::{
25    PARENT_SPAN_ID_COLUMN, SPAN_ID_COLUMN, SPAN_NAME_COLUMN, TRACE_ID_COLUMN,
26};
27use pipeline::{GreptimePipelineParams, PipelineWay};
28use session::context::QueryContextRef;
29
30use crate::error::{NotSupportedSnafu, Result};
31use crate::otlp::trace::span::TraceSpan;
32use crate::query_handler::PipelineHandlerRef;
33
34// Column names declared by this module. The trace id, span id, parent span id
// and span name column constants are re-exported from `common_catalog::consts`
// at the top of this file rather than being declared here.
35pub const SERVICE_NAME_COLUMN: &str = "service_name";
36pub const TIMESTAMP_COLUMN: &str = "timestamp";
37pub const DURATION_NANO_COLUMN: &str = "duration_nano";
38pub const SPAN_KIND_COLUMN: &str = "span_kind";
// NOTE(review): unlike its neighbours this constant has no `_COLUMN` suffix. It
// is public, so the name is left as-is; confirm before renaming.
39pub const SPAN_STATUS_CODE: &str = "span_status_code";
40pub const SPAN_STATUS_MESSAGE_COLUMN: &str = "span_status_message";
41pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
42pub const SPAN_EVENTS_COLUMN: &str = "span_events";
43pub const SCOPE_NAME_COLUMN: &str = "scope_name";
44pub const SCOPE_VERSION_COLUMN: &str = "scope_version";
45pub const RESOURCE_ATTRIBUTES_COLUMN: &str = "resource_attributes";
46pub const TRACE_STATE_COLUMN: &str = "trace_state";
47
48// Constant key strings. NOTE(review): presumably these are attribute keys looked
// up on spans/resources -- confirm against the callers.
49pub const KEY_SERVICE_NAME: &str = "service.name";
50pub const KEY_SERVICE_INSTANCE_ID: &str = "service.instance.id";
51pub const KEY_SPAN_KIND: &str = "span.kind";
52
53// Jaeger-related constant keys. NOTE(review): the original author was not sure
// whether these keys are general beyond Jaeger -- confirm before reusing them.
54pub const KEY_OTEL_SCOPE_NAME: &str = "otel.scope.name";
55pub const KEY_OTEL_SCOPE_VERSION: &str = "otel.scope.version";
56pub const KEY_OTEL_STATUS_CODE: &str = "otel.status_code";
57pub const KEY_OTEL_STATUS_MESSAGE: &str = "otel.status_description";
58pub const KEY_OTEL_STATUS_ERROR_KEY: &str = "error";
59pub const KEY_OTEL_TRACE_STATE: &str = "w3c.tracestate";
60
61/// The span kind prefix in the database.
62/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
63pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
64
65/// The span status code prefix in the database.
66pub const SPAN_STATUS_PREFIX: &str = "STATUS_CODE_";
// Complete status strings for the "unset" and "error" statuses; both literals
// already start with the `STATUS_CODE_` prefix defined above.
67pub const SPAN_STATUS_UNSET: &str = "STATUS_CODE_UNSET";
68pub const SPAN_STATUS_ERROR: &str = "STATUS_CODE_ERROR";
69
/// Deduplicated auxiliary trace entities derived from successfully ingested
/// spans.
///
/// The main trace table is written first. Once a span is confirmed accepted, we
/// record the service and operation tuples here so the auxiliary tables can be
/// updated separately without affecting span acceptance accounting.
///
/// Both fields are hash sets, so recording the same service or operation more
/// than once keeps a single entry. `Clone`, `PartialEq` and `Eq` are derived so
/// that values can be duplicated and compared directly (for example in tests).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct TraceAuxData {
    /// Distinct service names recorded from observed spans.
    pub services: HashSet<String>,
    /// Distinct `(service name, span name, span kind)` tuples recorded from
    /// observed spans.
    pub operations: HashSet<(String, String, String)>,
}
81
82impl TraceAuxData {
83    /// Records the auxiliary service and operation rows implied by one accepted
84    /// span.
85    pub fn observe_span(&mut self, span: &TraceSpan) {
86        if let Some(service_name) = &span.service_name {
87            self.services.insert(service_name.clone());
88            self.operations.insert((
89                service_name.clone(),
90                span.span_name.clone(),
91                span.span_kind.clone(),
92            ));
93        }
94    }
95
96    /// Returns true when no auxiliary table updates are needed.
97    pub fn is_empty(&self) -> bool {
98        self.services.is_empty() && self.operations.is_empty()
99    }
100}
101
102/// Convert a subset of trace spans to GreptimeDB row insert requests.
103pub fn to_grpc_insert_requests_from_spans(
104    spans: &[TraceSpan],
105    pipeline: &PipelineWay,
106    pipeline_params: &GreptimePipelineParams,
107    table_name: &str,
108    query_ctx: &QueryContextRef,
109    pipeline_handler: PipelineHandlerRef,
110) -> Result<(RowInsertRequests, usize)> {
111    match pipeline {
112        PipelineWay::OtlpTraceDirectV0 => v0::v0_to_grpc_main_insert_requests(
113            spans,
114            pipeline,
115            pipeline_params,
116            table_name,
117            query_ctx,
118            pipeline_handler,
119        ),
120        PipelineWay::OtlpTraceDirectV1 => v1::v1_to_grpc_main_insert_requests(
121            spans,
122            pipeline,
123            pipeline_params,
124            table_name,
125            query_ctx,
126            pipeline_handler,
127        ),
128        _ => NotSupportedSnafu {
129            feat: "Unsupported pipeline for trace",
130        }
131        .fail(),
132    }
133}
134
135/// Build insert requests for the auxiliary trace tables derived from accepted
136/// spans.
137///
138/// "Aux" here refers to the trace service and trace operation tables, not the
139/// main trace span table itself.
140pub fn to_grpc_insert_requests_for_aux_tables(
141    aux_data: TraceAuxData,
142    pipeline: &PipelineWay,
143    table_name: &str,
144) -> Result<(RowInsertRequests, usize)> {
145    match pipeline {
146        PipelineWay::OtlpTraceDirectV0 => v0::build_aux_table_requests(aux_data, table_name),
147        PipelineWay::OtlpTraceDirectV1 => v1::build_aux_table_requests(aux_data, table_name),
148        _ => NotSupportedSnafu {
149            feat: "Unsupported pipeline for trace",
150        }
151        .fail(),
152    }
153}