servers/
otel_arrow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::ext::ErrorExt;
16use common_error::status_code::status_to_tonic_code;
17use common_telemetry::error;
18use futures::SinkExt;
19use otel_arrow_rust::opentelemetry::{ArrowMetricsService, BatchArrowRecords, BatchStatus};
20use otel_arrow_rust::Consumer;
21use session::context::QueryContext;
22use tonic::metadata::{Entry, MetadataValue};
23use tonic::service::Interceptor;
24use tonic::{Request, Response, Status, Streaming};
25
26use crate::error;
27use crate::query_handler::OpenTelemetryProtocolHandlerRef;
28
29pub struct OtelArrowServiceHandler<T>(pub T);
30
31impl<T> OtelArrowServiceHandler<T> {
32    pub fn new(handler: T) -> Self {
33        Self(handler)
34    }
35}
36
37#[async_trait::async_trait]
38impl ArrowMetricsService for OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef> {
39    type ArrowMetricsStream = futures::channel::mpsc::Receiver<Result<BatchStatus, Status>>;
40    async fn arrow_metrics(
41        &self,
42        request: Request<Streaming<BatchArrowRecords>>,
43    ) -> Result<Response<Self::ArrowMetricsStream>, Status> {
44        let (mut sender, receiver) = futures::channel::mpsc::channel(100);
45        let mut incoming_requests = request.into_inner();
46        let handler = self.0.clone();
47        let query_context = QueryContext::arc();
48        // handles incoming requests
49        common_runtime::spawn_global(async move {
50            let mut consumer = Consumer::default();
51            while let Some(batch_res) = incoming_requests.message().await.transpose() {
52                let mut batch = match batch_res {
53                    Ok(batch) => batch,
54                    Err(e) => {
55                        error!(
56                            "Failed to receive batch from otel-arrow client, error: {}",
57                            e
58                        );
59                        let _ = sender.send(Err(e)).await;
60                        return;
61                    }
62                };
63                let batch_status = BatchStatus {
64                    batch_id: batch.batch_id,
65                    status_code: 0,
66                    status_message: Default::default(),
67                };
68                let request = match consumer.consume_batches(&mut batch).map_err(|e| {
69                    error::HandleOtelArrowRequestSnafu {
70                        err_msg: e.to_string(),
71                    }
72                    .build()
73                }) {
74                    Ok(request) => request,
75                    Err(e) => {
76                        let _ = sender
77                            .send(Err(Status::new(
78                                status_to_tonic_code(e.status_code()),
79                                e.to_string(),
80                            )))
81                            .await;
82                        error!(e;
83                            "Failed to consume batch from otel-arrow client"
84                        );
85                        return;
86                    }
87                };
88                if let Err(e) = handler.metrics(request, query_context.clone()).await {
89                    let _ = sender
90                        .send(Err(Status::new(
91                            status_to_tonic_code(e.status_code()),
92                            e.to_string(),
93                        )))
94                        .await;
95                    error!(e; "Failed to ingest metrics from otel-arrow");
96                    return;
97                }
98                let _ = sender.send(Ok(batch_status)).await;
99            }
100        });
101        Ok(Response::new(receiver))
102    }
103}
104
105/// This serves as a workaround for otel-arrow collector's custom header.
106#[derive(Clone)]
107pub struct HeaderInterceptor;
108
109impl Interceptor for HeaderInterceptor {
110    fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
111        if let Ok(Entry::Occupied(mut e)) = request.metadata_mut().entry("grpc-encoding") {
112            // This works as a workaround to handle customized compression type (zstdarrow*) in otel-arrow.
113            if e.get().as_bytes().starts_with(b"zstdarrow") {
114                e.insert(MetadataValue::from_static("zstd"));
115            }
116        }
117        Ok(request)
118    }
119}