frontend/instance/
otlp.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use async_trait::async_trait;
18use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
19use client::Output;
20use common_error::ext::BoxedError;
21use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
22use common_telemetry::tracing;
23use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
24use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
25use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
26use pipeline::{GreptimePipelineParams, PipelineWay};
27use servers::error::{self, AuthSnafu, Result as ServerResult};
28use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
29use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
30use servers::otlp;
31use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
32use session::context::QueryContextRef;
33use snafu::ResultExt;
34use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
35
36use crate::instance::Instance;
37use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
38
39#[async_trait]
40impl OpenTelemetryProtocolHandler for Instance {
41    #[tracing::instrument(skip_all)]
42    async fn metrics(
43        &self,
44        request: ExportMetricsServiceRequest,
45        ctx: QueryContextRef,
46    ) -> ServerResult<Output> {
47        self.plugins
48            .get::<PermissionCheckerRef>()
49            .as_ref()
50            .check_permission(ctx.current_user(), PermissionReq::Otlp)
51            .context(AuthSnafu)?;
52
53        let interceptor_ref = self
54            .plugins
55            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
56        interceptor_ref.pre_execute(ctx.clone())?;
57
58        let input_names = request
59            .resource_metrics
60            .iter()
61            .flat_map(|r| r.scope_metrics.iter())
62            .flat_map(|s| s.metrics.iter().map(|m| &m.name))
63            .collect::<Vec<_>>();
64
65        // See [`OtlpMetricCtx`] for details
66        let is_legacy = self.check_otlp_legacy(&input_names, ctx.clone()).await?;
67
68        let mut metric_ctx = ctx
69            .protocol_ctx()
70            .get_otlp_metric_ctx()
71            .cloned()
72            .unwrap_or_default();
73        metric_ctx.is_legacy = is_legacy;
74
75        let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?;
76        OTLP_METRICS_ROWS.inc_by(rows as u64);
77
78        let ctx = if !is_legacy {
79            let mut c = (*ctx).clone();
80            c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string());
81            Arc::new(c)
82        } else {
83            ctx
84        };
85
86        // If the user uses the legacy path, it is by default without metric engine.
87        if metric_ctx.is_legacy || !metric_ctx.with_metric_engine {
88            self.handle_row_inserts(requests, ctx, false, false)
89                .await
90                .map_err(BoxedError::new)
91                .context(error::ExecuteGrpcQuerySnafu)
92        } else {
93            let physical_table = ctx
94                .extension(PHYSICAL_TABLE_PARAM)
95                .unwrap_or(GREPTIME_PHYSICAL_TABLE)
96                .to_string();
97            self.handle_metric_row_inserts(requests, ctx, physical_table.clone())
98                .await
99                .map_err(BoxedError::new)
100                .context(error::ExecuteGrpcQuerySnafu)
101        }
102    }
103
104    #[tracing::instrument(skip_all)]
105    async fn traces(
106        &self,
107        pipeline_handler: PipelineHandlerRef,
108        request: ExportTraceServiceRequest,
109        pipeline: PipelineWay,
110        pipeline_params: GreptimePipelineParams,
111        table_name: String,
112        ctx: QueryContextRef,
113    ) -> ServerResult<Output> {
114        self.plugins
115            .get::<PermissionCheckerRef>()
116            .as_ref()
117            .check_permission(ctx.current_user(), PermissionReq::Otlp)
118            .context(AuthSnafu)?;
119
120        let interceptor_ref = self
121            .plugins
122            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
123        interceptor_ref.pre_execute(ctx.clone())?;
124
125        let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
126
127        let (requests, rows) = otlp::trace::to_grpc_insert_requests(
128            request,
129            pipeline,
130            pipeline_params,
131            table_name,
132            &ctx,
133            pipeline_handler,
134        )?;
135
136        OTLP_TRACES_ROWS.inc_by(rows as u64);
137
138        if is_trace_v1_model {
139            self.handle_trace_inserts(requests, ctx)
140                .await
141                .map_err(BoxedError::new)
142                .context(error::ExecuteGrpcQuerySnafu)
143        } else {
144            self.handle_log_inserts(requests, ctx)
145                .await
146                .map_err(BoxedError::new)
147                .context(error::ExecuteGrpcQuerySnafu)
148        }
149    }
150
151    #[tracing::instrument(skip_all)]
152    async fn logs(
153        &self,
154        pipeline_handler: PipelineHandlerRef,
155        request: ExportLogsServiceRequest,
156        pipeline: PipelineWay,
157        pipeline_params: GreptimePipelineParams,
158        table_name: String,
159        ctx: QueryContextRef,
160    ) -> ServerResult<Vec<Output>> {
161        self.plugins
162            .get::<PermissionCheckerRef>()
163            .as_ref()
164            .check_permission(ctx.current_user(), PermissionReq::Otlp)
165            .context(AuthSnafu)?;
166
167        let interceptor_ref = self
168            .plugins
169            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
170        interceptor_ref.pre_execute(ctx.clone())?;
171
172        let opt_req = otlp::logs::to_grpc_insert_requests(
173            request,
174            pipeline,
175            pipeline_params,
176            table_name,
177            &ctx,
178            pipeline_handler,
179        )
180        .await?;
181
182        let mut outputs = vec![];
183
184        for (temp_ctx, requests) in opt_req.as_req_iter(ctx) {
185            let cnt = requests
186                .inserts
187                .iter()
188                .filter_map(|r| r.rows.as_ref().map(|r| r.rows.len()))
189                .sum::<usize>();
190
191            let o = self
192                .handle_log_inserts(requests, temp_ctx)
193                .await
194                .inspect(|_| OTLP_LOGS_ROWS.inc_by(cnt as u64))
195                .map_err(BoxedError::new)
196                .context(error::ExecuteGrpcQuerySnafu)?;
197            outputs.push(o);
198        }
199
200        Ok(outputs)
201    }
202}