1use common_error::ext::ErrorExt;
16use common_error::status_code::status_to_tonic_code;
17use common_telemetry::error;
18use futures::SinkExt;
19use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsService;
20use otel_arrow_rust::proto::opentelemetry::arrow::v1::{BatchArrowRecords, BatchStatus};
21use otel_arrow_rust::Consumer;
22use session::context::QueryContext;
23use tonic::metadata::{Entry, MetadataValue};
24use tonic::service::Interceptor;
25use tonic::{Request, Response, Status, Streaming};
26
27use crate::error;
28use crate::query_handler::OpenTelemetryProtocolHandlerRef;
29
/// Newtype wrapper around a handler value `T`.
///
/// The wrapped handler is exposed as the public tuple field `.0`. In this file the
/// `ArrowMetricsService` trait is implemented for
/// `OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>`.
pub struct OtelArrowServiceHandler<T>(pub T);

impl<T> OtelArrowServiceHandler<T> {
    /// Wraps `handler` in a new [`OtelArrowServiceHandler`].
    ///
    /// Equivalent to constructing the tuple struct directly.
    pub fn new(handler: T) -> Self {
        OtelArrowServiceHandler(handler)
    }
}
37
38#[async_trait::async_trait]
39impl ArrowMetricsService for OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef> {
40 type ArrowMetricsStream = futures::channel::mpsc::Receiver<Result<BatchStatus, Status>>;
41 async fn arrow_metrics(
42 &self,
43 request: Request<Streaming<BatchArrowRecords>>,
44 ) -> Result<Response<Self::ArrowMetricsStream>, Status> {
45 let (mut sender, receiver) = futures::channel::mpsc::channel(100);
46 let mut incoming_requests = request.into_inner();
47 let handler = self.0.clone();
48 let query_context = QueryContext::arc();
49 common_runtime::spawn_global(async move {
51 let mut consumer = Consumer::default();
52 while let Some(batch_res) = incoming_requests.message().await.transpose() {
53 let mut batch = match batch_res {
54 Ok(batch) => batch,
55 Err(e) => {
56 error!(
57 "Failed to receive batch from otel-arrow client, error: {}",
58 e
59 );
60 let _ = sender.send(Err(e)).await;
61 return;
62 }
63 };
64 let batch_status = BatchStatus {
65 batch_id: batch.batch_id,
66 status_code: 0,
67 status_message: Default::default(),
68 };
69 let request = match consumer.consume_metrics_batches(&mut batch).map_err(|e| {
70 error::HandleOtelArrowRequestSnafu {
71 err_msg: e.to_string(),
72 }
73 .build()
74 }) {
75 Ok(request) => request,
76 Err(e) => {
77 let _ = sender
78 .send(Err(Status::new(
79 status_to_tonic_code(e.status_code()),
80 e.to_string(),
81 )))
82 .await;
83 error!(e;
84 "Failed to consume batch from otel-arrow client"
85 );
86 return;
87 }
88 };
89 if let Err(e) = handler.metrics(request, query_context.clone()).await {
91 let _ = sender
92 .send(Err(Status::new(
93 status_to_tonic_code(e.status_code()),
94 e.to_string(),
95 )))
96 .await;
97 error!(e; "Failed to ingest metrics from otel-arrow");
98 return;
99 }
100 let _ = sender.send(Ok(batch_status)).await;
101 }
102 });
103 Ok(Response::new(receiver))
104 }
105}
106
107#[derive(Clone)]
109pub struct HeaderInterceptor;
110
111impl Interceptor for HeaderInterceptor {
112 fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
113 if let Ok(Entry::Occupied(mut e)) = request.metadata_mut().entry("grpc-encoding") {
114 if e.get().as_bytes().starts_with(b"zstdarrow") {
116 e.insert(MetadataValue::from_static("zstd"));
117 }
118 }
119 Ok(request)
120 }
121}