servers/
otel_arrow.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use common_error::ext::ErrorExt;
16use common_error::status_code::status_to_tonic_code;
17use common_telemetry::error;
18use futures::SinkExt;
19use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsService;
20use otel_arrow_rust::proto::opentelemetry::arrow::v1::{BatchArrowRecords, BatchStatus};
21use otel_arrow_rust::Consumer;
22use session::context::QueryContext;
23use tonic::metadata::{Entry, MetadataValue};
24use tonic::service::Interceptor;
25use tonic::{Request, Response, Status, Streaming};
26
27use crate::error;
28use crate::query_handler::OpenTelemetryProtocolHandlerRef;
29
/// Thin newtype around an inner handler `T`.
///
/// The wrapped value is public (`.0`), so callers can reach the inner handler
/// directly. Service traits are implemented on this wrapper rather than on the
/// handler type itself.
pub struct OtelArrowServiceHandler<T>(pub T);

impl<T> OtelArrowServiceHandler<T> {
    /// Wraps `handler` in an [`OtelArrowServiceHandler`].
    pub fn new(handler: T) -> Self {
        OtelArrowServiceHandler(handler)
    }
}
37
38#[async_trait::async_trait]
39impl ArrowMetricsService for OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef> {
40    type ArrowMetricsStream = futures::channel::mpsc::Receiver<Result<BatchStatus, Status>>;
41    async fn arrow_metrics(
42        &self,
43        request: Request<Streaming<BatchArrowRecords>>,
44    ) -> Result<Response<Self::ArrowMetricsStream>, Status> {
45        let (mut sender, receiver) = futures::channel::mpsc::channel(100);
46        let mut incoming_requests = request.into_inner();
47        let handler = self.0.clone();
48        let query_context = QueryContext::arc();
49        // handles incoming requests
50        common_runtime::spawn_global(async move {
51            let mut consumer = Consumer::default();
52            while let Some(batch_res) = incoming_requests.message().await.transpose() {
53                let mut batch = match batch_res {
54                    Ok(batch) => batch,
55                    Err(e) => {
56                        error!(
57                            "Failed to receive batch from otel-arrow client, error: {}",
58                            e
59                        );
60                        let _ = sender.send(Err(e)).await;
61                        return;
62                    }
63                };
64                let batch_status = BatchStatus {
65                    batch_id: batch.batch_id,
66                    status_code: 0,
67                    status_message: Default::default(),
68                };
69                let request = match consumer.consume_metrics_batches(&mut batch).map_err(|e| {
70                    error::HandleOtelArrowRequestSnafu {
71                        err_msg: e.to_string(),
72                    }
73                    .build()
74                }) {
75                    Ok(request) => request,
76                    Err(e) => {
77                        let _ = sender
78                            .send(Err(Status::new(
79                                status_to_tonic_code(e.status_code()),
80                                e.to_string(),
81                            )))
82                            .await;
83                        error!(e;
84                            "Failed to consume batch from otel-arrow client"
85                        );
86                        return;
87                    }
88                };
89                // use metric engine by default
90                if let Err(e) = handler.metrics(request, query_context.clone()).await {
91                    let _ = sender
92                        .send(Err(Status::new(
93                            status_to_tonic_code(e.status_code()),
94                            e.to_string(),
95                        )))
96                        .await;
97                    error!(e; "Failed to ingest metrics from otel-arrow");
98                    return;
99                }
100                let _ = sender.send(Ok(batch_status)).await;
101            }
102        });
103        Ok(Response::new(receiver))
104    }
105}
106
107/// This serves as a workaround for otel-arrow collector's custom header.
108#[derive(Clone)]
109pub struct HeaderInterceptor;
110
111impl Interceptor for HeaderInterceptor {
112    fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
113        if let Ok(Entry::Occupied(mut e)) = request.metadata_mut().entry("grpc-encoding") {
114            // This works as a workaround to handle customized compression type (zstdarrow*) in otel-arrow.
115            if e.get().as_bytes().starts_with(b"zstdarrow") {
116                e.insert(MetadataValue::from_static("zstd"));
117            }
118        }
119        Ok(request)
120    }
121}