frontend/instance/
otlp.rs1use std::sync::Arc;
16
17use async_trait::async_trait;
18use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
19use client::Output;
20use common_error::ext::BoxedError;
21use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
22use common_telemetry::tracing;
23use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
24use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
25use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
26use pipeline::{GreptimePipelineParams, PipelineWay};
27use servers::error::{self, AuthSnafu, OtherSnafu, Result as ServerResult};
28use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
29use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
30use servers::otlp;
31use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
32use session::context::QueryContextRef;
33use snafu::ResultExt;
34use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
35
36use crate::instance::Instance;
37use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
38
39#[async_trait]
40impl OpenTelemetryProtocolHandler for Instance {
41 #[tracing::instrument(skip_all)]
42 async fn metrics(
43 &self,
44 request: ExportMetricsServiceRequest,
45 ctx: QueryContextRef,
46 ) -> ServerResult<Output> {
47 self.plugins
48 .get::<PermissionCheckerRef>()
49 .as_ref()
50 .check_permission(ctx.current_user(), PermissionReq::Otlp)
51 .context(AuthSnafu)?;
52
53 let interceptor_ref = self
54 .plugins
55 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
56 interceptor_ref.pre_execute(ctx.clone())?;
57
58 let input_names = request
59 .resource_metrics
60 .iter()
61 .flat_map(|r| r.scope_metrics.iter())
62 .flat_map(|s| s.metrics.iter().map(|m| &m.name))
63 .collect::<Vec<_>>();
64
65 let is_legacy = self.check_otlp_legacy(&input_names, ctx.clone()).await?;
67
68 let mut metric_ctx = ctx
69 .protocol_ctx()
70 .get_otlp_metric_ctx()
71 .cloned()
72 .unwrap_or_default();
73 metric_ctx.is_legacy = is_legacy;
74
75 let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?;
76 OTLP_METRICS_ROWS.inc_by(rows as u64);
77
78 let ctx = if !is_legacy {
79 let mut c = (*ctx).clone();
80 c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string());
81 Arc::new(c)
82 } else {
83 ctx
84 };
85
86 let _guard = if let Some(limiter) = &self.limiter {
87 Some(
88 limiter
89 .limit_row_inserts(&requests)
90 .await
91 .map_err(BoxedError::new)
92 .context(OtherSnafu)?,
93 )
94 } else {
95 None
96 };
97
98 if metric_ctx.is_legacy || !metric_ctx.with_metric_engine {
100 self.handle_row_inserts(requests, ctx, false, false)
101 .await
102 .map_err(BoxedError::new)
103 .context(error::ExecuteGrpcQuerySnafu)
104 } else {
105 let physical_table = ctx
106 .extension(PHYSICAL_TABLE_PARAM)
107 .unwrap_or(GREPTIME_PHYSICAL_TABLE)
108 .to_string();
109 self.handle_metric_row_inserts(requests, ctx, physical_table.to_string())
110 .await
111 .map_err(BoxedError::new)
112 .context(error::ExecuteGrpcQuerySnafu)
113 }
114 }
115
116 #[tracing::instrument(skip_all)]
117 async fn traces(
118 &self,
119 pipeline_handler: PipelineHandlerRef,
120 request: ExportTraceServiceRequest,
121 pipeline: PipelineWay,
122 pipeline_params: GreptimePipelineParams,
123 table_name: String,
124 ctx: QueryContextRef,
125 ) -> ServerResult<Output> {
126 self.plugins
127 .get::<PermissionCheckerRef>()
128 .as_ref()
129 .check_permission(ctx.current_user(), PermissionReq::Otlp)
130 .context(AuthSnafu)?;
131
132 let interceptor_ref = self
133 .plugins
134 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
135 interceptor_ref.pre_execute(ctx.clone())?;
136
137 let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
138
139 let (requests, rows) = otlp::trace::to_grpc_insert_requests(
140 request,
141 pipeline,
142 pipeline_params,
143 table_name,
144 &ctx,
145 pipeline_handler,
146 )?;
147
148 OTLP_TRACES_ROWS.inc_by(rows as u64);
149
150 if is_trace_v1_model {
151 self.handle_trace_inserts(requests, ctx)
152 .await
153 .map_err(BoxedError::new)
154 .context(error::ExecuteGrpcQuerySnafu)
155 } else {
156 self.handle_log_inserts(requests, ctx)
157 .await
158 .map_err(BoxedError::new)
159 .context(error::ExecuteGrpcQuerySnafu)
160 }
161 }
162
163 #[tracing::instrument(skip_all)]
164 async fn logs(
165 &self,
166 pipeline_handler: PipelineHandlerRef,
167 request: ExportLogsServiceRequest,
168 pipeline: PipelineWay,
169 pipeline_params: GreptimePipelineParams,
170 table_name: String,
171 ctx: QueryContextRef,
172 ) -> ServerResult<Vec<Output>> {
173 self.plugins
174 .get::<PermissionCheckerRef>()
175 .as_ref()
176 .check_permission(ctx.current_user(), PermissionReq::Otlp)
177 .context(AuthSnafu)?;
178
179 let interceptor_ref = self
180 .plugins
181 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
182 interceptor_ref.pre_execute(ctx.clone())?;
183
184 let opt_req = otlp::logs::to_grpc_insert_requests(
185 request,
186 pipeline,
187 pipeline_params,
188 table_name,
189 &ctx,
190 pipeline_handler,
191 )
192 .await?;
193
194 let _guard = if let Some(limiter) = &self.limiter {
195 Some(
196 limiter
197 .limit_ctx_req(&opt_req)
198 .await
199 .map_err(BoxedError::new)
200 .context(OtherSnafu)?,
201 )
202 } else {
203 None
204 };
205
206 let mut outputs = vec![];
207
208 for (temp_ctx, requests) in opt_req.as_req_iter(ctx) {
209 let cnt = requests
210 .inserts
211 .iter()
212 .filter_map(|r| r.rows.as_ref().map(|r| r.rows.len()))
213 .sum::<usize>();
214
215 let o = self
216 .handle_log_inserts(requests, temp_ctx)
217 .await
218 .inspect(|_| OTLP_LOGS_ROWS.inc_by(cnt as u64))
219 .map_err(BoxedError::new)
220 .context(error::ExecuteGrpcQuerySnafu)?;
221 outputs.push(o);
222 }
223
224 Ok(outputs)
225 }
226}