servers/grpc/
database.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::greptime_database_server::GreptimeDatabase;
16use api::v1::greptime_response::Response as RawResponse;
17use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader};
18use async_trait::async_trait;
19use common_error::status_code::StatusCode;
20use common_query::OutputData;
21use common_telemetry::{debug, warn};
22use futures::StreamExt;
23use prost::Message;
24use tonic::{Request, Response, Status, Streaming};
25
26use crate::grpc::greptime_handler::GreptimeRequestHandler;
27use crate::grpc::{TonicResult, cancellation};
28use crate::hint_headers;
29use crate::request_memory_limiter::ServerMemoryLimiter;
30
31pub(crate) struct DatabaseService {
32    handler: GreptimeRequestHandler,
33}
34
35impl DatabaseService {
36    pub(crate) fn new(handler: GreptimeRequestHandler) -> Self {
37        Self { handler }
38    }
39}
40
41#[async_trait]
42impl GreptimeDatabase for DatabaseService {
43    async fn handle(
44        &self,
45        request: Request<GreptimeRequest>,
46    ) -> TonicResult<Response<GreptimeResponse>> {
47        let remote_addr = request.remote_addr();
48        let hints = hint_headers::extract_hints(request.metadata());
49        debug!(
50            "GreptimeDatabase::Handle: request from {:?} with hints: {:?}",
51            remote_addr, hints
52        );
53
54        let _guard = if let Some(limiter) = request.extensions().get::<ServerMemoryLimiter>() {
55            let message_size = request.get_ref().encoded_len() as u64;
56            Some(limiter.acquire(message_size).await?)
57        } else {
58            None
59        };
60
61        let handler = self.handler.clone();
62        let request_future = async move {
63            let request = request.into_inner();
64            let output = handler.handle_request(request, hints).await?;
65            let message = match output.data {
66                OutputData::AffectedRows(rows) => GreptimeResponse {
67                    header: Some(ResponseHeader {
68                        status: Some(api::v1::Status {
69                            status_code: StatusCode::Success as _,
70                            ..Default::default()
71                        }),
72                    }),
73                    response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
74                },
75                OutputData::Stream(_) | OutputData::RecordBatches(_) => {
76                    return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
77                }
78            };
79
80            Ok(Response::new(message))
81        };
82
83        let cancellation_future = async move {
84            warn!(
85                "GreptimeDatabase::Handle: request from {:?} cancelled by client",
86                remote_addr
87            );
88            // If this future is executed it means the request future was dropped,
89            // so it doesn't actually matter what is returned here
90            Err(Status::cancelled(
91                "GreptimeDatabase::Handle: request cancelled by client",
92            ))
93        };
94        cancellation::with_cancellation_handler(request_future, cancellation_future).await
95    }
96
97    async fn handle_requests(
98        &self,
99        request: Request<Streaming<GreptimeRequest>>,
100    ) -> Result<Response<GreptimeResponse>, Status> {
101        let remote_addr = request.remote_addr();
102        let hints = hint_headers::extract_hints(request.metadata());
103        debug!(
104            "GreptimeDatabase::HandleRequests: request from {:?} with hints: {:?}",
105            remote_addr, hints
106        );
107
108        let limiter = request.extensions().get::<ServerMemoryLimiter>().cloned();
109
110        let handler = self.handler.clone();
111        let request_future = async move {
112            let mut affected_rows = 0;
113
114            let mut stream = request.into_inner();
115            while let Some(request) = stream.next().await {
116                let request = request?;
117
118                let _guard = if let Some(limiter_ref) = &limiter {
119                    let message_size = request.encoded_len() as u64;
120                    Some(limiter_ref.acquire(message_size).await?)
121                } else {
122                    None
123                };
124                let output = handler.handle_request(request, hints.clone()).await?;
125                match output.data {
126                    OutputData::AffectedRows(rows) => affected_rows += rows,
127                    OutputData::Stream(_) | OutputData::RecordBatches(_) => {
128                        return Err(Status::unimplemented(
129                            "GreptimeDatabase::HandleRequests for query",
130                        ));
131                    }
132                }
133            }
134            let message = GreptimeResponse {
135                header: Some(ResponseHeader {
136                    status: Some(api::v1::Status {
137                        status_code: StatusCode::Success as _,
138                        ..Default::default()
139                    }),
140                }),
141                response: Some(RawResponse::AffectedRows(AffectedRows {
142                    value: affected_rows as u32,
143                })),
144            };
145
146            Ok(Response::new(message))
147        };
148
149        let cancellation_future = async move {
150            warn!(
151                "GreptimeDatabase::HandleRequests: request from {:?} cancelled by client",
152                remote_addr
153            );
154            // If this future is executed it means the request future was dropped,
155            // so it doesn't actually matter what is returned here
156            Err(Status::cancelled(
157                "GreptimeDatabase::HandleRequests: request cancelled by client",
158            ))
159        };
160        cancellation::with_cancellation_handler(request_future, cancellation_future).await
161    }
162}