servers/grpc/
region_server.rs1use std::sync::Arc;
16
17use api::v1::region::region_server::Region as RegionServer;
18use api::v1::region::{region_request, RegionRequest, RegionResponse};
19use async_trait::async_trait;
20use common_error::ext::ErrorExt;
21use common_runtime::runtime::RuntimeTrait;
22use common_runtime::Runtime;
23use common_telemetry::tracing::info_span;
24use common_telemetry::tracing_context::{FutureExt, TracingContext};
25use common_telemetry::{debug, error, warn};
26use snafu::{OptionExt, ResultExt};
27use tonic::{Request, Response, Status};
28
29use crate::error::{InvalidQuerySnafu, JoinTaskSnafu, Result};
30use crate::grpc::{cancellation, TonicResult};
31
32#[async_trait]
33pub trait RegionServerHandler: Send + Sync {
34 async fn handle(&self, request: region_request::Body) -> Result<RegionResponse>;
35}
36
37pub type RegionServerHandlerRef = Arc<dyn RegionServerHandler>;
38
39#[derive(Clone)]
40pub struct RegionServerRequestHandler {
41 handler: Arc<dyn RegionServerHandler>,
42 runtime: Runtime,
43}
44
45impl RegionServerRequestHandler {
46 pub fn new(handler: Arc<dyn RegionServerHandler>, runtime: Runtime) -> Self {
47 Self { handler, runtime }
48 }
49
50 async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
51 let tracing_context = TracingContext::from_w3c(
52 &request
53 .header
54 .context(InvalidQuerySnafu {
55 reason: "Expecting non-empty region request header.",
56 })?
57 .tracing_context,
58 );
59 let query = request.body.context(InvalidQuerySnafu {
60 reason: "Expecting non-empty region request body.",
61 })?;
62
63 let handler = self.handler.clone();
64
65 let handle = self.runtime.spawn(async move {
73 handler
74 .handle(query)
75 .trace(tracing_context.attach(info_span!("RegionServerRequestHandler::handle")))
76 .await
77 .map_err(|e| {
78 if e.status_code().should_log_error() {
79 error!(e; "Failed to handle request");
80 } else {
81 debug!("Failed to handle request, err: {}", e);
83 }
84 e
85 })
86 });
87
88 handle.await.context(JoinTaskSnafu)?
89 }
90}
91
92#[async_trait]
93impl RegionServer for RegionServerRequestHandler {
94 async fn handle(
95 &self,
96 request: Request<RegionRequest>,
97 ) -> TonicResult<Response<RegionResponse>> {
98 let remote_addr = request.remote_addr();
99 let self_cloned = self.clone();
100 let request_future = async move {
101 let request = request.into_inner();
102 let response = self_cloned.handle(request).await?;
103
104 Ok(Response::new(response))
105 };
106
107 let cancellation_future = async move {
108 warn!("Region request from {:?} cancelled by client", remote_addr);
109 Err(Status::cancelled("Region request cancelled by client"))
112 };
113 cancellation::with_cancellation_handler(request_future, cancellation_future).await
114 }
115}