servers/
otel_arrow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use auth::UserProviderRef;
16use common_error::ext::ErrorExt;
17use common_error::status_code::status_to_tonic_code;
18use common_telemetry::error;
19use futures::SinkExt;
20use otel_arrow_rust::Consumer;
21use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsService;
22use otel_arrow_rust::proto::opentelemetry::arrow::v1::{BatchArrowRecords, BatchStatus};
23use tonic::metadata::{Entry, MetadataValue};
24use tonic::service::Interceptor;
25use tonic::{Request, Response, Status, Streaming};
26
27use crate::error;
28use crate::grpc::context_auth;
29use crate::query_handler::OpenTelemetryProtocolHandlerRef;
30
31pub struct OtelArrowServiceHandler<T> {
32    handler: T,
33    user_provider: Option<UserProviderRef>,
34}
35
36impl<T> OtelArrowServiceHandler<T> {
37    pub fn new(handler: T, user_provider: Option<UserProviderRef>) -> Self {
38        Self {
39            handler,
40            user_provider,
41        }
42    }
43}
44
45#[async_trait::async_trait]
46impl ArrowMetricsService for OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef> {
47    type ArrowMetricsStream = futures::channel::mpsc::Receiver<Result<BatchStatus, Status>>;
48    async fn arrow_metrics(
49        &self,
50        request: Request<Streaming<BatchArrowRecords>>,
51    ) -> Result<Response<Self::ArrowMetricsStream>, Status> {
52        let (mut sender, receiver) = futures::channel::mpsc::channel(100);
53
54        let (headers, _, mut incoming_requests) = request.into_parts();
55
56        let query_ctx = context_auth::create_query_context_from_grpc_metadata(&headers)?;
57        context_auth::check_auth(self.user_provider.clone(), &headers, query_ctx.clone()).await?;
58
59        let handler = self.handler.clone();
60
61        // handles incoming requests
62        common_runtime::spawn_global(async move {
63            let mut consumer = Consumer::default();
64            while let Some(batch_res) = incoming_requests.message().await.transpose() {
65                let mut batch = match batch_res {
66                    Ok(batch) => batch,
67                    Err(e) => {
68                        error!(
69                            "Failed to receive batch from otel-arrow client, error: {}",
70                            e
71                        );
72                        let _ = sender.send(Err(e)).await;
73                        return;
74                    }
75                };
76                let batch_status = BatchStatus {
77                    batch_id: batch.batch_id,
78                    status_code: 0,
79                    status_message: Default::default(),
80                };
81                let request = match consumer.consume_metrics_batches(&mut batch).map_err(|e| {
82                    error::HandleOtelArrowRequestSnafu {
83                        err_msg: e.to_string(),
84                    }
85                    .build()
86                }) {
87                    Ok(request) => request,
88                    Err(e) => {
89                        let _ = sender
90                            .send(Err(Status::new(
91                                status_to_tonic_code(e.status_code()),
92                                e.to_string(),
93                            )))
94                            .await;
95                        error!(e;
96                            "Failed to consume batch from otel-arrow client"
97                        );
98                        return;
99                    }
100                };
101                // use metric engine by default
102                if let Err(e) = handler.metrics(request, query_ctx.clone()).await {
103                    let _ = sender
104                        .send(Err(Status::new(
105                            status_to_tonic_code(e.status_code()),
106                            e.to_string(),
107                        )))
108                        .await;
109                    error!(e; "Failed to ingest metrics from otel-arrow");
110                    return;
111                }
112                let _ = sender.send(Ok(batch_status)).await;
113            }
114        });
115        Ok(Response::new(receiver))
116    }
117}
118
119/// This serves as a workaround for otel-arrow collector's custom header.
120#[derive(Clone)]
121pub struct HeaderInterceptor;
122
123impl Interceptor for HeaderInterceptor {
124    fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
125        if let Ok(Entry::Occupied(mut e)) = request.metadata_mut().entry("grpc-encoding") {
126            // This works as a workaround to handle customized compression type (zstdarrow*) in otel-arrow.
127            if e.get().as_bytes().starts_with(b"zstdarrow") {
128                e.insert(MetadataValue::from_static("zstd"));
129            }
130        }
131        Ok(request)
132    }
133}