servers/query_handler/
grpc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17
18use api::v1::greptime_request::Request;
19use async_trait::async_trait;
20use common_base::AffectedRows;
21use common_error::ext::{BoxedError, ErrorExt};
22use common_grpc::flight::do_put::DoPutResponse;
23use common_query::Output;
24use futures::Stream;
25use session::context::QueryContextRef;
26use snafu::ResultExt;
27use table::TableRef;
28
29use crate::error::{self, Result};
30use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
31
32pub type GrpcQueryHandlerRef<E> = Arc<dyn GrpcQueryHandler<Error = E> + Send + Sync>;
33pub type ServerGrpcQueryHandlerRef = GrpcQueryHandlerRef<error::Error>;
34
35pub type RawRecordBatch = bytes::Bytes;
36
37#[async_trait]
38pub trait GrpcQueryHandler {
39    type Error: ErrorExt;
40
41    async fn do_query(
42        &self,
43        query: Request,
44        ctx: QueryContextRef,
45    ) -> std::result::Result<Output, Self::Error>;
46
47    async fn put_record_batch(
48        &self,
49        request: PutRecordBatchRequest,
50        table_ref: &mut Option<TableRef>,
51        ctx: QueryContextRef,
52    ) -> std::result::Result<AffectedRows, Self::Error>;
53
54    fn handle_put_record_batch_stream(
55        &self,
56        stream: PutRecordBatchRequestStream,
57        ctx: QueryContextRef,
58    ) -> Pin<Box<dyn Stream<Item = std::result::Result<DoPutResponse, Self::Error>> + Send>>;
59}
60
61pub struct ServerGrpcQueryHandlerAdapter<E>(GrpcQueryHandlerRef<E>);
62
63impl<E> ServerGrpcQueryHandlerAdapter<E> {
64    pub fn arc(handler: GrpcQueryHandlerRef<E>) -> Arc<Self> {
65        Arc::new(Self(handler))
66    }
67}
68
69#[async_trait]
70impl<E> GrpcQueryHandler for ServerGrpcQueryHandlerAdapter<E>
71where
72    E: ErrorExt + Send + Sync + 'static,
73{
74    type Error = error::Error;
75
76    async fn do_query(&self, query: Request, ctx: QueryContextRef) -> Result<Output> {
77        self.0
78            .do_query(query, ctx)
79            .await
80            .map_err(BoxedError::new)
81            .context(error::ExecuteGrpcQuerySnafu)
82    }
83
84    async fn put_record_batch(
85        &self,
86        request: PutRecordBatchRequest,
87        table_ref: &mut Option<TableRef>,
88        ctx: QueryContextRef,
89    ) -> Result<AffectedRows> {
90        self.0
91            .put_record_batch(request, table_ref, ctx)
92            .await
93            .map_err(BoxedError::new)
94            .context(error::ExecuteGrpcRequestSnafu)
95    }
96
97    fn handle_put_record_batch_stream(
98        &self,
99        stream: PutRecordBatchRequestStream,
100        ctx: QueryContextRef,
101    ) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
102        use futures_util::StreamExt;
103        Box::pin(
104            self.0
105                .handle_put_record_batch_stream(stream, ctx)
106                .map(|result| {
107                    result
108                        .map_err(|e| BoxedError::new(e))
109                        .context(error::ExecuteGrpcRequestSnafu)
110                }),
111        )
112    }
113}