1pub mod attributes;
16pub mod span;
17pub mod v0;
18pub mod v1;
19
20use std::collections::HashSet;
21
22use api::v1::RowInsertRequests;
23pub use common_catalog::consts::{
24 PARENT_SPAN_ID_COLUMN, SPAN_ID_COLUMN, SPAN_NAME_COLUMN, TRACE_ID_COLUMN,
25};
26use pipeline::{GreptimePipelineParams, PipelineWay};
27use session::context::QueryContextRef;
28
29use crate::error::{NotSupportedSnafu, Result};
30use crate::otlp::trace::span::TraceSpan;
31use crate::query_handler::PipelineHandlerRef;
32
// ---------------------------------------------------------------------------
// Column-name constants. Each value is the literal column identifier.
// NOTE(review): which tables actually carry each column is decided by the
// `v0`/`v1` submodules — confirm there before relying on any of these.
// ---------------------------------------------------------------------------

/// Name of the `service_name` column.
pub const SERVICE_NAME_COLUMN: &str = "service_name";
/// Name of the `timestamp` column.
pub const TIMESTAMP_COLUMN: &str = "timestamp";
/// Name of the `duration_nano` column (presumably the span duration in
/// nanoseconds, going by the name — confirm against the writers).
pub const DURATION_NANO_COLUMN: &str = "duration_nano";
/// Name of the `span_kind` column.
pub const SPAN_KIND_COLUMN: &str = "span_kind";
/// Name of the `span_status_code` column.
///
/// NOTE(review): unlike its siblings this constant has no `_COLUMN` suffix;
/// it is presumably a column name as well — confirm at the use sites.
pub const SPAN_STATUS_CODE: &str = "span_status_code";
/// Name of the `span_status_message` column.
pub const SPAN_STATUS_MESSAGE_COLUMN: &str = "span_status_message";
/// Name of the `span_attributes` column.
pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
/// Name of the `span_events` column.
pub const SPAN_EVENTS_COLUMN: &str = "span_events";
/// Name of the `scope_name` column.
pub const SCOPE_NAME_COLUMN: &str = "scope_name";
/// Name of the `scope_version` column.
pub const SCOPE_VERSION_COLUMN: &str = "scope_version";
/// Name of the `resource_attributes` column.
pub const RESOURCE_ATTRIBUTES_COLUMN: &str = "resource_attributes";
/// Name of the `trace_state` column.
pub const TRACE_STATE_COLUMN: &str = "trace_state";

// Dotted key strings. Presumably attribute/tag keys looked up on incoming
// spans or resources — confirm against the code that reads them.

/// The `service.name` key.
pub const KEY_SERVICE_NAME: &str = "service.name";
/// The `service.instance.id` key.
pub const KEY_SERVICE_INSTANCE_ID: &str = "service.instance.id";
/// The `span.kind` key.
pub const KEY_SPAN_KIND: &str = "span.kind";

// `otel.*` keys plus related status/trace-state keys.

/// The `otel.scope.name` key.
pub const KEY_OTEL_SCOPE_NAME: &str = "otel.scope.name";
/// The `otel.scope.version` key.
pub const KEY_OTEL_SCOPE_VERSION: &str = "otel.scope.version";
/// The `otel.status_code` key.
pub const KEY_OTEL_STATUS_CODE: &str = "otel.status_code";
/// The `otel.status_description` key.
///
/// Note that the constant is named `..._STATUS_MESSAGE` while the key itself
/// says `status_description`.
pub const KEY_OTEL_STATUS_MESSAGE: &str = "otel.status_description";
/// The `error` key.
pub const KEY_OTEL_STATUS_ERROR_KEY: &str = "error";
/// The `w3c.tracestate` key.
pub const KEY_OTEL_TRACE_STATE: &str = "w3c.tracestate";

/// The literal prefix `SPAN_KIND_` (presumably the common prefix of span-kind
/// enum names — confirm at the use sites).
pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";

/// The literal prefix `STATUS_CODE_`; both status names below start with it.
pub const SPAN_STATUS_PREFIX: &str = "STATUS_CODE_";
/// Status name `STATUS_CODE_UNSET`.
pub const SPAN_STATUS_UNSET: &str = "STATUS_CODE_UNSET";
/// Status name `STATUS_CODE_ERROR`.
pub const SPAN_STATUS_ERROR: &str = "STATUS_CODE_ERROR";
68
/// Auxiliary information collected from trace spans, deduplicated via sets.
///
/// Populated one span at a time through `observe_span` and consumed by
/// `to_grpc_insert_requests_for_aux_tables`.
///
/// `Clone`, `PartialEq` and `Eq` are derived in addition to `Debug` and
/// `Default` so that collected data can be copied and compared directly
/// (e.g. in tests).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct TraceAuxData {
    /// Distinct service names seen so far.
    pub services: HashSet<String>,
    /// Distinct `(service_name, span_name, span_kind)` triples seen so far.
    pub operations: HashSet<(String, String, String)>,
}
80
81impl TraceAuxData {
82 pub fn observe_span(&mut self, span: &TraceSpan) {
85 if let Some(service_name) = &span.service_name {
86 self.services.insert(service_name.clone());
87 self.operations.insert((
88 service_name.clone(),
89 span.span_name.clone(),
90 span.span_kind.clone(),
91 ));
92 }
93 }
94
95 pub fn is_empty(&self) -> bool {
97 self.services.is_empty() && self.operations.is_empty()
98 }
99}
100
101pub fn to_grpc_insert_requests_from_spans(
103 spans: &[TraceSpan],
104 pipeline: &PipelineWay,
105 pipeline_params: &GreptimePipelineParams,
106 table_name: &str,
107 query_ctx: &QueryContextRef,
108 pipeline_handler: PipelineHandlerRef,
109) -> Result<(RowInsertRequests, usize)> {
110 match pipeline {
111 PipelineWay::OtlpTraceDirectV0 => v0::v0_to_grpc_main_insert_requests(
112 spans,
113 pipeline,
114 pipeline_params,
115 table_name,
116 query_ctx,
117 pipeline_handler,
118 ),
119 PipelineWay::OtlpTraceDirectV1 => v1::v1_to_grpc_main_insert_requests(
120 spans,
121 pipeline,
122 pipeline_params,
123 table_name,
124 query_ctx,
125 pipeline_handler,
126 ),
127 _ => NotSupportedSnafu {
128 feat: "Unsupported pipeline for trace",
129 }
130 .fail(),
131 }
132}
133
134pub fn to_grpc_insert_requests_for_aux_tables(
140 aux_data: TraceAuxData,
141 pipeline: &PipelineWay,
142 table_name: &str,
143) -> Result<(RowInsertRequests, usize)> {
144 match pipeline {
145 PipelineWay::OtlpTraceDirectV0 => v0::build_aux_table_requests(aux_data, table_name),
146 PipelineWay::OtlpTraceDirectV1 => v1::build_aux_table_requests(aux_data, table_name),
147 _ => NotSupportedSnafu {
148 feat: "Unsupported pipeline for trace",
149 }
150 .fail(),
151 }
152}