servers/grpc/
region_server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::region::region_server::Region as RegionServer;
18use api::v1::region::{region_request, RegionRequest, RegionResponse};
19use async_trait::async_trait;
20use common_error::ext::ErrorExt;
21use common_runtime::runtime::RuntimeTrait;
22use common_runtime::Runtime;
23use common_telemetry::tracing::info_span;
24use common_telemetry::tracing_context::{FutureExt, TracingContext};
25use common_telemetry::{debug, error, warn};
26use snafu::{OptionExt, ResultExt};
27use tonic::{Request, Response, Status};
28
29use crate::error::{InvalidQuerySnafu, JoinTaskSnafu, Result};
30use crate::grpc::{cancellation, TonicResult};
31
32#[async_trait]
33pub trait RegionServerHandler: Send + Sync {
34    async fn handle(&self, request: region_request::Body) -> Result<RegionResponse>;
35}
36
37pub type RegionServerHandlerRef = Arc<dyn RegionServerHandler>;
38
39#[derive(Clone)]
40pub struct RegionServerRequestHandler {
41    handler: Arc<dyn RegionServerHandler>,
42    runtime: Runtime,
43}
44
45impl RegionServerRequestHandler {
46    pub fn new(handler: Arc<dyn RegionServerHandler>, runtime: Runtime) -> Self {
47        Self { handler, runtime }
48    }
49
50    async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
51        let tracing_context = TracingContext::from_w3c(
52            &request
53                .header
54                .context(InvalidQuerySnafu {
55                    reason: "Expecting non-empty region request header.",
56                })?
57                .tracing_context,
58        );
59        let query = request.body.context(InvalidQuerySnafu {
60            reason: "Expecting non-empty region request body.",
61        })?;
62
63        let handler = self.handler.clone();
64
65        // Executes requests in another runtime to
66        // 1. prevent the execution from being cancelled unexpected by Tonic runtime;
67        //   - Refer to our blog for the rational behind it:
68        //     https://www.greptime.com/blogs/2023-01-12-hidden-control-flow.html
69        //   - Obtaining a `JoinHandle` to get the panic message (if there's any).
70        //     From its docs, `JoinHandle` is cancel safe. The task keeps running even it's handle been dropped.
71        // 2. avoid the handler blocks the gRPC runtime incidentally.
72        let handle = self.runtime.spawn(async move {
73            handler
74                .handle(query)
75                .trace(tracing_context.attach(info_span!("RegionServerRequestHandler::handle")))
76                .await
77                .map_err(|e| {
78                    if e.status_code().should_log_error() {
79                        error!(e; "Failed to handle request");
80                    } else {
81                        // Currently, we still print a debug log.
82                        debug!("Failed to handle request, err: {}", e);
83                    }
84                    e
85                })
86        });
87
88        handle.await.context(JoinTaskSnafu)?
89    }
90}
91
92#[async_trait]
93impl RegionServer for RegionServerRequestHandler {
94    async fn handle(
95        &self,
96        request: Request<RegionRequest>,
97    ) -> TonicResult<Response<RegionResponse>> {
98        let remote_addr = request.remote_addr();
99        let self_cloned = self.clone();
100        let request_future = async move {
101            let request = request.into_inner();
102            let response = self_cloned.handle(request).await?;
103
104            Ok(Response::new(response))
105        };
106
107        let cancellation_future = async move {
108            warn!("Region request from {:?} cancelled by client", remote_addr);
109            // If this future is executed it means the request future was dropped,
110            // so it doesn't actually matter what is returned here
111            Err(Status::cancelled("Region request cancelled by client"))
112        };
113        cancellation::with_cancellation_handler(request_future, cancellation_future).await
114    }
115}