1use auth::UserProviderRef;
16use common_error::ext::ErrorExt;
17use common_error::status_code::status_to_tonic_code;
18use common_telemetry::error;
19use futures::SinkExt;
20use otel_arrow_rust::Consumer;
21use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsService;
22use otel_arrow_rust::proto::opentelemetry::arrow::v1::{BatchArrowRecords, BatchStatus};
23use tonic::metadata::{Entry, MetadataValue};
24use tonic::service::Interceptor;
25use tonic::{Request, Response, Status, Streaming};
26
27use crate::error;
28use crate::grpc::context_auth;
29use crate::query_handler::OpenTelemetryProtocolHandlerRef;
30
31pub struct OtelArrowServiceHandler<T> {
32 handler: T,
33 user_provider: Option<UserProviderRef>,
34}
35
36impl<T> OtelArrowServiceHandler<T> {
37 pub fn new(handler: T, user_provider: Option<UserProviderRef>) -> Self {
38 Self {
39 handler,
40 user_provider,
41 }
42 }
43}
44
45#[async_trait::async_trait]
46impl ArrowMetricsService for OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef> {
47 type ArrowMetricsStream = futures::channel::mpsc::Receiver<Result<BatchStatus, Status>>;
48 async fn arrow_metrics(
49 &self,
50 request: Request<Streaming<BatchArrowRecords>>,
51 ) -> Result<Response<Self::ArrowMetricsStream>, Status> {
52 let (mut sender, receiver) = futures::channel::mpsc::channel(100);
53
54 let (headers, _, mut incoming_requests) = request.into_parts();
55
56 let query_ctx = context_auth::create_query_context_from_grpc_metadata(&headers)?;
57 context_auth::check_auth(self.user_provider.clone(), &headers, query_ctx.clone()).await?;
58
59 let handler = self.handler.clone();
60
61 common_runtime::spawn_global(async move {
63 let mut consumer = Consumer::default();
64 while let Some(batch_res) = incoming_requests.message().await.transpose() {
65 let mut batch = match batch_res {
66 Ok(batch) => batch,
67 Err(e) => {
68 error!(
69 "Failed to receive batch from otel-arrow client, error: {}",
70 e
71 );
72 let _ = sender.send(Err(e)).await;
73 return;
74 }
75 };
76 let batch_status = BatchStatus {
77 batch_id: batch.batch_id,
78 status_code: 0,
79 status_message: Default::default(),
80 };
81 let request = match consumer.consume_metrics_batches(&mut batch).map_err(|e| {
82 error::HandleOtelArrowRequestSnafu {
83 err_msg: e.to_string(),
84 }
85 .build()
86 }) {
87 Ok(request) => request,
88 Err(e) => {
89 let _ = sender
90 .send(Err(Status::new(
91 status_to_tonic_code(e.status_code()),
92 e.to_string(),
93 )))
94 .await;
95 error!(e;
96 "Failed to consume batch from otel-arrow client"
97 );
98 return;
99 }
100 };
101 if let Err(e) = handler.metrics(request, query_ctx.clone()).await {
103 let _ = sender
104 .send(Err(Status::new(
105 status_to_tonic_code(e.status_code()),
106 e.to_string(),
107 )))
108 .await;
109 error!(e; "Failed to ingest metrics from otel-arrow");
110 return;
111 }
112 let _ = sender.send(Ok(batch_status)).await;
113 }
114 });
115 Ok(Response::new(receiver))
116 }
117}
118
119#[derive(Clone)]
121pub struct HeaderInterceptor;
122
123impl Interceptor for HeaderInterceptor {
124 fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
125 if let Ok(Entry::Occupied(mut e)) = request.metadata_mut().entry("grpc-encoding") {
126 if e.get().as_bytes().starts_with(b"zstdarrow") {
128 e.insert(MetadataValue::from_static("zstd"));
129 }
130 }
131 Ok(request)
132 }
133}