frontend/instance/
otlp.rs1use std::sync::Arc;
16
17use async_trait::async_trait;
18use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
19use client::Output;
20use common_error::ext::BoxedError;
21use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
22use common_telemetry::tracing;
23use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
24use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
25use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
26use pipeline::{GreptimePipelineParams, PipelineWay};
27use servers::error::{self, AuthSnafu, Result as ServerResult};
28use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
29use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
30use servers::otlp;
31use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
32use session::context::QueryContextRef;
33use snafu::ResultExt;
34use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
35
36use crate::instance::Instance;
37use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
38
39#[async_trait]
40impl OpenTelemetryProtocolHandler for Instance {
41 #[tracing::instrument(skip_all)]
42 async fn metrics(
43 &self,
44 request: ExportMetricsServiceRequest,
45 ctx: QueryContextRef,
46 ) -> ServerResult<Output> {
47 self.plugins
48 .get::<PermissionCheckerRef>()
49 .as_ref()
50 .check_permission(ctx.current_user(), PermissionReq::Otlp)
51 .context(AuthSnafu)?;
52
53 let interceptor_ref = self
54 .plugins
55 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
56 interceptor_ref.pre_execute(ctx.clone())?;
57
58 let input_names = request
59 .resource_metrics
60 .iter()
61 .flat_map(|r| r.scope_metrics.iter())
62 .flat_map(|s| s.metrics.iter().map(|m| &m.name))
63 .collect::<Vec<_>>();
64
65 let is_legacy = self.check_otlp_legacy(&input_names, ctx.clone()).await?;
67
68 let mut metric_ctx = ctx
69 .protocol_ctx()
70 .get_otlp_metric_ctx()
71 .cloned()
72 .unwrap_or_default();
73 metric_ctx.is_legacy = is_legacy;
74
75 let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?;
76 OTLP_METRICS_ROWS.inc_by(rows as u64);
77
78 let ctx = if !is_legacy {
79 let mut c = (*ctx).clone();
80 c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string());
81 Arc::new(c)
82 } else {
83 ctx
84 };
85
86 if metric_ctx.is_legacy || !metric_ctx.with_metric_engine {
88 self.handle_row_inserts(requests, ctx, false, false)
89 .await
90 .map_err(BoxedError::new)
91 .context(error::ExecuteGrpcQuerySnafu)
92 } else {
93 let physical_table = ctx
94 .extension(PHYSICAL_TABLE_PARAM)
95 .unwrap_or(GREPTIME_PHYSICAL_TABLE)
96 .to_string();
97 self.handle_metric_row_inserts(requests, ctx, physical_table.clone())
98 .await
99 .map_err(BoxedError::new)
100 .context(error::ExecuteGrpcQuerySnafu)
101 }
102 }
103
104 #[tracing::instrument(skip_all)]
105 async fn traces(
106 &self,
107 pipeline_handler: PipelineHandlerRef,
108 request: ExportTraceServiceRequest,
109 pipeline: PipelineWay,
110 pipeline_params: GreptimePipelineParams,
111 table_name: String,
112 ctx: QueryContextRef,
113 ) -> ServerResult<Output> {
114 self.plugins
115 .get::<PermissionCheckerRef>()
116 .as_ref()
117 .check_permission(ctx.current_user(), PermissionReq::Otlp)
118 .context(AuthSnafu)?;
119
120 let interceptor_ref = self
121 .plugins
122 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
123 interceptor_ref.pre_execute(ctx.clone())?;
124
125 let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
126
127 let (requests, rows) = otlp::trace::to_grpc_insert_requests(
128 request,
129 pipeline,
130 pipeline_params,
131 table_name,
132 &ctx,
133 pipeline_handler,
134 )?;
135
136 OTLP_TRACES_ROWS.inc_by(rows as u64);
137
138 if is_trace_v1_model {
139 self.handle_trace_inserts(requests, ctx)
140 .await
141 .map_err(BoxedError::new)
142 .context(error::ExecuteGrpcQuerySnafu)
143 } else {
144 self.handle_log_inserts(requests, ctx)
145 .await
146 .map_err(BoxedError::new)
147 .context(error::ExecuteGrpcQuerySnafu)
148 }
149 }
150
151 #[tracing::instrument(skip_all)]
152 async fn logs(
153 &self,
154 pipeline_handler: PipelineHandlerRef,
155 request: ExportLogsServiceRequest,
156 pipeline: PipelineWay,
157 pipeline_params: GreptimePipelineParams,
158 table_name: String,
159 ctx: QueryContextRef,
160 ) -> ServerResult<Vec<Output>> {
161 self.plugins
162 .get::<PermissionCheckerRef>()
163 .as_ref()
164 .check_permission(ctx.current_user(), PermissionReq::Otlp)
165 .context(AuthSnafu)?;
166
167 let interceptor_ref = self
168 .plugins
169 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
170 interceptor_ref.pre_execute(ctx.clone())?;
171
172 let opt_req = otlp::logs::to_grpc_insert_requests(
173 request,
174 pipeline,
175 pipeline_params,
176 table_name,
177 &ctx,
178 pipeline_handler,
179 )
180 .await?;
181
182 let mut outputs = vec![];
183
184 for (temp_ctx, requests) in opt_req.as_req_iter(ctx) {
185 let cnt = requests
186 .inserts
187 .iter()
188 .filter_map(|r| r.rows.as_ref().map(|r| r.rows.len()))
189 .sum::<usize>();
190
191 let o = self
192 .handle_log_inserts(requests, temp_ctx)
193 .await
194 .inspect(|_| OTLP_LOGS_ROWS.inc_by(cnt as u64))
195 .map_err(BoxedError::new)
196 .context(error::ExecuteGrpcQuerySnafu)?;
197 outputs.push(o);
198 }
199
200 Ok(outputs)
201 }
202}