frontend/instance/
influxdb.rs1use async_trait::async_trait;
16use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
17use client::Output;
18use common_error::ext::BoxedError;
19use servers::error::{AuthSnafu, Error, OtherSnafu};
20use servers::influxdb::InfluxdbRequest;
21use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
22use servers::query_handler::InfluxdbLineProtocolHandler;
23use session::context::QueryContextRef;
24use snafu::ResultExt;
25
26use crate::instance::Instance;
27
28#[async_trait]
29impl InfluxdbLineProtocolHandler for Instance {
30 async fn exec(
31 &self,
32 request: InfluxdbRequest,
33 ctx: QueryContextRef,
34 ) -> servers::error::Result<Output> {
35 self.plugins
36 .get::<PermissionCheckerRef>()
37 .as_ref()
38 .check_permission(ctx.current_user(), PermissionReq::LineProtocol)
39 .context(AuthSnafu)?;
40
41 let interceptor_ref = self.plugins.get::<LineProtocolInterceptorRef<Error>>();
42 interceptor_ref.pre_execute(&request.lines, ctx.clone())?;
43
44 let requests = request.try_into()?;
45 let requests = interceptor_ref
46 .post_lines_conversion(requests, ctx.clone())
47 .await?;
48
49 let _guard = if let Some(limiter) = &self.limiter {
50 Some(
51 limiter
52 .limit_row_inserts(&requests)
53 .await
54 .map_err(BoxedError::new)
55 .context(OtherSnafu)?,
56 )
57 } else {
58 None
59 };
60
61 self.handle_influx_row_inserts(requests, ctx)
62 .await
63 .map_err(BoxedError::new)
64 .context(servers::error::ExecuteGrpcQuerySnafu)
65 }
66}