1pub mod attributes;
16pub mod coerce;
17pub mod span;
18pub mod v0;
19pub mod v1;
20
21use std::collections::HashSet;
22
23use api::v1::RowInsertRequests;
24pub use common_catalog::consts::{
25 PARENT_SPAN_ID_COLUMN, SPAN_ID_COLUMN, SPAN_NAME_COLUMN, TRACE_ID_COLUMN,
26};
27use pipeline::{GreptimePipelineParams, PipelineWay};
28use session::context::QueryContextRef;
29
30use crate::error::{NotSupportedSnafu, Result};
31use crate::otlp::trace::span::TraceSpan;
32use crate::query_handler::PipelineHandlerRef;
33
34pub const SERVICE_NAME_COLUMN: &str = "service_name";
36pub const TIMESTAMP_COLUMN: &str = "timestamp";
37pub const DURATION_NANO_COLUMN: &str = "duration_nano";
38pub const SPAN_KIND_COLUMN: &str = "span_kind";
39pub const SPAN_STATUS_CODE: &str = "span_status_code";
40pub const SPAN_STATUS_MESSAGE_COLUMN: &str = "span_status_message";
41pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
42pub const SPAN_EVENTS_COLUMN: &str = "span_events";
43pub const SCOPE_NAME_COLUMN: &str = "scope_name";
44pub const SCOPE_VERSION_COLUMN: &str = "scope_version";
45pub const RESOURCE_ATTRIBUTES_COLUMN: &str = "resource_attributes";
46pub const TRACE_STATE_COLUMN: &str = "trace_state";
47
48pub const KEY_SERVICE_NAME: &str = "service.name";
50pub const KEY_SERVICE_INSTANCE_ID: &str = "service.instance.id";
51pub const KEY_SPAN_KIND: &str = "span.kind";
52
53pub const KEY_OTEL_SCOPE_NAME: &str = "otel.scope.name";
55pub const KEY_OTEL_SCOPE_VERSION: &str = "otel.scope.version";
56pub const KEY_OTEL_STATUS_CODE: &str = "otel.status_code";
57pub const KEY_OTEL_STATUS_MESSAGE: &str = "otel.status_description";
58pub const KEY_OTEL_STATUS_ERROR_KEY: &str = "error";
59pub const KEY_OTEL_TRACE_STATE: &str = "w3c.tracestate";
60
61pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
64
65pub const SPAN_STATUS_PREFIX: &str = "STATUS_CODE_";
67pub const SPAN_STATUS_UNSET: &str = "STATUS_CODE_UNSET";
68pub const SPAN_STATUS_ERROR: &str = "STATUS_CODE_ERROR";
69
70#[derive(Debug, Default)]
77pub struct TraceAuxData {
78 pub services: HashSet<String>,
79 pub operations: HashSet<(String, String, String)>,
80}
81
82impl TraceAuxData {
83 pub fn observe_span(&mut self, span: &TraceSpan) {
86 if let Some(service_name) = &span.service_name {
87 self.services.insert(service_name.clone());
88 self.operations.insert((
89 service_name.clone(),
90 span.span_name.clone(),
91 span.span_kind.clone(),
92 ));
93 }
94 }
95
96 pub fn is_empty(&self) -> bool {
98 self.services.is_empty() && self.operations.is_empty()
99 }
100}
101
102pub fn to_grpc_insert_requests_from_spans(
104 spans: &[TraceSpan],
105 pipeline: &PipelineWay,
106 pipeline_params: &GreptimePipelineParams,
107 table_name: &str,
108 query_ctx: &QueryContextRef,
109 pipeline_handler: PipelineHandlerRef,
110) -> Result<(RowInsertRequests, usize)> {
111 match pipeline {
112 PipelineWay::OtlpTraceDirectV0 => v0::v0_to_grpc_main_insert_requests(
113 spans,
114 pipeline,
115 pipeline_params,
116 table_name,
117 query_ctx,
118 pipeline_handler,
119 ),
120 PipelineWay::OtlpTraceDirectV1 => v1::v1_to_grpc_main_insert_requests(
121 spans,
122 pipeline,
123 pipeline_params,
124 table_name,
125 query_ctx,
126 pipeline_handler,
127 ),
128 _ => NotSupportedSnafu {
129 feat: "Unsupported pipeline for trace",
130 }
131 .fail(),
132 }
133}
134
135pub fn to_grpc_insert_requests_for_aux_tables(
141 aux_data: TraceAuxData,
142 pipeline: &PipelineWay,
143 table_name: &str,
144) -> Result<(RowInsertRequests, usize)> {
145 match pipeline {
146 PipelineWay::OtlpTraceDirectV0 => v0::build_aux_table_requests(aux_data, table_name),
147 PipelineWay::OtlpTraceDirectV1 => v1::build_aux_table_requests(aux_data, table_name),
148 _ => NotSupportedSnafu {
149 feat: "Unsupported pipeline for trace",
150 }
151 .fail(),
152 }
153}