frontend/instance/
influxdb.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
17use client::Output;
18use common_error::ext::BoxedError;
19use servers::error::{AuthSnafu, Error, InFlightWriteBytesExceededSnafu};
20use servers::influxdb::InfluxdbRequest;
21use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
22use servers::query_handler::InfluxdbLineProtocolHandler;
23use session::context::QueryContextRef;
24use snafu::ResultExt;
25
26use crate::instance::Instance;
27
28#[async_trait]
29impl InfluxdbLineProtocolHandler for Instance {
30    async fn exec(
31        &self,
32        request: InfluxdbRequest,
33        ctx: QueryContextRef,
34    ) -> servers::error::Result<Output> {
35        self.plugins
36            .get::<PermissionCheckerRef>()
37            .as_ref()
38            .check_permission(ctx.current_user(), PermissionReq::LineProtocol)
39            .context(AuthSnafu)?;
40
41        let interceptor_ref = self.plugins.get::<LineProtocolInterceptorRef<Error>>();
42        interceptor_ref.pre_execute(&request.lines, ctx.clone())?;
43
44        let requests = request.try_into()?;
45        let requests = interceptor_ref
46            .post_lines_conversion(requests, ctx.clone())
47            .await?;
48
49        let _guard = if let Some(limiter) = &self.limiter {
50            let result = limiter.limit_row_inserts(&requests);
51            if result.is_none() {
52                return InFlightWriteBytesExceededSnafu.fail();
53            }
54            result
55        } else {
56            None
57        };
58
59        self.handle_influx_row_inserts(requests, ctx)
60            .await
61            .map_err(BoxedError::new)
62            .context(servers::error::ExecuteGrpcQuerySnafu)
63    }
64}