frontend/instance/
opentsdb.rs1use async_trait::async_trait;
16use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
17use common_error::ext::BoxedError;
18use common_telemetry::tracing;
19use servers::error as server_error;
20use servers::error::{AuthSnafu, InFlightWriteBytesExceededSnafu};
21use servers::opentsdb::codec::DataPoint;
22use servers::opentsdb::data_point_to_grpc_row_insert_requests;
23use servers::query_handler::OpentsdbProtocolHandler;
24use session::context::QueryContextRef;
25use snafu::prelude::*;
26
27use crate::instance::Instance;
28
29#[async_trait]
30impl OpentsdbProtocolHandler for Instance {
31 #[tracing::instrument(skip_all, fields(protocol = "opentsdb"))]
32 async fn exec(
33 &self,
34 data_points: Vec<DataPoint>,
35 ctx: QueryContextRef,
36 ) -> server_error::Result<usize> {
37 self.plugins
38 .get::<PermissionCheckerRef>()
39 .as_ref()
40 .check_permission(ctx.current_user(), PermissionReq::Opentsdb)
41 .context(AuthSnafu)?;
42
43 let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
44
45 let _guard = if let Some(limiter) = &self.limiter {
46 let result = limiter.limit_row_inserts(&requests);
47 if result.is_none() {
48 return InFlightWriteBytesExceededSnafu.fail();
49 }
50 result
51 } else {
52 None
53 };
54
55 let output = self
56 .handle_row_inserts(requests, ctx)
57 .await
58 .map_err(BoxedError::new)
59 .context(servers::error::ExecuteGrpcQuerySnafu)?;
60
61 Ok(match output.data {
62 common_query::OutputData::AffectedRows(rows) => rows,
63 _ => unreachable!(),
64 })
65 }
66}