frontend/instance/
opentsdb.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
17use common_error::ext::BoxedError;
18use common_telemetry::tracing;
19use servers::error as server_error;
20use servers::error::{AuthSnafu, InFlightWriteBytesExceededSnafu};
21use servers::opentsdb::codec::DataPoint;
22use servers::opentsdb::data_point_to_grpc_row_insert_requests;
23use servers::query_handler::OpentsdbProtocolHandler;
24use session::context::QueryContextRef;
25use snafu::prelude::*;
26
27use crate::instance::Instance;
28
29#[async_trait]
30impl OpentsdbProtocolHandler for Instance {
31    #[tracing::instrument(skip_all, fields(protocol = "opentsdb"))]
32    async fn exec(
33        &self,
34        data_points: Vec<DataPoint>,
35        ctx: QueryContextRef,
36    ) -> server_error::Result<usize> {
37        self.plugins
38            .get::<PermissionCheckerRef>()
39            .as_ref()
40            .check_permission(ctx.current_user(), PermissionReq::Opentsdb)
41            .context(AuthSnafu)?;
42
43        let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
44
45        let _guard = if let Some(limiter) = &self.limiter {
46            let result = limiter.limit_row_inserts(&requests);
47            if result.is_none() {
48                return InFlightWriteBytesExceededSnafu.fail();
49            }
50            result
51        } else {
52            None
53        };
54
55        let output = self
56            .handle_row_inserts(requests, ctx)
57            .await
58            .map_err(BoxedError::new)
59            .context(servers::error::ExecuteGrpcQuerySnafu)?;
60
61        Ok(match output.data {
62            common_query::OutputData::AffectedRows(rows) => rows,
63            _ => unreachable!(),
64        })
65    }
66}