1use common_error::ext::ErrorExt;
16use common_error::status_code::status_to_tonic_code;
17use common_telemetry::error;
18use futures::SinkExt;
19use otel_arrow_rust::opentelemetry::{ArrowMetricsService, BatchArrowRecords, BatchStatus};
20use otel_arrow_rust::Consumer;
21use session::context::QueryContext;
22use tonic::metadata::{Entry, MetadataValue};
23use tonic::service::Interceptor;
24use tonic::{Request, Response, Status, Streaming};
25
26use crate::error;
27use crate::query_handler::OpenTelemetryProtocolHandlerRef;
28
/// Newtype wrapper around a handler value of type `T`.
///
/// The wrapped handler is stored in the public tuple field `.0`, so it can be
/// read directly by code that holds the wrapper.
pub struct OtelArrowServiceHandler<T>(pub T);

impl<T> OtelArrowServiceHandler<T> {
    /// Wraps `handler` in a new [`OtelArrowServiceHandler`].
    pub fn new(handler: T) -> Self {
        OtelArrowServiceHandler(handler)
    }
}
36
37#[async_trait::async_trait]
38impl ArrowMetricsService for OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef> {
39 type ArrowMetricsStream = futures::channel::mpsc::Receiver<Result<BatchStatus, Status>>;
40 async fn arrow_metrics(
41 &self,
42 request: Request<Streaming<BatchArrowRecords>>,
43 ) -> Result<Response<Self::ArrowMetricsStream>, Status> {
44 let (mut sender, receiver) = futures::channel::mpsc::channel(100);
45 let mut incoming_requests = request.into_inner();
46 let handler = self.0.clone();
47 let query_context = QueryContext::arc();
48 common_runtime::spawn_global(async move {
50 let mut consumer = Consumer::default();
51 while let Some(batch_res) = incoming_requests.message().await.transpose() {
52 let mut batch = match batch_res {
53 Ok(batch) => batch,
54 Err(e) => {
55 error!(
56 "Failed to receive batch from otel-arrow client, error: {}",
57 e
58 );
59 let _ = sender.send(Err(e)).await;
60 return;
61 }
62 };
63 let batch_status = BatchStatus {
64 batch_id: batch.batch_id,
65 status_code: 0,
66 status_message: Default::default(),
67 };
68 let request = match consumer.consume_batches(&mut batch).map_err(|e| {
69 error::HandleOtelArrowRequestSnafu {
70 err_msg: e.to_string(),
71 }
72 .build()
73 }) {
74 Ok(request) => request,
75 Err(e) => {
76 let _ = sender
77 .send(Err(Status::new(
78 status_to_tonic_code(e.status_code()),
79 e.to_string(),
80 )))
81 .await;
82 error!(e;
83 "Failed to consume batch from otel-arrow client"
84 );
85 return;
86 }
87 };
88 if let Err(e) = handler.metrics(request, query_context.clone()).await {
89 let _ = sender
90 .send(Err(Status::new(
91 status_to_tonic_code(e.status_code()),
92 e.to_string(),
93 )))
94 .await;
95 error!(e; "Failed to ingest metrics from otel-arrow");
96 return;
97 }
98 let _ = sender.send(Ok(batch_status)).await;
99 }
100 });
101 Ok(Response::new(receiver))
102 }
103}
104
105#[derive(Clone)]
107pub struct HeaderInterceptor;
108
109impl Interceptor for HeaderInterceptor {
110 fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
111 if let Ok(Entry::Occupied(mut e)) = request.metadata_mut().entry("grpc-encoding") {
112 if e.get().as_bytes().starts_with(b"zstdarrow") {
114 e.insert(MetadataValue::from_static("zstd"));
115 }
116 }
117 Ok(request)
118 }
119}