servers/
grpc.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod authorize;
16pub mod builder;
17mod cancellation;
18mod database;
19pub mod flight;
20pub mod greptime_handler;
21pub mod prom_query_gateway;
22pub mod region_server;
23
24use std::net::SocketAddr;
25
26use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
27use api::v1::{HealthCheckRequest, HealthCheckResponse};
28use async_trait::async_trait;
29use common_base::readable_size::ReadableSize;
30use common_grpc::channel_manager::{
31    DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
32};
33use common_telemetry::{error, info, warn};
34use futures::FutureExt;
35use otel_arrow_rust::opentelemetry::ArrowMetricsServiceServer;
36use serde::{Deserialize, Serialize};
37use snafu::{ensure, OptionExt, ResultExt};
38use tokio::net::TcpListener;
39use tokio::sync::oneshot::{self, Receiver, Sender};
40use tokio::sync::Mutex;
41use tonic::service::interceptor::InterceptedService;
42use tonic::service::Routes;
43use tonic::transport::server::TcpIncoming;
44use tonic::transport::ServerTlsConfig;
45use tonic::{Request, Response, Status};
46use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
47
48use crate::error::{
49    AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu,
50};
51use crate::metrics::MetricsMiddlewareLayer;
52use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
53use crate::query_handler::OpenTelemetryProtocolHandlerRef;
54use crate::server::Server;
55use crate::tls::TlsOption;
56
57type TonicResult<T> = std::result::Result<T, Status>;
58
59#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
60pub struct GrpcOptions {
61    /// The address to bind the gRPC server.
62    pub bind_addr: String,
63    /// The address to advertise to clients.
64    pub server_addr: String,
65    /// Max gRPC receiving(decoding) message size
66    pub max_recv_message_size: ReadableSize,
67    /// Max gRPC sending(encoding) message size
68    pub max_send_message_size: ReadableSize,
69    pub runtime_size: usize,
70    #[serde(default = "Default::default")]
71    pub tls: TlsOption,
72}
73
74impl GrpcOptions {
75    /// Detect the server address.
76    #[cfg(not(target_os = "android"))]
77    pub fn detect_server_addr(&mut self) {
78        if self.server_addr.is_empty() {
79            match local_ip_address::local_ip() {
80                Ok(ip) => {
81                    let detected_addr = format!(
82                        "{}:{}",
83                        ip,
84                        self.bind_addr
85                            .split(':')
86                            .nth(1)
87                            .unwrap_or(DEFAULT_GRPC_ADDR_PORT)
88                    );
89                    info!("Using detected: {} as server address", detected_addr);
90                    self.server_addr = detected_addr;
91                }
92                Err(e) => {
93                    error!("Failed to detect local ip address: {}", e);
94                }
95            }
96        }
97    }
98
99    #[cfg(target_os = "android")]
100    pub fn detect_server_addr(&mut self) {
101        if self.server_addr.is_empty() {
102            common_telemetry::debug!("detect local IP is not supported on Android");
103        }
104    }
105}
106
/// Port used for the default bind address, and as the fallback port when the
/// configured bind address does not carry one.
const DEFAULT_GRPC_ADDR_PORT: &'static str = "4001";
108
109impl Default for GrpcOptions {
110    fn default() -> Self {
111        Self {
112            bind_addr: format!("127.0.0.1:{}", DEFAULT_GRPC_ADDR_PORT),
113            // If hostname is not set, the server will use the local ip address as the hostname.
114            server_addr: String::new(),
115            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
116            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
117            runtime_size: 8,
118            tls: TlsOption::default(),
119        }
120    }
121}
122
123impl GrpcOptions {
124    pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
125        self.bind_addr = bind_addr.to_string();
126        self
127    }
128
129    pub fn with_server_addr(mut self, server_addr: &str) -> Self {
130        self.server_addr = server_addr.to_string();
131        self
132    }
133}
134
135pub struct GrpcServer {
136    // states
137    shutdown_tx: Mutex<Option<Sender<()>>>,
138    /// gRPC serving state receiver. Only present if the gRPC server is started.
139    /// Used to wait for the server to stop, performing the old blocking fashion.
140    serve_state: Mutex<Option<Receiver<Result<()>>>>,
141    // handlers
142    routes: Mutex<Option<Routes>>,
143    // tls config
144    tls_config: Option<ServerTlsConfig>,
145    // Otel arrow service
146    otel_arrow_service: Mutex<
147        Option<
148            InterceptedService<
149                ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
150                HeaderInterceptor,
151            >,
152        >,
153    >,
154    bind_addr: Option<SocketAddr>,
155}
156
157/// Grpc Server configuration
158#[derive(Debug, Clone)]
159pub struct GrpcServerConfig {
160    // Max gRPC receiving(decoding) message size
161    pub max_recv_message_size: usize,
162    // Max gRPC sending(encoding) message size
163    pub max_send_message_size: usize,
164    pub tls: TlsOption,
165}
166
167impl Default for GrpcServerConfig {
168    fn default() -> Self {
169        Self {
170            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
171            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
172            tls: TlsOption::default(),
173        }
174    }
175}
176
177impl GrpcServer {
178    pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
179        HealthCheckServer::new(HealthCheckHandler)
180    }
181
182    pub fn create_reflection_service(&self) -> ServerReflectionServer<impl ServerReflection> {
183        tonic_reflection::server::Builder::configure()
184            .register_encoded_file_descriptor_set(api::v1::GREPTIME_GRPC_DESC)
185            .with_service_name("greptime.v1.GreptimeDatabase")
186            .with_service_name("greptime.v1.HealthCheck")
187            .with_service_name("greptime.v1.RegionServer")
188            .build_v1()
189            .inspect_err(|e| {
190                common_telemetry::error!(e; "Failed to build gRPC reflection server");
191            })
192            .unwrap()
193    }
194
195    pub async fn wait_for_serve(&self) -> Result<()> {
196        let mut serve_state = self.serve_state.lock().await;
197        let rx = serve_state.take().context(InternalSnafu {
198            err_msg: "gRPC serving state is unknown, maybe the server is not started, \
199                      or we have already waited for the serve result before.",
200        })?;
201        let Ok(result) = rx.await else {
202            warn!("Background gRPC serving task is quited before we can receive the serve result.");
203            return Ok(());
204        };
205        if let Err(e) = result {
206            error!(e; "GRPC serve error");
207        }
208        Ok(())
209    }
210}
211
/// Stateless handler of the gRPC health check service.
pub struct HealthCheckHandler;
213
214#[async_trait]
215impl HealthCheck for HealthCheckHandler {
216    async fn health_check(
217        &self,
218        _req: Request<HealthCheckRequest>,
219    ) -> TonicResult<Response<HealthCheckResponse>> {
220        Ok(Response::new(HealthCheckResponse {}))
221    }
222}
223
/// Name reported by the gRPC server through `Server::name`.
pub const GRPC_SERVER: &'static str = "GRPC_SERVER";
225
226#[async_trait]
227impl Server for GrpcServer {
228    async fn shutdown(&self) -> Result<()> {
229        let mut shutdown_tx = self.shutdown_tx.lock().await;
230        if let Some(tx) = shutdown_tx.take() {
231            if tx.send(()).is_err() {
232                info!("Receiver dropped, the grpc server has already existed");
233            }
234        }
235        info!("Shutdown grpc server");
236
237        Ok(())
238    }
239
240    async fn start(&mut self, addr: SocketAddr) -> Result<()> {
241        let routes = {
242            let mut routes = self.routes.lock().await;
243            let Some(routes) = routes.take() else {
244                return AlreadyStartedSnafu {
245                    server: self.name(),
246                }
247                .fail();
248            };
249            routes
250        };
251
252        let (tx, rx) = oneshot::channel();
253        let (incoming, addr) = {
254            let mut shutdown_tx = self.shutdown_tx.lock().await;
255            ensure!(
256                shutdown_tx.is_none(),
257                AlreadyStartedSnafu { server: "gRPC" }
258            );
259
260            let listener = TcpListener::bind(addr)
261                .await
262                .context(TcpBindSnafu { addr })?;
263            let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
264            let incoming =
265                TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
266            info!("gRPC server is bound to {}", addr);
267
268            *shutdown_tx = Some(tx);
269
270            (incoming, addr)
271        };
272
273        let metrics_layer = tower::ServiceBuilder::new()
274            .layer(MetricsMiddlewareLayer)
275            .into_inner();
276
277        let mut builder = tonic::transport::Server::builder().layer(metrics_layer);
278        if let Some(tls_config) = self.tls_config.clone() {
279            builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
280        }
281
282        let mut builder = builder
283            .add_routes(routes)
284            .add_service(self.create_healthcheck_service())
285            .add_service(self.create_reflection_service());
286
287        if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
288            builder = builder.add_service(otel_arrow_service);
289        }
290
291        let (serve_state_tx, serve_state_rx) = oneshot::channel();
292        let mut serve_state = self.serve_state.lock().await;
293        *serve_state = Some(serve_state_rx);
294
295        let _handle = common_runtime::spawn_global(async move {
296            let result = builder
297                .serve_with_incoming_shutdown(incoming, rx.map(drop))
298                .await
299                .context(StartGrpcSnafu);
300            serve_state_tx.send(result)
301        });
302
303        self.bind_addr = Some(addr);
304        Ok(())
305    }
306
307    fn name(&self) -> &str {
308        GRPC_SERVER
309    }
310
311    fn bind_addr(&self) -> Option<SocketAddr> {
312        self.bind_addr
313    }
314}