servers/grpc/
database.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::greptime_database_server::GreptimeDatabase;
16use api::v1::greptime_response::Response as RawResponse;
17use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader};
18use async_trait::async_trait;
19use common_error::status_code::StatusCode;
20use common_query::OutputData;
21use common_telemetry::{debug, warn};
22use futures::StreamExt;
23use tonic::{Request, Response, Status, Streaming};
24
25use crate::grpc::greptime_handler::GreptimeRequestHandler;
26use crate::grpc::{cancellation, TonicResult};
27use crate::hint_headers;
28
29pub(crate) struct DatabaseService {
30    handler: GreptimeRequestHandler,
31}
32
33impl DatabaseService {
34    pub(crate) fn new(handler: GreptimeRequestHandler) -> Self {
35        Self { handler }
36    }
37}
38
39#[async_trait]
40impl GreptimeDatabase for DatabaseService {
41    async fn handle(
42        &self,
43        request: Request<GreptimeRequest>,
44    ) -> TonicResult<Response<GreptimeResponse>> {
45        let remote_addr = request.remote_addr();
46        let hints = hint_headers::extract_hints(request.metadata());
47        debug!(
48            "GreptimeDatabase::Handle: request from {:?} with hints: {:?}",
49            remote_addr, hints
50        );
51        let handler = self.handler.clone();
52        let request_future = async move {
53            let request = request.into_inner();
54            let output = handler.handle_request(request, hints).await?;
55            let message = match output.data {
56                OutputData::AffectedRows(rows) => GreptimeResponse {
57                    header: Some(ResponseHeader {
58                        status: Some(api::v1::Status {
59                            status_code: StatusCode::Success as _,
60                            ..Default::default()
61                        }),
62                    }),
63                    response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
64                },
65                OutputData::Stream(_) | OutputData::RecordBatches(_) => {
66                    return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
67                }
68            };
69
70            Ok(Response::new(message))
71        };
72
73        let cancellation_future = async move {
74            warn!(
75                "GreptimeDatabase::Handle: request from {:?} cancelled by client",
76                remote_addr
77            );
78            // If this future is executed it means the request future was dropped,
79            // so it doesn't actually matter what is returned here
80            Err(Status::cancelled(
81                "GreptimeDatabase::Handle: request cancelled by client",
82            ))
83        };
84        cancellation::with_cancellation_handler(request_future, cancellation_future).await
85    }
86
87    async fn handle_requests(
88        &self,
89        request: Request<Streaming<GreptimeRequest>>,
90    ) -> Result<Response<GreptimeResponse>, Status> {
91        let remote_addr = request.remote_addr();
92        let hints = hint_headers::extract_hints(request.metadata());
93        debug!(
94            "GreptimeDatabase::HandleRequests: request from {:?} with hints: {:?}",
95            remote_addr, hints
96        );
97        let handler = self.handler.clone();
98        let request_future = async move {
99            let mut affected_rows = 0;
100
101            let mut stream = request.into_inner();
102            while let Some(request) = stream.next().await {
103                let request = request?;
104                let output = handler.handle_request(request, hints.clone()).await?;
105                match output.data {
106                    OutputData::AffectedRows(rows) => affected_rows += rows,
107                    OutputData::Stream(_) | OutputData::RecordBatches(_) => {
108                        return Err(Status::unimplemented(
109                            "GreptimeDatabase::HandleRequests for query",
110                        ));
111                    }
112                }
113            }
114            let message = GreptimeResponse {
115                header: Some(ResponseHeader {
116                    status: Some(api::v1::Status {
117                        status_code: StatusCode::Success as _,
118                        ..Default::default()
119                    }),
120                }),
121                response: Some(RawResponse::AffectedRows(AffectedRows {
122                    value: affected_rows as u32,
123                })),
124            };
125
126            Ok(Response::new(message))
127        };
128
129        let cancellation_future = async move {
130            warn!(
131                "GreptimeDatabase::HandleRequests: request from {:?} cancelled by client",
132                remote_addr
133            );
134            // If this future is executed it means the request future was dropped,
135            // so it doesn't actually matter what is returned here
136            Err(Status::cancelled(
137                "GreptimeDatabase::HandleRequests: request cancelled by client",
138            ))
139        };
140        cancellation::with_cancellation_handler(request_future, cancellation_future).await
141    }
142}