frontend/instance/
otlp.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use async_trait::async_trait;
16use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
17use client::Output;
18use common_error::ext::BoxedError;
19use common_telemetry::tracing;
20use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
21use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
22use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
23use pipeline::{GreptimePipelineParams, PipelineWay};
24use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
25use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
26use servers::otlp;
27use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
28use session::context::QueryContextRef;
29use snafu::ResultExt;
30
31use crate::instance::Instance;
32use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
33
34#[async_trait]
35impl OpenTelemetryProtocolHandler for Instance {
36    #[tracing::instrument(skip_all)]
37    async fn metrics(
38        &self,
39        request: ExportMetricsServiceRequest,
40        ctx: QueryContextRef,
41    ) -> ServerResult<Output> {
42        self.plugins
43            .get::<PermissionCheckerRef>()
44            .as_ref()
45            .check_permission(ctx.current_user(), PermissionReq::Otlp)
46            .context(AuthSnafu)?;
47
48        let interceptor_ref = self
49            .plugins
50            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
51        interceptor_ref.pre_execute(ctx.clone())?;
52
53        let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
54        OTLP_METRICS_ROWS.inc_by(rows as u64);
55
56        let _guard = if let Some(limiter) = &self.limiter {
57            let result = limiter.limit_row_inserts(&requests);
58            if result.is_none() {
59                return InFlightWriteBytesExceededSnafu.fail();
60            }
61            result
62        } else {
63            None
64        };
65
66        self.handle_row_inserts(requests, ctx)
67            .await
68            .map_err(BoxedError::new)
69            .context(error::ExecuteGrpcQuerySnafu)
70    }
71
72    #[tracing::instrument(skip_all)]
73    async fn traces(
74        &self,
75        pipeline_handler: PipelineHandlerRef,
76        request: ExportTraceServiceRequest,
77        pipeline: PipelineWay,
78        pipeline_params: GreptimePipelineParams,
79        table_name: String,
80        ctx: QueryContextRef,
81    ) -> ServerResult<Output> {
82        self.plugins
83            .get::<PermissionCheckerRef>()
84            .as_ref()
85            .check_permission(ctx.current_user(), PermissionReq::Otlp)
86            .context(AuthSnafu)?;
87
88        let interceptor_ref = self
89            .plugins
90            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
91        interceptor_ref.pre_execute(ctx.clone())?;
92
93        let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
94
95        let (requests, rows) = otlp::trace::to_grpc_insert_requests(
96            request,
97            pipeline,
98            pipeline_params,
99            table_name,
100            &ctx,
101            pipeline_handler,
102        )?;
103
104        OTLP_TRACES_ROWS.inc_by(rows as u64);
105
106        if is_trace_v1_model {
107            self.handle_trace_inserts(requests, ctx)
108                .await
109                .map_err(BoxedError::new)
110                .context(error::ExecuteGrpcQuerySnafu)
111        } else {
112            self.handle_log_inserts(requests, ctx)
113                .await
114                .map_err(BoxedError::new)
115                .context(error::ExecuteGrpcQuerySnafu)
116        }
117    }
118
119    #[tracing::instrument(skip_all)]
120    async fn logs(
121        &self,
122        pipeline_handler: PipelineHandlerRef,
123        request: ExportLogsServiceRequest,
124        pipeline: PipelineWay,
125        pipeline_params: GreptimePipelineParams,
126        table_name: String,
127        ctx: QueryContextRef,
128    ) -> ServerResult<Output> {
129        self.plugins
130            .get::<PermissionCheckerRef>()
131            .as_ref()
132            .check_permission(ctx.current_user(), PermissionReq::Otlp)
133            .context(AuthSnafu)?;
134
135        let interceptor_ref = self
136            .plugins
137            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
138        interceptor_ref.pre_execute(ctx.clone())?;
139
140        let (requests, rows) = otlp::logs::to_grpc_insert_requests(
141            request,
142            pipeline,
143            pipeline_params,
144            table_name,
145            &ctx,
146            pipeline_handler,
147        )
148        .await?;
149
150        let _guard = if let Some(limiter) = &self.limiter {
151            let result = limiter.limit_row_inserts(&requests);
152            if result.is_none() {
153                return InFlightWriteBytesExceededSnafu.fail();
154            }
155            result
156        } else {
157            None
158        };
159
160        self.handle_log_inserts(requests, ctx)
161            .await
162            .inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
163            .map_err(BoxedError::new)
164            .context(error::ExecuteGrpcQuerySnafu)
165    }
166}