frontend/instance/
otlp.rs1use async_trait::async_trait;
16use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
17use client::Output;
18use common_error::ext::BoxedError;
19use common_telemetry::tracing;
20use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
21use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
22use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
23use pipeline::{GreptimePipelineParams, PipelineWay};
24use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
25use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
26use servers::otlp;
27use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
28use session::context::QueryContextRef;
29use snafu::ResultExt;
30
31use crate::instance::Instance;
32use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
33
34#[async_trait]
35impl OpenTelemetryProtocolHandler for Instance {
36 #[tracing::instrument(skip_all)]
37 async fn metrics(
38 &self,
39 request: ExportMetricsServiceRequest,
40 ctx: QueryContextRef,
41 ) -> ServerResult<Output> {
42 self.plugins
43 .get::<PermissionCheckerRef>()
44 .as_ref()
45 .check_permission(ctx.current_user(), PermissionReq::Otlp)
46 .context(AuthSnafu)?;
47
48 let interceptor_ref = self
49 .plugins
50 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
51 interceptor_ref.pre_execute(ctx.clone())?;
52
53 let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
54 OTLP_METRICS_ROWS.inc_by(rows as u64);
55
56 let _guard = if let Some(limiter) = &self.limiter {
57 let result = limiter.limit_row_inserts(&requests);
58 if result.is_none() {
59 return InFlightWriteBytesExceededSnafu.fail();
60 }
61 result
62 } else {
63 None
64 };
65
66 self.handle_row_inserts(requests, ctx)
67 .await
68 .map_err(BoxedError::new)
69 .context(error::ExecuteGrpcQuerySnafu)
70 }
71
72 #[tracing::instrument(skip_all)]
73 async fn traces(
74 &self,
75 pipeline_handler: PipelineHandlerRef,
76 request: ExportTraceServiceRequest,
77 pipeline: PipelineWay,
78 pipeline_params: GreptimePipelineParams,
79 table_name: String,
80 ctx: QueryContextRef,
81 ) -> ServerResult<Output> {
82 self.plugins
83 .get::<PermissionCheckerRef>()
84 .as_ref()
85 .check_permission(ctx.current_user(), PermissionReq::Otlp)
86 .context(AuthSnafu)?;
87
88 let interceptor_ref = self
89 .plugins
90 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
91 interceptor_ref.pre_execute(ctx.clone())?;
92
93 let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
94
95 let (requests, rows) = otlp::trace::to_grpc_insert_requests(
96 request,
97 pipeline,
98 pipeline_params,
99 table_name,
100 &ctx,
101 pipeline_handler,
102 )?;
103
104 OTLP_TRACES_ROWS.inc_by(rows as u64);
105
106 if is_trace_v1_model {
107 self.handle_trace_inserts(requests, ctx)
108 .await
109 .map_err(BoxedError::new)
110 .context(error::ExecuteGrpcQuerySnafu)
111 } else {
112 self.handle_log_inserts(requests, ctx)
113 .await
114 .map_err(BoxedError::new)
115 .context(error::ExecuteGrpcQuerySnafu)
116 }
117 }
118
119 #[tracing::instrument(skip_all)]
120 async fn logs(
121 &self,
122 pipeline_handler: PipelineHandlerRef,
123 request: ExportLogsServiceRequest,
124 pipeline: PipelineWay,
125 pipeline_params: GreptimePipelineParams,
126 table_name: String,
127 ctx: QueryContextRef,
128 ) -> ServerResult<Output> {
129 self.plugins
130 .get::<PermissionCheckerRef>()
131 .as_ref()
132 .check_permission(ctx.current_user(), PermissionReq::Otlp)
133 .context(AuthSnafu)?;
134
135 let interceptor_ref = self
136 .plugins
137 .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
138 interceptor_ref.pre_execute(ctx.clone())?;
139
140 let (requests, rows) = otlp::logs::to_grpc_insert_requests(
141 request,
142 pipeline,
143 pipeline_params,
144 table_name,
145 &ctx,
146 pipeline_handler,
147 )
148 .await?;
149
150 let _guard = if let Some(limiter) = &self.limiter {
151 let result = limiter.limit_row_inserts(&requests);
152 if result.is_none() {
153 return InFlightWriteBytesExceededSnafu.fail();
154 }
155 result
156 } else {
157 None
158 };
159
160 self.handle_log_inserts(requests, ctx)
161 .await
162 .inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
163 .map_err(BoxedError::new)
164 .context(error::ExecuteGrpcQuerySnafu)
165 }
166}