1use api::v1::greptime_database_server::GreptimeDatabase;
16use api::v1::greptime_response::Response as RawResponse;
17use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader};
18use async_trait::async_trait;
19use common_error::status_code::StatusCode;
20use common_query::OutputData;
21use common_telemetry::{debug, warn};
22use futures::StreamExt;
23use prost::Message;
24use tonic::{Request, Response, Status, Streaming};
25
26use crate::grpc::greptime_handler::GreptimeRequestHandler;
27use crate::grpc::{TonicResult, cancellation};
28use crate::hint_headers;
29use crate::request_memory_limiter::ServerMemoryLimiter;
30
31pub(crate) struct DatabaseService {
32 handler: GreptimeRequestHandler,
33}
34
35impl DatabaseService {
36 pub(crate) fn new(handler: GreptimeRequestHandler) -> Self {
37 Self { handler }
38 }
39}
40
41#[async_trait]
42impl GreptimeDatabase for DatabaseService {
43 async fn handle(
44 &self,
45 request: Request<GreptimeRequest>,
46 ) -> TonicResult<Response<GreptimeResponse>> {
47 let remote_addr = request.remote_addr();
48 let hints = hint_headers::extract_hints(request.metadata());
49 debug!(
50 "GreptimeDatabase::Handle: request from {:?} with hints: {:?}",
51 remote_addr, hints
52 );
53
54 let _guard = if let Some(limiter) = request.extensions().get::<ServerMemoryLimiter>() {
55 let message_size = request.get_ref().encoded_len() as u64;
56 Some(limiter.acquire(message_size).await?)
57 } else {
58 None
59 };
60
61 let handler = self.handler.clone();
62 let request_future = async move {
63 let request = request.into_inner();
64 let output = handler.handle_request(request, hints).await?;
65 let message = match output.data {
66 OutputData::AffectedRows(rows) => GreptimeResponse {
67 header: Some(ResponseHeader {
68 status: Some(api::v1::Status {
69 status_code: StatusCode::Success as _,
70 ..Default::default()
71 }),
72 }),
73 response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
74 },
75 OutputData::Stream(_) | OutputData::RecordBatches(_) => {
76 return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
77 }
78 };
79
80 Ok(Response::new(message))
81 };
82
83 let cancellation_future = async move {
84 warn!(
85 "GreptimeDatabase::Handle: request from {:?} cancelled by client",
86 remote_addr
87 );
88 Err(Status::cancelled(
91 "GreptimeDatabase::Handle: request cancelled by client",
92 ))
93 };
94 cancellation::with_cancellation_handler(request_future, cancellation_future).await
95 }
96
97 async fn handle_requests(
98 &self,
99 request: Request<Streaming<GreptimeRequest>>,
100 ) -> Result<Response<GreptimeResponse>, Status> {
101 let remote_addr = request.remote_addr();
102 let hints = hint_headers::extract_hints(request.metadata());
103 debug!(
104 "GreptimeDatabase::HandleRequests: request from {:?} with hints: {:?}",
105 remote_addr, hints
106 );
107
108 let limiter = request.extensions().get::<ServerMemoryLimiter>().cloned();
109
110 let handler = self.handler.clone();
111 let request_future = async move {
112 let mut affected_rows = 0;
113
114 let mut stream = request.into_inner();
115 while let Some(request) = stream.next().await {
116 let request = request?;
117
118 let _guard = if let Some(limiter_ref) = &limiter {
119 let message_size = request.encoded_len() as u64;
120 Some(limiter_ref.acquire(message_size).await?)
121 } else {
122 None
123 };
124 let output = handler.handle_request(request, hints.clone()).await?;
125 match output.data {
126 OutputData::AffectedRows(rows) => affected_rows += rows,
127 OutputData::Stream(_) | OutputData::RecordBatches(_) => {
128 return Err(Status::unimplemented(
129 "GreptimeDatabase::HandleRequests for query",
130 ));
131 }
132 }
133 }
134 let message = GreptimeResponse {
135 header: Some(ResponseHeader {
136 status: Some(api::v1::Status {
137 status_code: StatusCode::Success as _,
138 ..Default::default()
139 }),
140 }),
141 response: Some(RawResponse::AffectedRows(AffectedRows {
142 value: affected_rows as u32,
143 })),
144 };
145
146 Ok(Response::new(message))
147 };
148
149 let cancellation_future = async move {
150 warn!(
151 "GreptimeDatabase::HandleRequests: request from {:?} cancelled by client",
152 remote_addr
153 );
154 Err(Status::cancelled(
157 "GreptimeDatabase::HandleRequests: request cancelled by client",
158 ))
159 };
160 cancellation::with_cancellation_handler(request_future, cancellation_future).await
161 }
162}