frontend/instance/
otlp.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use async_trait::async_trait;
18use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
19use client::Output;
20use common_error::ext::BoxedError;
21use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
22use common_telemetry::tracing;
23use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
24use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
25use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
26use pipeline::{GreptimePipelineParams, PipelineWay};
27use servers::error::{self, AuthSnafu, OtherSnafu, Result as ServerResult};
28use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
29use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
30use servers::otlp;
31use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
32use session::context::QueryContextRef;
33use snafu::ResultExt;
34use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
35
36use crate::instance::Instance;
37use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
38
39#[async_trait]
40impl OpenTelemetryProtocolHandler for Instance {
41    #[tracing::instrument(skip_all)]
42    async fn metrics(
43        &self,
44        request: ExportMetricsServiceRequest,
45        ctx: QueryContextRef,
46    ) -> ServerResult<Output> {
47        self.plugins
48            .get::<PermissionCheckerRef>()
49            .as_ref()
50            .check_permission(ctx.current_user(), PermissionReq::Otlp)
51            .context(AuthSnafu)?;
52
53        let interceptor_ref = self
54            .plugins
55            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
56        interceptor_ref.pre_execute(ctx.clone())?;
57
58        let input_names = request
59            .resource_metrics
60            .iter()
61            .flat_map(|r| r.scope_metrics.iter())
62            .flat_map(|s| s.metrics.iter().map(|m| &m.name))
63            .collect::<Vec<_>>();
64
65        // See [`OtlpMetricCtx`] for details
66        let is_legacy = self.check_otlp_legacy(&input_names, ctx.clone()).await?;
67
68        let mut metric_ctx = ctx
69            .protocol_ctx()
70            .get_otlp_metric_ctx()
71            .cloned()
72            .unwrap_or_default();
73        metric_ctx.is_legacy = is_legacy;
74
75        let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?;
76        OTLP_METRICS_ROWS.inc_by(rows as u64);
77
78        let ctx = if !is_legacy {
79            let mut c = (*ctx).clone();
80            c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string());
81            Arc::new(c)
82        } else {
83            ctx
84        };
85
86        let _guard = if let Some(limiter) = &self.limiter {
87            Some(
88                limiter
89                    .limit_row_inserts(&requests)
90                    .await
91                    .map_err(BoxedError::new)
92                    .context(OtherSnafu)?,
93            )
94        } else {
95            None
96        };
97
98        // If the user uses the legacy path, it is by default without metric engine.
99        if metric_ctx.is_legacy || !metric_ctx.with_metric_engine {
100            self.handle_row_inserts(requests, ctx, false, false)
101                .await
102                .map_err(BoxedError::new)
103                .context(error::ExecuteGrpcQuerySnafu)
104        } else {
105            let physical_table = ctx
106                .extension(PHYSICAL_TABLE_PARAM)
107                .unwrap_or(GREPTIME_PHYSICAL_TABLE)
108                .to_string();
109            self.handle_metric_row_inserts(requests, ctx, physical_table.to_string())
110                .await
111                .map_err(BoxedError::new)
112                .context(error::ExecuteGrpcQuerySnafu)
113        }
114    }
115
116    #[tracing::instrument(skip_all)]
117    async fn traces(
118        &self,
119        pipeline_handler: PipelineHandlerRef,
120        request: ExportTraceServiceRequest,
121        pipeline: PipelineWay,
122        pipeline_params: GreptimePipelineParams,
123        table_name: String,
124        ctx: QueryContextRef,
125    ) -> ServerResult<Output> {
126        self.plugins
127            .get::<PermissionCheckerRef>()
128            .as_ref()
129            .check_permission(ctx.current_user(), PermissionReq::Otlp)
130            .context(AuthSnafu)?;
131
132        let interceptor_ref = self
133            .plugins
134            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
135        interceptor_ref.pre_execute(ctx.clone())?;
136
137        let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
138
139        let (requests, rows) = otlp::trace::to_grpc_insert_requests(
140            request,
141            pipeline,
142            pipeline_params,
143            table_name,
144            &ctx,
145            pipeline_handler,
146        )?;
147
148        OTLP_TRACES_ROWS.inc_by(rows as u64);
149
150        if is_trace_v1_model {
151            self.handle_trace_inserts(requests, ctx)
152                .await
153                .map_err(BoxedError::new)
154                .context(error::ExecuteGrpcQuerySnafu)
155        } else {
156            self.handle_log_inserts(requests, ctx)
157                .await
158                .map_err(BoxedError::new)
159                .context(error::ExecuteGrpcQuerySnafu)
160        }
161    }
162
163    #[tracing::instrument(skip_all)]
164    async fn logs(
165        &self,
166        pipeline_handler: PipelineHandlerRef,
167        request: ExportLogsServiceRequest,
168        pipeline: PipelineWay,
169        pipeline_params: GreptimePipelineParams,
170        table_name: String,
171        ctx: QueryContextRef,
172    ) -> ServerResult<Vec<Output>> {
173        self.plugins
174            .get::<PermissionCheckerRef>()
175            .as_ref()
176            .check_permission(ctx.current_user(), PermissionReq::Otlp)
177            .context(AuthSnafu)?;
178
179        let interceptor_ref = self
180            .plugins
181            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
182        interceptor_ref.pre_execute(ctx.clone())?;
183
184        let opt_req = otlp::logs::to_grpc_insert_requests(
185            request,
186            pipeline,
187            pipeline_params,
188            table_name,
189            &ctx,
190            pipeline_handler,
191        )
192        .await?;
193
194        let _guard = if let Some(limiter) = &self.limiter {
195            Some(
196                limiter
197                    .limit_ctx_req(&opt_req)
198                    .await
199                    .map_err(BoxedError::new)
200                    .context(OtherSnafu)?,
201            )
202        } else {
203            None
204        };
205
206        let mut outputs = vec![];
207
208        for (temp_ctx, requests) in opt_req.as_req_iter(ctx) {
209            let cnt = requests
210                .inserts
211                .iter()
212                .filter_map(|r| r.rows.as_ref().map(|r| r.rows.len()))
213                .sum::<usize>();
214
215            let o = self
216                .handle_log_inserts(requests, temp_ctx)
217                .await
218                .inspect(|_| OTLP_LOGS_ROWS.inc_by(cnt as u64))
219                .map_err(BoxedError::new)
220                .context(error::ExecuteGrpcQuerySnafu)?;
221            outputs.push(o);
222        }
223
224        Ok(outputs)
225    }
226}