frontend/instance/
opentsdb.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
17use common_error::ext::BoxedError;
18use common_telemetry::tracing;
19use servers::error::{self as server_error, AuthSnafu, ExecuteGrpcQuerySnafu, OtherSnafu};
20use servers::opentsdb::codec::DataPoint;
21use servers::opentsdb::data_point_to_grpc_row_insert_requests;
22use servers::query_handler::OpentsdbProtocolHandler;
23use session::context::QueryContextRef;
24use snafu::prelude::*;
25
26use crate::instance::Instance;
27
28#[async_trait]
29impl OpentsdbProtocolHandler for Instance {
30    #[tracing::instrument(skip_all, fields(protocol = "opentsdb"))]
31    async fn exec(
32        &self,
33        data_points: Vec<DataPoint>,
34        ctx: QueryContextRef,
35    ) -> server_error::Result<usize> {
36        self.plugins
37            .get::<PermissionCheckerRef>()
38            .as_ref()
39            .check_permission(ctx.current_user(), PermissionReq::Opentsdb)
40            .context(AuthSnafu)?;
41
42        let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
43
44        let _guard = if let Some(limiter) = &self.limiter {
45            Some(
46                limiter
47                    .limit_row_inserts(&requests)
48                    .await
49                    .map_err(BoxedError::new)
50                    .context(OtherSnafu)?,
51            )
52        } else {
53            None
54        };
55
56        // OpenTSDB is single value.
57        let output = self
58            .handle_row_inserts(requests, ctx, true, true)
59            .await
60            .map_err(BoxedError::new)
61            .context(ExecuteGrpcQuerySnafu)?;
62
63        Ok(match output.data {
64            common_query::OutputData::AffectedRows(rows) => rows,
65            _ => unreachable!(),
66        })
67    }
68}