servers/grpc/
database.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::greptime_database_server::GreptimeDatabase;
16use api::v1::greptime_response::Response as RawResponse;
17use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader};
18use async_trait::async_trait;
19use common_error::status_code::StatusCode;
20use common_query::OutputData;
21use common_telemetry::{debug, warn};
22use futures::StreamExt;
23use prost::Message;
24use tonic::{Request, Response, Status, Streaming};
25
26use crate::grpc::greptime_handler::GreptimeRequestHandler;
27use crate::grpc::{TonicResult, cancellation};
28use crate::hint_headers;
29use crate::metrics::{METRIC_GRPC_MEMORY_USAGE_BYTES, METRIC_GRPC_REQUESTS_REJECTED_TOTAL};
30use crate::request_limiter::RequestMemoryLimiter;
31
32pub(crate) struct DatabaseService {
33    handler: GreptimeRequestHandler,
34}
35
36impl DatabaseService {
37    pub(crate) fn new(handler: GreptimeRequestHandler) -> Self {
38        Self { handler }
39    }
40}
41
42#[async_trait]
43impl GreptimeDatabase for DatabaseService {
44    async fn handle(
45        &self,
46        request: Request<GreptimeRequest>,
47    ) -> TonicResult<Response<GreptimeResponse>> {
48        let remote_addr = request.remote_addr();
49        let hints = hint_headers::extract_hints(request.metadata());
50        debug!(
51            "GreptimeDatabase::Handle: request from {:?} with hints: {:?}",
52            remote_addr, hints
53        );
54
55        let _guard = request
56            .extensions()
57            .get::<RequestMemoryLimiter>()
58            .filter(|limiter| limiter.is_enabled())
59            .and_then(|limiter| {
60                let message_size = request.get_ref().encoded_len();
61                limiter
62                    .try_acquire(message_size)
63                    .map(|guard| {
64                        guard.inspect(|g| {
65                            METRIC_GRPC_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
66                        })
67                    })
68                    .inspect_err(|_| {
69                        METRIC_GRPC_REQUESTS_REJECTED_TOTAL.inc();
70                    })
71                    .transpose()
72            })
73            .transpose()?;
74
75        let handler = self.handler.clone();
76        let request_future = async move {
77            let request = request.into_inner();
78            let output = handler.handle_request(request, hints).await?;
79            let message = match output.data {
80                OutputData::AffectedRows(rows) => GreptimeResponse {
81                    header: Some(ResponseHeader {
82                        status: Some(api::v1::Status {
83                            status_code: StatusCode::Success as _,
84                            ..Default::default()
85                        }),
86                    }),
87                    response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
88                },
89                OutputData::Stream(_) | OutputData::RecordBatches(_) => {
90                    return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
91                }
92            };
93
94            Ok(Response::new(message))
95        };
96
97        let cancellation_future = async move {
98            warn!(
99                "GreptimeDatabase::Handle: request from {:?} cancelled by client",
100                remote_addr
101            );
102            // If this future is executed it means the request future was dropped,
103            // so it doesn't actually matter what is returned here
104            Err(Status::cancelled(
105                "GreptimeDatabase::Handle: request cancelled by client",
106            ))
107        };
108        cancellation::with_cancellation_handler(request_future, cancellation_future).await
109    }
110
111    async fn handle_requests(
112        &self,
113        request: Request<Streaming<GreptimeRequest>>,
114    ) -> Result<Response<GreptimeResponse>, Status> {
115        let remote_addr = request.remote_addr();
116        let hints = hint_headers::extract_hints(request.metadata());
117        debug!(
118            "GreptimeDatabase::HandleRequests: request from {:?} with hints: {:?}",
119            remote_addr, hints
120        );
121
122        let limiter = request.extensions().get::<RequestMemoryLimiter>().cloned();
123
124        let handler = self.handler.clone();
125        let request_future = async move {
126            let mut affected_rows = 0;
127
128            let mut stream = request.into_inner();
129            while let Some(request) = stream.next().await {
130                let request = request?;
131
132                let _guard = limiter
133                    .as_ref()
134                    .filter(|limiter| limiter.is_enabled())
135                    .and_then(|limiter| {
136                        let message_size = request.encoded_len();
137                        limiter
138                            .try_acquire(message_size)
139                            .map(|guard| {
140                                guard.inspect(|g| {
141                                    METRIC_GRPC_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
142                                })
143                            })
144                            .inspect_err(|_| {
145                                METRIC_GRPC_REQUESTS_REJECTED_TOTAL.inc();
146                            })
147                            .transpose()
148                    })
149                    .transpose()?;
150                let output = handler.handle_request(request, hints.clone()).await?;
151                match output.data {
152                    OutputData::AffectedRows(rows) => affected_rows += rows,
153                    OutputData::Stream(_) | OutputData::RecordBatches(_) => {
154                        return Err(Status::unimplemented(
155                            "GreptimeDatabase::HandleRequests for query",
156                        ));
157                    }
158                }
159            }
160            let message = GreptimeResponse {
161                header: Some(ResponseHeader {
162                    status: Some(api::v1::Status {
163                        status_code: StatusCode::Success as _,
164                        ..Default::default()
165                    }),
166                }),
167                response: Some(RawResponse::AffectedRows(AffectedRows {
168                    value: affected_rows as u32,
169                })),
170            };
171
172            Ok(Response::new(message))
173        };
174
175        let cancellation_future = async move {
176            warn!(
177                "GreptimeDatabase::HandleRequests: request from {:?} cancelled by client",
178                remote_addr
179            );
180            // If this future is executed it means the request future was dropped,
181            // so it doesn't actually matter what is returned here
182            Err(Status::cancelled(
183                "GreptimeDatabase::HandleRequests: request cancelled by client",
184            ))
185        };
186        cancellation::with_cancellation_handler(request_future, cancellation_future).await
187    }
188}