frontend/instance/
opentsdb.rs1use async_trait::async_trait;
16use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
17use common_error::ext::BoxedError;
18use common_telemetry::tracing;
19use servers::error::{self as server_error, AuthSnafu, ExecuteGrpcQuerySnafu, OtherSnafu};
20use servers::opentsdb::codec::DataPoint;
21use servers::opentsdb::data_point_to_grpc_row_insert_requests;
22use servers::query_handler::OpentsdbProtocolHandler;
23use session::context::QueryContextRef;
24use snafu::prelude::*;
25
26use crate::instance::Instance;
27
28#[async_trait]
29impl OpentsdbProtocolHandler for Instance {
30 #[tracing::instrument(skip_all, fields(protocol = "opentsdb"))]
31 async fn exec(
32 &self,
33 data_points: Vec<DataPoint>,
34 ctx: QueryContextRef,
35 ) -> server_error::Result<usize> {
36 self.plugins
37 .get::<PermissionCheckerRef>()
38 .as_ref()
39 .check_permission(ctx.current_user(), PermissionReq::Opentsdb)
40 .context(AuthSnafu)?;
41
42 let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
43
44 let _guard = if let Some(limiter) = &self.limiter {
45 Some(
46 limiter
47 .limit_row_inserts(&requests)
48 .await
49 .map_err(BoxedError::new)
50 .context(OtherSnafu)?,
51 )
52 } else {
53 None
54 };
55
56 let output = self
58 .handle_row_inserts(requests, ctx, true, true)
59 .await
60 .map_err(BoxedError::new)
61 .context(ExecuteGrpcQuerySnafu)?;
62
63 Ok(match output.data {
64 common_query::OutputData::AffectedRows(rows) => rows,
65 _ => unreachable!(),
66 })
67 }
68}