1use std::sync::Arc;
16
17use axum::extract::State;
18use axum::http::header;
19use axum::response::IntoResponse;
20use axum::Extension;
21use bytes::Bytes;
22use common_catalog::consts::{TRACE_TABLE_NAME, TRACE_TABLE_NAME_SESSION_KEY};
23use common_telemetry::tracing;
24use opentelemetry_proto::tonic::collector::logs::v1::{
25 ExportLogsServiceRequest, ExportLogsServiceResponse,
26};
27use opentelemetry_proto::tonic::collector::metrics::v1::{
28 ExportMetricsServiceRequest, ExportMetricsServiceResponse,
29};
30use opentelemetry_proto::tonic::collector::trace::v1::{
31 ExportTraceServiceRequest, ExportTraceServiceResponse,
32};
33use pipeline::PipelineWay;
34use prost::Message;
35use session::context::{Channel, QueryContext};
36use snafu::prelude::*;
37
38use crate::error::{self, PipelineSnafu, Result};
39use crate::http::extractor::{LogTableName, PipelineInfo, SelectInfoWrapper, TraceTableName};
40use crate::http::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
41use crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED;
42use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineHandler};
43
44#[axum_macros::debug_handler]
45#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "metrics"))]
46pub async fn metrics(
47 State(handler): State<OpenTelemetryProtocolHandlerRef>,
48 Extension(mut query_ctx): Extension<QueryContext>,
49
50 bytes: Bytes,
51) -> Result<OtlpResponse<ExportMetricsServiceResponse>> {
52 let db = query_ctx.get_db_string();
53 query_ctx.set_channel(Channel::Otlp);
54 let query_ctx = Arc::new(query_ctx);
55 let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED
56 .with_label_values(&[db.as_str()])
57 .start_timer();
58 let request =
59 ExportMetricsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
60
61 handler
62 .metrics(request, query_ctx)
63 .await
64 .map(|o| OtlpResponse {
65 resp_body: ExportMetricsServiceResponse {
66 partial_success: None,
67 },
68 write_cost: o.meta.cost,
69 })
70}
71
72#[axum_macros::debug_handler]
73#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))]
74pub async fn traces(
75 State(handler): State<OpenTelemetryProtocolHandlerRef>,
76 TraceTableName(table_name): TraceTableName,
77 pipeline_info: PipelineInfo,
78 Extension(mut query_ctx): Extension<QueryContext>,
79 bytes: Bytes,
80) -> Result<OtlpResponse<ExportTraceServiceResponse>> {
81 let db = query_ctx.get_db_string();
82 let table_name = table_name.unwrap_or_else(|| TRACE_TABLE_NAME.to_string());
83
84 query_ctx.set_channel(Channel::Otlp);
85 query_ctx.set_extension(TRACE_TABLE_NAME_SESSION_KEY, &table_name);
86
87 let query_ctx = Arc::new(query_ctx);
88 let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED
89 .with_label_values(&[db.as_str()])
90 .start_timer();
91 let request =
92 ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
93
94 let pipeline = PipelineWay::from_name_and_default(
95 pipeline_info.pipeline_name.as_deref(),
96 pipeline_info.pipeline_version.as_deref(),
97 None,
98 )
99 .context(PipelineSnafu)?;
100
101 let pipeline_params = pipeline_info.pipeline_params;
102
103 let pipeline_handler: Arc<dyn PipelineHandler + Send + Sync> = handler.clone();
106
107 handler
108 .traces(
109 pipeline_handler,
110 request,
111 pipeline,
112 pipeline_params,
113 table_name,
114 query_ctx,
115 )
116 .await
117 .map(|o| OtlpResponse {
118 resp_body: ExportTraceServiceResponse {
119 partial_success: None,
120 },
121 write_cost: o.meta.cost,
122 })
123}
124
125#[axum_macros::debug_handler]
126#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "logs"))]
127pub async fn logs(
128 State(handler): State<OpenTelemetryProtocolHandlerRef>,
129 Extension(mut query_ctx): Extension<QueryContext>,
130 pipeline_info: PipelineInfo,
131 LogTableName(tablename): LogTableName,
132 SelectInfoWrapper(select_info): SelectInfoWrapper,
133 bytes: Bytes,
134) -> Result<OtlpResponse<ExportLogsServiceResponse>> {
135 let tablename = tablename.unwrap_or_else(|| "opentelemetry_logs".to_string());
136 let db = query_ctx.get_db_string();
137 query_ctx.set_channel(Channel::Otlp);
138 let query_ctx = Arc::new(query_ctx);
139 let _timer = METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED
140 .with_label_values(&[db.as_str()])
141 .start_timer();
142 let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
143
144 let pipeline = PipelineWay::from_name_and_default(
145 pipeline_info.pipeline_name.as_deref(),
146 pipeline_info.pipeline_version.as_deref(),
147 Some(PipelineWay::OtlpLogDirect(Box::new(select_info))),
148 )
149 .context(PipelineSnafu)?;
150 let pipeline_params = pipeline_info.pipeline_params;
151
152 let pipeline_handler: Arc<dyn PipelineHandler + Send + Sync> = handler.clone();
155 handler
156 .logs(
157 pipeline_handler,
158 request,
159 pipeline,
160 pipeline_params,
161 tablename,
162 query_ctx,
163 )
164 .await
165 .map(|o| OtlpResponse {
166 resp_body: ExportLogsServiceResponse {
167 partial_success: None,
168 },
169 write_cost: o.meta.cost,
170 })
171}
172
173pub struct OtlpResponse<T: Message> {
174 resp_body: T,
175 write_cost: usize,
176}
177
178impl<T: Message> IntoResponse for OtlpResponse<T> {
179 fn into_response(self) -> axum::response::Response {
180 let mut header_map = write_cost_header_map(self.write_cost);
181 header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone());
182
183 (header_map, self.resp_body.encode_to_vec()).into_response()
184 }
185}