servers/http/
otlp.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use axum::extract::State;
18use axum::http::header;
19use axum::response::IntoResponse;
20use axum::Extension;
21use bytes::Bytes;
22use common_catalog::consts::{TRACE_TABLE_NAME, TRACE_TABLE_NAME_SESSION_KEY};
23use common_telemetry::tracing;
24use opentelemetry_proto::tonic::collector::logs::v1::{
25    ExportLogsServiceRequest, ExportLogsServiceResponse,
26};
27use opentelemetry_proto::tonic::collector::metrics::v1::{
28    ExportMetricsServiceRequest, ExportMetricsServiceResponse,
29};
30use opentelemetry_proto::tonic::collector::trace::v1::{
31    ExportTraceServiceRequest, ExportTraceServiceResponse,
32};
33use pipeline::PipelineWay;
34use prost::Message;
35use session::context::{Channel, QueryContext};
36use snafu::prelude::*;
37
38use crate::error::{self, PipelineSnafu, Result};
39use crate::http::extractor::{LogTableName, PipelineInfo, SelectInfoWrapper, TraceTableName};
40use crate::http::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
41use crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED;
42use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineHandler};
43
44#[axum_macros::debug_handler]
45#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "metrics"))]
46pub async fn metrics(
47    State(handler): State<OpenTelemetryProtocolHandlerRef>,
48    Extension(mut query_ctx): Extension<QueryContext>,
49
50    bytes: Bytes,
51) -> Result<OtlpResponse<ExportMetricsServiceResponse>> {
52    let db = query_ctx.get_db_string();
53    query_ctx.set_channel(Channel::Otlp);
54    let query_ctx = Arc::new(query_ctx);
55    let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED
56        .with_label_values(&[db.as_str()])
57        .start_timer();
58    let request =
59        ExportMetricsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
60
61    handler
62        .metrics(request, query_ctx)
63        .await
64        .map(|o| OtlpResponse {
65            resp_body: ExportMetricsServiceResponse {
66                partial_success: None,
67            },
68            write_cost: o.meta.cost,
69        })
70}
71
72#[axum_macros::debug_handler]
73#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))]
74pub async fn traces(
75    State(handler): State<OpenTelemetryProtocolHandlerRef>,
76    TraceTableName(table_name): TraceTableName,
77    pipeline_info: PipelineInfo,
78    Extension(mut query_ctx): Extension<QueryContext>,
79    bytes: Bytes,
80) -> Result<OtlpResponse<ExportTraceServiceResponse>> {
81    let db = query_ctx.get_db_string();
82    let table_name = table_name.unwrap_or_else(|| TRACE_TABLE_NAME.to_string());
83
84    query_ctx.set_channel(Channel::Otlp);
85    query_ctx.set_extension(TRACE_TABLE_NAME_SESSION_KEY, &table_name);
86
87    let query_ctx = Arc::new(query_ctx);
88    let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED
89        .with_label_values(&[db.as_str()])
90        .start_timer();
91    let request =
92        ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
93
94    let pipeline = PipelineWay::from_name_and_default(
95        pipeline_info.pipeline_name.as_deref(),
96        pipeline_info.pipeline_version.as_deref(),
97        None,
98    )
99    .context(PipelineSnafu)?;
100
101    let pipeline_params = pipeline_info.pipeline_params;
102
103    // here we use nightly feature `trait_upcasting` to convert handler to
104    // pipeline_handler
105    let pipeline_handler: Arc<dyn PipelineHandler + Send + Sync> = handler.clone();
106
107    handler
108        .traces(
109            pipeline_handler,
110            request,
111            pipeline,
112            pipeline_params,
113            table_name,
114            query_ctx,
115        )
116        .await
117        .map(|o| OtlpResponse {
118            resp_body: ExportTraceServiceResponse {
119                partial_success: None,
120            },
121            write_cost: o.meta.cost,
122        })
123}
124
125#[axum_macros::debug_handler]
126#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "logs"))]
127pub async fn logs(
128    State(handler): State<OpenTelemetryProtocolHandlerRef>,
129    Extension(mut query_ctx): Extension<QueryContext>,
130    pipeline_info: PipelineInfo,
131    LogTableName(tablename): LogTableName,
132    SelectInfoWrapper(select_info): SelectInfoWrapper,
133    bytes: Bytes,
134) -> Result<OtlpResponse<ExportLogsServiceResponse>> {
135    let tablename = tablename.unwrap_or_else(|| "opentelemetry_logs".to_string());
136    let db = query_ctx.get_db_string();
137    query_ctx.set_channel(Channel::Otlp);
138    let query_ctx = Arc::new(query_ctx);
139    let _timer = METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED
140        .with_label_values(&[db.as_str()])
141        .start_timer();
142    let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
143
144    let pipeline = PipelineWay::from_name_and_default(
145        pipeline_info.pipeline_name.as_deref(),
146        pipeline_info.pipeline_version.as_deref(),
147        Some(PipelineWay::OtlpLogDirect(Box::new(select_info))),
148    )
149    .context(PipelineSnafu)?;
150    let pipeline_params = pipeline_info.pipeline_params;
151
152    // here we use nightly feature `trait_upcasting` to convert handler to
153    // pipeline_handler
154    let pipeline_handler: Arc<dyn PipelineHandler + Send + Sync> = handler.clone();
155    handler
156        .logs(
157            pipeline_handler,
158            request,
159            pipeline,
160            pipeline_params,
161            tablename,
162            query_ctx,
163        )
164        .await
165        .map(|o| OtlpResponse {
166            resp_body: ExportLogsServiceResponse {
167                partial_success: None,
168            },
169            write_cost: o.meta.cost,
170        })
171}
172
173pub struct OtlpResponse<T: Message> {
174    resp_body: T,
175    write_cost: usize,
176}
177
178impl<T: Message> IntoResponse for OtlpResponse<T> {
179    fn into_response(self) -> axum::response::Response {
180        let mut header_map = write_cost_header_map(self.write_cost);
181        header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone());
182
183        (header_map, self.resp_body.encode_to_vec()).into_response()
184    }
185}