servers/query_handler/
grpc.rs1use std::sync::Arc;
16
17use api::v1::greptime_request::Request;
18use arrow_flight::FlightData;
19use async_trait::async_trait;
20use common_base::AffectedRows;
21use common_error::ext::{BoxedError, ErrorExt};
22use common_grpc::flight::FlightDecoder;
23use common_query::Output;
24use session::context::QueryContextRef;
25use snafu::ResultExt;
26use table::metadata::TableId;
27use table::table_name::TableName;
28
29use crate::error::{self, Result};
30
31pub type GrpcQueryHandlerRef<E> = Arc<dyn GrpcQueryHandler<Error = E> + Send + Sync>;
32pub type ServerGrpcQueryHandlerRef = GrpcQueryHandlerRef<error::Error>;
33
34pub type RawRecordBatch = bytes::Bytes;
35
36#[async_trait]
37pub trait GrpcQueryHandler {
38 type Error: ErrorExt;
39
40 async fn do_query(
41 &self,
42 query: Request,
43 ctx: QueryContextRef,
44 ) -> std::result::Result<Output, Self::Error>;
45
46 async fn put_record_batch(
47 &self,
48 table: &TableName,
49 table_id: &mut Option<TableId>,
50 decoder: &mut FlightDecoder,
51 flight_data: FlightData,
52 ) -> std::result::Result<AffectedRows, Self::Error>;
53}
54
55pub struct ServerGrpcQueryHandlerAdapter<E>(GrpcQueryHandlerRef<E>);
56
57impl<E> ServerGrpcQueryHandlerAdapter<E> {
58 pub fn arc(handler: GrpcQueryHandlerRef<E>) -> Arc<Self> {
59 Arc::new(Self(handler))
60 }
61}
62
63#[async_trait]
64impl<E> GrpcQueryHandler for ServerGrpcQueryHandlerAdapter<E>
65where
66 E: ErrorExt + Send + Sync + 'static,
67{
68 type Error = error::Error;
69
70 async fn do_query(&self, query: Request, ctx: QueryContextRef) -> Result<Output> {
71 self.0
72 .do_query(query, ctx)
73 .await
74 .map_err(BoxedError::new)
75 .context(error::ExecuteGrpcQuerySnafu)
76 }
77
78 async fn put_record_batch(
79 &self,
80 table: &TableName,
81 table_id: &mut Option<TableId>,
82 decoder: &mut FlightDecoder,
83 data: FlightData,
84 ) -> Result<AffectedRows> {
85 self.0
86 .put_record_batch(table, table_id, decoder, data)
87 .await
88 .map_err(BoxedError::new)
89 .context(error::ExecuteGrpcRequestSnafu)
90 }
91}