frontend/instance/
influxdb.rs1use async_trait::async_trait;
16use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
17use client::Output;
18use common_error::ext::BoxedError;
19use servers::error::{AuthSnafu, Error, InFlightWriteBytesExceededSnafu};
20use servers::influxdb::InfluxdbRequest;
21use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
22use servers::query_handler::InfluxdbLineProtocolHandler;
23use session::context::QueryContextRef;
24use snafu::ResultExt;
25
26use crate::instance::Instance;
27
28#[async_trait]
29impl InfluxdbLineProtocolHandler for Instance {
30 async fn exec(
31 &self,
32 request: InfluxdbRequest,
33 ctx: QueryContextRef,
34 ) -> servers::error::Result<Output> {
35 self.plugins
36 .get::<PermissionCheckerRef>()
37 .as_ref()
38 .check_permission(ctx.current_user(), PermissionReq::LineProtocol)
39 .context(AuthSnafu)?;
40
41 let interceptor_ref = self.plugins.get::<LineProtocolInterceptorRef<Error>>();
42 interceptor_ref.pre_execute(&request.lines, ctx.clone())?;
43
44 let requests = request.try_into()?;
45 let requests = interceptor_ref
46 .post_lines_conversion(requests, ctx.clone())
47 .await?;
48
49 let _guard = if let Some(limiter) = &self.limiter {
50 let result = limiter.limit_row_inserts(&requests);
51 if result.is_none() {
52 return InFlightWriteBytesExceededSnafu.fail();
53 }
54 result
55 } else {
56 None
57 };
58
59 self.handle_influx_row_inserts(requests, ctx)
60 .await
61 .map_err(BoxedError::new)
62 .context(servers::error::ExecuteGrpcQuerySnafu)
63 }
64}