servers/query_handler/
grpc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::greptime_request::Request;
18use arrow_flight::FlightData;
19use async_trait::async_trait;
20use common_base::AffectedRows;
21use common_error::ext::{BoxedError, ErrorExt};
22use common_grpc::flight::FlightDecoder;
23use common_query::Output;
24use session::context::QueryContextRef;
25use snafu::ResultExt;
26use table::metadata::TableId;
27use table::table_name::TableName;
28
29use crate::error::{self, Result};
30
31pub type GrpcQueryHandlerRef<E> = Arc<dyn GrpcQueryHandler<Error = E> + Send + Sync>;
32pub type ServerGrpcQueryHandlerRef = GrpcQueryHandlerRef<error::Error>;
33
34pub type RawRecordBatch = bytes::Bytes;
35
36#[async_trait]
37pub trait GrpcQueryHandler {
38    type Error: ErrorExt;
39
40    async fn do_query(
41        &self,
42        query: Request,
43        ctx: QueryContextRef,
44    ) -> std::result::Result<Output, Self::Error>;
45
46    async fn put_record_batch(
47        &self,
48        table: &TableName,
49        table_id: &mut Option<TableId>,
50        decoder: &mut FlightDecoder,
51        flight_data: FlightData,
52    ) -> std::result::Result<AffectedRows, Self::Error>;
53}
54
55pub struct ServerGrpcQueryHandlerAdapter<E>(GrpcQueryHandlerRef<E>);
56
57impl<E> ServerGrpcQueryHandlerAdapter<E> {
58    pub fn arc(handler: GrpcQueryHandlerRef<E>) -> Arc<Self> {
59        Arc::new(Self(handler))
60    }
61}
62
63#[async_trait]
64impl<E> GrpcQueryHandler for ServerGrpcQueryHandlerAdapter<E>
65where
66    E: ErrorExt + Send + Sync + 'static,
67{
68    type Error = error::Error;
69
70    async fn do_query(&self, query: Request, ctx: QueryContextRef) -> Result<Output> {
71        self.0
72            .do_query(query, ctx)
73            .await
74            .map_err(BoxedError::new)
75            .context(error::ExecuteGrpcQuerySnafu)
76    }
77
78    async fn put_record_batch(
79        &self,
80        table: &TableName,
81        table_id: &mut Option<TableId>,
82        decoder: &mut FlightDecoder,
83        data: FlightData,
84    ) -> Result<AffectedRows> {
85        self.0
86            .put_record_batch(table, table_id, decoder, data)
87            .await
88            .map_err(BoxedError::new)
89            .context(error::ExecuteGrpcRequestSnafu)
90    }
91}