servers/http/
otlp.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use axum::extract::State;
18use axum::http::header;
19use axum::response::IntoResponse;
20use axum::Extension;
21use bytes::Bytes;
22use common_catalog::consts::{TRACE_TABLE_NAME, TRACE_TABLE_NAME_SESSION_KEY};
23use common_telemetry::tracing;
24use opentelemetry_proto::tonic::collector::logs::v1::{
25    ExportLogsServiceRequest, ExportLogsServiceResponse,
26};
27use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceResponse;
28use opentelemetry_proto::tonic::collector::trace::v1::{
29    ExportTraceServiceRequest, ExportTraceServiceResponse,
30};
31use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
32use pipeline::PipelineWay;
33use prost::Message;
34use session::context::{Channel, QueryContext};
35use session::protocol_ctx::{MetricType, OtlpMetricCtx, ProtocolCtx};
36use snafu::prelude::*;
37
38use crate::error::{self, PipelineSnafu, Result};
39use crate::http::extractor::{
40    LogTableName, OtlpMetricOptions, PipelineInfo, SelectInfoWrapper, TraceTableName,
41};
42// use crate::http::header::constants::GREPTIME_METRICS_LEGACY_MODE_HEADER_NAME;
43use crate::http::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
44use crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED;
45use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineHandler};
46
/// Shared state handed to the OTLP HTTP handlers in this module through
/// axum's `State` extractor.
#[derive(Clone)]
pub struct OtlpState {
    /// Flag copied into `OtlpMetricCtx::with_metric_engine` by the `metrics`
    /// handler; the `traces` and `logs` handlers do not read it.
    pub with_metric_engine: bool,
    /// Protocol handler that the `metrics`, `traces` and `logs` endpoints
    /// forward their decoded requests to.
    pub handler: OpenTelemetryProtocolHandlerRef,
}
52
53#[axum_macros::debug_handler]
54#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "metrics"))]
55pub async fn metrics(
56    State(state): State<OtlpState>,
57    Extension(mut query_ctx): Extension<QueryContext>,
58    http_opts: OtlpMetricOptions,
59    bytes: Bytes,
60) -> Result<OtlpResponse<ExportMetricsServiceResponse>> {
61    let db = query_ctx.get_db_string();
62    query_ctx.set_channel(Channel::Otlp);
63
64    let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED
65        .with_label_values(&[db.as_str()])
66        .start_timer();
67    let request =
68        ExportMetricsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
69
70    let OtlpState {
71        with_metric_engine,
72        handler,
73    } = state;
74
75    query_ctx.set_protocol_ctx(ProtocolCtx::OtlpMetric(OtlpMetricCtx {
76        promote_all_resource_attrs: http_opts.promote_all_resource_attrs,
77        resource_attrs: http_opts.resource_attrs,
78        promote_scope_attrs: http_opts.promote_scope_attrs,
79        with_metric_engine,
80        // set is_legacy later
81        is_legacy: false,
82        metric_type: MetricType::Init,
83    }));
84    let query_ctx = Arc::new(query_ctx);
85
86    handler
87        .metrics(request, query_ctx)
88        .await
89        .map(|o| OtlpResponse {
90            resp_body: ExportMetricsServiceResponse {
91                partial_success: None,
92            },
93            write_cost: o.meta.cost,
94        })
95}
96
97#[axum_macros::debug_handler]
98#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))]
99pub async fn traces(
100    State(state): State<OtlpState>,
101    TraceTableName(table_name): TraceTableName,
102    pipeline_info: PipelineInfo,
103    Extension(mut query_ctx): Extension<QueryContext>,
104    bytes: Bytes,
105) -> Result<OtlpResponse<ExportTraceServiceResponse>> {
106    let db = query_ctx.get_db_string();
107    let table_name = table_name.unwrap_or_else(|| TRACE_TABLE_NAME.to_string());
108
109    query_ctx.set_channel(Channel::Otlp);
110    query_ctx.set_extension(TRACE_TABLE_NAME_SESSION_KEY, &table_name);
111
112    let query_ctx = Arc::new(query_ctx);
113    let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED
114        .with_label_values(&[db.as_str()])
115        .start_timer();
116    let request =
117        ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
118
119    let pipeline = PipelineWay::from_name_and_default(
120        pipeline_info.pipeline_name.as_deref(),
121        pipeline_info.pipeline_version.as_deref(),
122        None,
123    )
124    .context(PipelineSnafu)?;
125
126    let pipeline_params = pipeline_info.pipeline_params;
127
128    let OtlpState { handler, .. } = state;
129
130    // here we use nightly feature `trait_upcasting` to convert handler to
131    // pipeline_handler
132    let pipeline_handler: Arc<dyn PipelineHandler + Send + Sync> = handler.clone();
133
134    handler
135        .traces(
136            pipeline_handler,
137            request,
138            pipeline,
139            pipeline_params,
140            table_name,
141            query_ctx,
142        )
143        .await
144        .map(|o| OtlpResponse {
145            resp_body: ExportTraceServiceResponse {
146                partial_success: None,
147            },
148            write_cost: o.meta.cost,
149        })
150}
151
152#[axum_macros::debug_handler]
153#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "logs"))]
154pub async fn logs(
155    State(state): State<OtlpState>,
156    Extension(mut query_ctx): Extension<QueryContext>,
157    pipeline_info: PipelineInfo,
158    LogTableName(tablename): LogTableName,
159    SelectInfoWrapper(select_info): SelectInfoWrapper,
160    bytes: Bytes,
161) -> Result<OtlpResponse<ExportLogsServiceResponse>> {
162    let tablename = tablename.unwrap_or_else(|| "opentelemetry_logs".to_string());
163    let db = query_ctx.get_db_string();
164    query_ctx.set_channel(Channel::Otlp);
165    let query_ctx = Arc::new(query_ctx);
166    let _timer = METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED
167        .with_label_values(&[db.as_str()])
168        .start_timer();
169    let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
170
171    let pipeline = PipelineWay::from_name_and_default(
172        pipeline_info.pipeline_name.as_deref(),
173        pipeline_info.pipeline_version.as_deref(),
174        Some(PipelineWay::OtlpLogDirect(Box::new(select_info))),
175    )
176    .context(PipelineSnafu)?;
177    let pipeline_params = pipeline_info.pipeline_params;
178
179    let OtlpState { handler, .. } = state;
180
181    // here we use nightly feature `trait_upcasting` to convert handler to
182    // pipeline_handler
183    let pipeline_handler: Arc<dyn PipelineHandler + Send + Sync> = handler.clone();
184    handler
185        .logs(
186            pipeline_handler,
187            request,
188            pipeline,
189            pipeline_params,
190            tablename,
191            query_ctx,
192        )
193        .await
194        .map(|o| OtlpResponse {
195            resp_body: ExportLogsServiceResponse {
196                partial_success: None,
197            },
198            write_cost: o.iter().map(|o| o.meta.cost).sum(),
199        })
200}
201
/// Response wrapper returned by the OTLP HTTP handlers.
///
/// The type parameter carries no trait bound on the struct itself; the
/// `Message` requirement lives on the `IntoResponse` impl, which is the only
/// place that needs to encode the body.
pub struct OtlpResponse<T> {
    /// Message that is encoded as the HTTP response body.
    resp_body: T,
    /// Write cost passed to `write_cost_header_map` when building the
    /// response headers.
    write_cost: usize,
}
206
207impl<T: Message> IntoResponse for OtlpResponse<T> {
208    fn into_response(self) -> axum::response::Response {
209        let mut header_map = write_cost_header_map(self.write_cost);
210        header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone());
211
212        (header_map, self.resp_body.encode_to_vec()).into_response()
213    }
214}