1use api::v1::greptime_database_server::GreptimeDatabase;
16use api::v1::greptime_response::Response as RawResponse;
17use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader};
18use async_trait::async_trait;
19use common_error::status_code::StatusCode;
20use common_query::OutputData;
21use common_telemetry::{debug, warn};
22use futures::StreamExt;
23use tonic::{Request, Response, Status, Streaming};
24
25use crate::grpc::greptime_handler::GreptimeRequestHandler;
26use crate::grpc::{cancellation, TonicResult};
27use crate::hint_headers;
28
29pub(crate) struct DatabaseService {
30 handler: GreptimeRequestHandler,
31}
32
33impl DatabaseService {
34 pub(crate) fn new(handler: GreptimeRequestHandler) -> Self {
35 Self { handler }
36 }
37}
38
39#[async_trait]
40impl GreptimeDatabase for DatabaseService {
41 async fn handle(
42 &self,
43 request: Request<GreptimeRequest>,
44 ) -> TonicResult<Response<GreptimeResponse>> {
45 let remote_addr = request.remote_addr();
46 let hints = hint_headers::extract_hints(request.metadata());
47 debug!(
48 "GreptimeDatabase::Handle: request from {:?} with hints: {:?}",
49 remote_addr, hints
50 );
51 let handler = self.handler.clone();
52 let request_future = async move {
53 let request = request.into_inner();
54 let output = handler.handle_request(request, hints).await?;
55 let message = match output.data {
56 OutputData::AffectedRows(rows) => GreptimeResponse {
57 header: Some(ResponseHeader {
58 status: Some(api::v1::Status {
59 status_code: StatusCode::Success as _,
60 ..Default::default()
61 }),
62 }),
63 response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
64 },
65 OutputData::Stream(_) | OutputData::RecordBatches(_) => {
66 return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
67 }
68 };
69
70 Ok(Response::new(message))
71 };
72
73 let cancellation_future = async move {
74 warn!(
75 "GreptimeDatabase::Handle: request from {:?} cancelled by client",
76 remote_addr
77 );
78 Err(Status::cancelled(
81 "GreptimeDatabase::Handle: request cancelled by client",
82 ))
83 };
84 cancellation::with_cancellation_handler(request_future, cancellation_future).await
85 }
86
87 async fn handle_requests(
88 &self,
89 request: Request<Streaming<GreptimeRequest>>,
90 ) -> Result<Response<GreptimeResponse>, Status> {
91 let remote_addr = request.remote_addr();
92 let hints = hint_headers::extract_hints(request.metadata());
93 debug!(
94 "GreptimeDatabase::HandleRequests: request from {:?} with hints: {:?}",
95 remote_addr, hints
96 );
97 let handler = self.handler.clone();
98 let request_future = async move {
99 let mut affected_rows = 0;
100
101 let mut stream = request.into_inner();
102 while let Some(request) = stream.next().await {
103 let request = request?;
104 let output = handler.handle_request(request, hints.clone()).await?;
105 match output.data {
106 OutputData::AffectedRows(rows) => affected_rows += rows,
107 OutputData::Stream(_) | OutputData::RecordBatches(_) => {
108 return Err(Status::unimplemented(
109 "GreptimeDatabase::HandleRequests for query",
110 ));
111 }
112 }
113 }
114 let message = GreptimeResponse {
115 header: Some(ResponseHeader {
116 status: Some(api::v1::Status {
117 status_code: StatusCode::Success as _,
118 ..Default::default()
119 }),
120 }),
121 response: Some(RawResponse::AffectedRows(AffectedRows {
122 value: affected_rows as u32,
123 })),
124 };
125
126 Ok(Response::new(message))
127 };
128
129 let cancellation_future = async move {
130 warn!(
131 "GreptimeDatabase::HandleRequests: request from {:?} cancelled by client",
132 remote_addr
133 );
134 Err(Status::cancelled(
137 "GreptimeDatabase::HandleRequests: request cancelled by client",
138 ))
139 };
140 cancellation::with_cancellation_handler(request_future, cancellation_future).await
141 }
142}