servers/query_handler/
grpc.rs1use std::pin::Pin;
16use std::sync::Arc;
17
18use api::v1::greptime_request::Request;
19use async_trait::async_trait;
20use common_base::AffectedRows;
21use common_error::ext::{BoxedError, ErrorExt};
22use common_grpc::flight::do_put::DoPutResponse;
23use common_query::Output;
24use futures::Stream;
25use session::context::QueryContextRef;
26use snafu::ResultExt;
27use table::TableRef;
28
29use crate::error::{self, Result};
30use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
31
32pub type GrpcQueryHandlerRef<E> = Arc<dyn GrpcQueryHandler<Error = E> + Send + Sync>;
33pub type ServerGrpcQueryHandlerRef = GrpcQueryHandlerRef<error::Error>;
34
35pub type RawRecordBatch = bytes::Bytes;
36
37#[async_trait]
38pub trait GrpcQueryHandler {
39 type Error: ErrorExt;
40
41 async fn do_query(
42 &self,
43 query: Request,
44 ctx: QueryContextRef,
45 ) -> std::result::Result<Output, Self::Error>;
46
47 async fn put_record_batch(
48 &self,
49 request: PutRecordBatchRequest,
50 table_ref: &mut Option<TableRef>,
51 ctx: QueryContextRef,
52 ) -> std::result::Result<AffectedRows, Self::Error>;
53
54 fn handle_put_record_batch_stream(
55 &self,
56 stream: PutRecordBatchRequestStream,
57 ctx: QueryContextRef,
58 ) -> Pin<Box<dyn Stream<Item = std::result::Result<DoPutResponse, Self::Error>> + Send>>;
59}
60
61pub struct ServerGrpcQueryHandlerAdapter<E>(GrpcQueryHandlerRef<E>);
62
63impl<E> ServerGrpcQueryHandlerAdapter<E> {
64 pub fn arc(handler: GrpcQueryHandlerRef<E>) -> Arc<Self> {
65 Arc::new(Self(handler))
66 }
67}
68
69#[async_trait]
70impl<E> GrpcQueryHandler for ServerGrpcQueryHandlerAdapter<E>
71where
72 E: ErrorExt + Send + Sync + 'static,
73{
74 type Error = error::Error;
75
76 async fn do_query(&self, query: Request, ctx: QueryContextRef) -> Result<Output> {
77 self.0
78 .do_query(query, ctx)
79 .await
80 .map_err(BoxedError::new)
81 .context(error::ExecuteGrpcQuerySnafu)
82 }
83
84 async fn put_record_batch(
85 &self,
86 request: PutRecordBatchRequest,
87 table_ref: &mut Option<TableRef>,
88 ctx: QueryContextRef,
89 ) -> Result<AffectedRows> {
90 self.0
91 .put_record_batch(request, table_ref, ctx)
92 .await
93 .map_err(BoxedError::new)
94 .context(error::ExecuteGrpcRequestSnafu)
95 }
96
97 fn handle_put_record_batch_stream(
98 &self,
99 stream: PutRecordBatchRequestStream,
100 ctx: QueryContextRef,
101 ) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
102 use futures_util::StreamExt;
103 Box::pin(
104 self.0
105 .handle_put_record_batch_stream(stream, ctx)
106 .map(|result| {
107 result
108 .map_err(|e| BoxedError::new(e))
109 .context(error::ExecuteGrpcRequestSnafu)
110 }),
111 )
112 }
113}