servers/
otel_arrow.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_error::ext::ErrorExt;
use common_error::status_code::status_to_tonic_code;
use common_telemetry::error;
use futures::SinkExt;
use otel_arrow_rust::opentelemetry::{ArrowMetricsService, BatchArrowRecords, BatchStatus};
use otel_arrow_rust::Consumer;
use session::context::QueryContext;
use tonic::metadata::{Entry, MetadataValue};
use tonic::service::Interceptor;
use tonic::{Request, Response, Status, Streaming};

use crate::error;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;

pub struct OtelArrowServiceHandler<T>(pub T);

impl<T> OtelArrowServiceHandler<T> {
    pub fn new(handler: T) -> Self {
        Self(handler)
    }
}

#[async_trait::async_trait]
impl ArrowMetricsService for OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef> {
    type ArrowMetricsStream = futures::channel::mpsc::Receiver<Result<BatchStatus, Status>>;
    async fn arrow_metrics(
        &self,
        request: Request<Streaming<BatchArrowRecords>>,
    ) -> Result<Response<Self::ArrowMetricsStream>, Status> {
        let (mut sender, receiver) = futures::channel::mpsc::channel(100);
        let mut incoming_requests = request.into_inner();
        let handler = self.0.clone();
        let query_context = QueryContext::arc();
        // handles incoming requests
        common_runtime::spawn_global(async move {
            let mut consumer = Consumer::default();
            while let Some(batch_res) = incoming_requests.message().await.transpose() {
                let mut batch = match batch_res {
                    Ok(batch) => batch,
                    Err(e) => {
                        error!(
                            "Failed to receive batch from otel-arrow client, error: {}",
                            e
                        );
                        let _ = sender.send(Err(e)).await;
                        return;
                    }
                };
                let batch_status = BatchStatus {
                    batch_id: batch.batch_id,
                    status_code: 0,
                    status_message: Default::default(),
                };
                let request = match consumer.consume_batches(&mut batch).map_err(|e| {
                    error::HandleOtelArrowRequestSnafu {
                        err_msg: e.to_string(),
                    }
                    .build()
                }) {
                    Ok(request) => request,
                    Err(e) => {
                        let _ = sender
                            .send(Err(Status::new(
                                status_to_tonic_code(e.status_code()),
                                e.to_string(),
                            )))
                            .await;
                        error!(e;
                            "Failed to consume batch from otel-arrow client"
                        );
                        return;
                    }
                };
                if let Err(e) = handler.metrics(request, query_context.clone()).await {
                    let _ = sender
                        .send(Err(Status::new(
                            status_to_tonic_code(e.status_code()),
                            e.to_string(),
                        )))
                        .await;
                    error!(e; "Failed to ingest metrics from otel-arrow");
                    return;
                }
                let _ = sender.send(Ok(batch_status)).await;
            }
        });
        Ok(Response::new(receiver))
    }
}

/// This serves as a workaround for otel-arrow collector's custom header.
#[derive(Clone)]
pub struct HeaderInterceptor;

impl Interceptor for HeaderInterceptor {
    fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
        if let Ok(Entry::Occupied(mut e)) = request.metadata_mut().entry("grpc-encoding") {
            // This works as a workaround to handle customized compression type (zstdarrow*) in otel-arrow.
            if e.get().as_bytes().starts_with(b"zstdarrow") {
                e.insert(MetadataValue::from_static("zstd"));
            }
        }
        Ok(request)
    }
}