1use api::v1::greptime_database_server::GreptimeDatabase;
16use api::v1::greptime_response::Response as RawResponse;
17use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader};
18use async_trait::async_trait;
19use common_error::status_code::StatusCode;
20use common_query::OutputData;
21use common_telemetry::{debug, warn};
22use futures::StreamExt;
23use prost::Message;
24use tonic::{Request, Response, Status, Streaming};
25
26use crate::grpc::greptime_handler::GreptimeRequestHandler;
27use crate::grpc::{TonicResult, cancellation};
28use crate::hint_headers;
29use crate::metrics::{METRIC_GRPC_MEMORY_USAGE_BYTES, METRIC_GRPC_REQUESTS_REJECTED_TOTAL};
30use crate::request_limiter::RequestMemoryLimiter;
31
/// gRPC service that implements [`GreptimeDatabase`] by forwarding every incoming
/// [`GreptimeRequest`] to a [`GreptimeRequestHandler`].
pub(crate) struct DatabaseService {
    /// Handler each request is delegated to; cloned into the per-call future.
    handler: GreptimeRequestHandler,
}
35
36impl DatabaseService {
37 pub(crate) fn new(handler: GreptimeRequestHandler) -> Self {
38 Self { handler }
39 }
40}
41
42#[async_trait]
43impl GreptimeDatabase for DatabaseService {
44 async fn handle(
45 &self,
46 request: Request<GreptimeRequest>,
47 ) -> TonicResult<Response<GreptimeResponse>> {
48 let remote_addr = request.remote_addr();
49 let hints = hint_headers::extract_hints(request.metadata());
50 debug!(
51 "GreptimeDatabase::Handle: request from {:?} with hints: {:?}",
52 remote_addr, hints
53 );
54
55 let _guard = request
56 .extensions()
57 .get::<RequestMemoryLimiter>()
58 .filter(|limiter| limiter.is_enabled())
59 .and_then(|limiter| {
60 let message_size = request.get_ref().encoded_len();
61 limiter
62 .try_acquire(message_size)
63 .map(|guard| {
64 guard.inspect(|g| {
65 METRIC_GRPC_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
66 })
67 })
68 .inspect_err(|_| {
69 METRIC_GRPC_REQUESTS_REJECTED_TOTAL.inc();
70 })
71 .transpose()
72 })
73 .transpose()?;
74
75 let handler = self.handler.clone();
76 let request_future = async move {
77 let request = request.into_inner();
78 let output = handler.handle_request(request, hints).await?;
79 let message = match output.data {
80 OutputData::AffectedRows(rows) => GreptimeResponse {
81 header: Some(ResponseHeader {
82 status: Some(api::v1::Status {
83 status_code: StatusCode::Success as _,
84 ..Default::default()
85 }),
86 }),
87 response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
88 },
89 OutputData::Stream(_) | OutputData::RecordBatches(_) => {
90 return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
91 }
92 };
93
94 Ok(Response::new(message))
95 };
96
97 let cancellation_future = async move {
98 warn!(
99 "GreptimeDatabase::Handle: request from {:?} cancelled by client",
100 remote_addr
101 );
102 Err(Status::cancelled(
105 "GreptimeDatabase::Handle: request cancelled by client",
106 ))
107 };
108 cancellation::with_cancellation_handler(request_future, cancellation_future).await
109 }
110
111 async fn handle_requests(
112 &self,
113 request: Request<Streaming<GreptimeRequest>>,
114 ) -> Result<Response<GreptimeResponse>, Status> {
115 let remote_addr = request.remote_addr();
116 let hints = hint_headers::extract_hints(request.metadata());
117 debug!(
118 "GreptimeDatabase::HandleRequests: request from {:?} with hints: {:?}",
119 remote_addr, hints
120 );
121
122 let limiter = request.extensions().get::<RequestMemoryLimiter>().cloned();
123
124 let handler = self.handler.clone();
125 let request_future = async move {
126 let mut affected_rows = 0;
127
128 let mut stream = request.into_inner();
129 while let Some(request) = stream.next().await {
130 let request = request?;
131
132 let _guard = limiter
133 .as_ref()
134 .filter(|limiter| limiter.is_enabled())
135 .and_then(|limiter| {
136 let message_size = request.encoded_len();
137 limiter
138 .try_acquire(message_size)
139 .map(|guard| {
140 guard.inspect(|g| {
141 METRIC_GRPC_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
142 })
143 })
144 .inspect_err(|_| {
145 METRIC_GRPC_REQUESTS_REJECTED_TOTAL.inc();
146 })
147 .transpose()
148 })
149 .transpose()?;
150 let output = handler.handle_request(request, hints.clone()).await?;
151 match output.data {
152 OutputData::AffectedRows(rows) => affected_rows += rows,
153 OutputData::Stream(_) | OutputData::RecordBatches(_) => {
154 return Err(Status::unimplemented(
155 "GreptimeDatabase::HandleRequests for query",
156 ));
157 }
158 }
159 }
160 let message = GreptimeResponse {
161 header: Some(ResponseHeader {
162 status: Some(api::v1::Status {
163 status_code: StatusCode::Success as _,
164 ..Default::default()
165 }),
166 }),
167 response: Some(RawResponse::AffectedRows(AffectedRows {
168 value: affected_rows as u32,
169 })),
170 };
171
172 Ok(Response::new(message))
173 };
174
175 let cancellation_future = async move {
176 warn!(
177 "GreptimeDatabase::HandleRequests: request from {:?} cancelled by client",
178 remote_addr
179 );
180 Err(Status::cancelled(
183 "GreptimeDatabase::HandleRequests: request cancelled by client",
184 ))
185 };
186 cancellation::with_cancellation_handler(request_future, cancellation_future).await
187 }
188}