frontend/instance/
influxdb.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
17use client::Output;
18use common_error::ext::BoxedError;
19use servers::error::{AuthSnafu, Error, OtherSnafu};
20use servers::influxdb::InfluxdbRequest;
21use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
22use servers::query_handler::InfluxdbLineProtocolHandler;
23use session::context::QueryContextRef;
24use snafu::ResultExt;
25
26use crate::instance::Instance;
27
28#[async_trait]
29impl InfluxdbLineProtocolHandler for Instance {
30    async fn exec(
31        &self,
32        request: InfluxdbRequest,
33        ctx: QueryContextRef,
34    ) -> servers::error::Result<Output> {
35        self.plugins
36            .get::<PermissionCheckerRef>()
37            .as_ref()
38            .check_permission(ctx.current_user(), PermissionReq::LineProtocol)
39            .context(AuthSnafu)?;
40
41        let interceptor_ref = self.plugins.get::<LineProtocolInterceptorRef<Error>>();
42        interceptor_ref.pre_execute(&request.lines, ctx.clone())?;
43
44        let requests = request.try_into()?;
45        let requests = interceptor_ref
46            .post_lines_conversion(requests, ctx.clone())
47            .await?;
48
49        let _guard = if let Some(limiter) = &self.limiter {
50            Some(
51                limiter
52                    .limit_row_inserts(&requests)
53                    .await
54                    .map_err(BoxedError::new)
55                    .context(OtherSnafu)?,
56            )
57        } else {
58            None
59        };
60
61        self.handle_influx_row_inserts(requests, ctx)
62            .await
63            .map_err(BoxedError::new)
64            .context(servers::error::ExecuteGrpcQuerySnafu)
65    }
66}