frontend/instance/
opentsdb.rs1use async_trait::async_trait;
16use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
17use common_error::ext::BoxedError;
18use common_telemetry::tracing;
19use servers::error::{self as server_error, AuthSnafu, ExecuteGrpcQuerySnafu};
20use servers::opentsdb::codec::DataPoint;
21use servers::opentsdb::data_point_to_grpc_row_insert_requests;
22use servers::query_handler::OpentsdbProtocolHandler;
23use session::context::QueryContextRef;
24use snafu::prelude::*;
25
26use crate::instance::Instance;
27
28#[async_trait]
29impl OpentsdbProtocolHandler for Instance {
30 #[tracing::instrument(skip_all, fields(protocol = "opentsdb"))]
31 async fn exec(
32 &self,
33 data_points: Vec<DataPoint>,
34 ctx: QueryContextRef,
35 ) -> server_error::Result<usize> {
36 self.plugins
37 .get::<PermissionCheckerRef>()
38 .as_ref()
39 .check_permission(ctx.current_user(), PermissionReq::Opentsdb)
40 .context(AuthSnafu)?;
41
42 let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
43
44 let output = self
46 .handle_row_inserts(requests, ctx, true, true)
47 .await
48 .map_err(BoxedError::new)
49 .context(ExecuteGrpcQuerySnafu)?;
50
51 Ok(match output.data {
52 common_query::OutputData::AffectedRows(rows) => rows,
53 _ => unreachable!(),
54 })
55 }
56}