servers/
grpc.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod authorize;
16pub mod builder;
17mod cancellation;
18mod database;
19pub mod flight;
20pub mod frontend_grpc_handler;
21pub mod greptime_handler;
22pub mod prom_query_gateway;
23pub mod region_server;
24
25use std::net::SocketAddr;
26
27use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
28use api::v1::{HealthCheckRequest, HealthCheckResponse};
29use async_trait::async_trait;
30use common_base::readable_size::ReadableSize;
31use common_grpc::channel_manager::{
32    DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
33};
34use common_telemetry::{error, info, warn};
35use futures::FutureExt;
36use otel_arrow_rust::opentelemetry::ArrowMetricsServiceServer;
37use serde::{Deserialize, Serialize};
38use snafu::{ensure, OptionExt, ResultExt};
39use tokio::net::TcpListener;
40use tokio::sync::oneshot::{self, Receiver, Sender};
41use tokio::sync::Mutex;
42use tonic::service::interceptor::InterceptedService;
43use tonic::service::Routes;
44use tonic::transport::server::TcpIncoming;
45use tonic::transport::ServerTlsConfig;
46use tonic::{Request, Response, Status};
47use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
48
49use crate::error::{
50    AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu,
51};
52use crate::metrics::MetricsMiddlewareLayer;
53use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
54use crate::query_handler::OpenTelemetryProtocolHandlerRef;
55use crate::server::Server;
56use crate::tls::TlsOption;
57
/// Result type returned by tonic service handlers: errors are reported as a gRPC [`Status`].
type TonicResult<T> = std::result::Result<T, Status>;
59
/// User-facing options for the gRPC server.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct GrpcOptions {
    /// The address to bind the gRPC server.
    pub bind_addr: String,
    /// The address to advertise to clients.
    ///
    /// When empty, `detect_server_addr` derives it from the local IP address
    /// and the port part of `bind_addr`.
    pub server_addr: String,
    /// Max gRPC receiving (decoding) message size.
    pub max_recv_message_size: ReadableSize,
    /// Max gRPC sending (encoding) message size.
    pub max_send_message_size: ReadableSize,
    /// Compression mode in Arrow Flight service.
    pub flight_compression: FlightCompression,
    /// Runtime size; defaults to 8.
    /// NOTE(review): not read in this file — presumably the number of worker
    /// threads of the gRPC runtime; confirm against the caller.
    pub runtime_size: usize,
    /// TLS settings; falls back to `TlsOption::default()` when missing from the
    /// deserialized configuration.
    #[serde(default = "Default::default")]
    pub tls: TlsOption,
}
76
77impl GrpcOptions {
78    /// Detect the server address.
79    #[cfg(not(target_os = "android"))]
80    pub fn detect_server_addr(&mut self) {
81        if self.server_addr.is_empty() {
82            match local_ip_address::local_ip() {
83                Ok(ip) => {
84                    let detected_addr = format!(
85                        "{}:{}",
86                        ip,
87                        self.bind_addr
88                            .split(':')
89                            .nth(1)
90                            .unwrap_or(DEFAULT_GRPC_ADDR_PORT)
91                    );
92                    info!("Using detected: {} as server address", detected_addr);
93                    self.server_addr = detected_addr;
94                }
95                Err(e) => {
96                    error!("Failed to detect local ip address: {}", e);
97                }
98            }
99        }
100    }
101
102    #[cfg(target_os = "android")]
103    pub fn detect_server_addr(&mut self) {
104        if self.server_addr.is_empty() {
105            common_telemetry::debug!("detect local IP is not supported on Android");
106        }
107    }
108
109    /// Create a [GrpcServerConfig] from self's options.
110    pub fn as_config(&self) -> GrpcServerConfig {
111        GrpcServerConfig {
112            max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
113            max_send_message_size: self.max_send_message_size.as_bytes() as usize,
114            tls: self.tls.clone(),
115        }
116    }
117}
118
/// Port of the default `bind_addr`; also the fallback used by
/// `detect_server_addr` when no port can be extracted from `bind_addr`.
const DEFAULT_GRPC_ADDR_PORT: &str = "4001";
120
121impl Default for GrpcOptions {
122    fn default() -> Self {
123        Self {
124            bind_addr: format!("127.0.0.1:{}", DEFAULT_GRPC_ADDR_PORT),
125            // If hostname is not set, the server will use the local ip address as the hostname.
126            server_addr: String::new(),
127            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
128            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
129            flight_compression: FlightCompression::ArrowIpc,
130            runtime_size: 8,
131            tls: TlsOption::default(),
132        }
133    }
134}
135
136impl GrpcOptions {
137    pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
138        self.bind_addr = bind_addr.to_string();
139        self
140    }
141
142    pub fn with_server_addr(mut self, server_addr: &str) -> Self {
143        self.server_addr = server_addr.to_string();
144        self
145    }
146}
147
/// Compression mode of the Arrow Flight service.
///
/// Serialized in `snake_case` (e.g. `arrow_ipc`); the `Default` variant is
/// [`FlightCompression::None`].
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum FlightCompression {
    /// Disable all compression in Arrow Flight service.
    #[default]
    None,
    /// Enable only transport layer compression (zstd).
    Transport,
    /// Enable only payload compression (lz4).
    ArrowIpc,
    /// Enable all compression.
    All,
}
161
162impl FlightCompression {
163    pub fn transport_compression(&self) -> bool {
164        self == &FlightCompression::Transport || self == &FlightCompression::All
165    }
166
167    pub fn arrow_compression(&self) -> bool {
168        self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
169    }
170}
171
/// gRPC server that mounts the registered routes together with the built-in
/// health-check and reflection services, and serves them from a background task.
pub struct GrpcServer {
    // states
    /// Signals the serving task to shut down; set by `start`, taken by `shutdown`.
    shutdown_tx: Mutex<Option<Sender<()>>>,
    /// gRPC serving state receiver. Only present if the gRPC server is started.
    /// Consumed by `wait_for_serve` to block until the server stops (the old
    /// blocking behavior).
    serve_state: Mutex<Option<Receiver<Result<()>>>>,
    // handlers
    /// Routes to serve; taken by `start`.
    routes: Mutex<Option<Routes>>,
    /// Optional TLS configuration applied to the server builder in `start`.
    tls_config: Option<ServerTlsConfig>,
    /// Optional OTel Arrow metrics service; taken and mounted by `start`.
    otel_arrow_service: Mutex<
        Option<
            InterceptedService<
                ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
                HeaderInterceptor,
            >,
        >,
    >,
    /// Local address the server is bound to, as recorded by `start`.
    bind_addr: Option<SocketAddr>,
}
193
/// Resolved gRPC server configuration, with message-size limits in bytes.
#[derive(Debug, Clone)]
pub struct GrpcServerConfig {
    /// Max gRPC receiving (decoding) message size, in bytes.
    pub max_recv_message_size: usize,
    /// Max gRPC sending (encoding) message size, in bytes.
    pub max_send_message_size: usize,
    /// TLS settings for the server.
    pub tls: TlsOption,
}
203
204impl Default for GrpcServerConfig {
205    fn default() -> Self {
206        Self {
207            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
208            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
209            tls: TlsOption::default(),
210        }
211    }
212}
213
214impl GrpcServer {
215    pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
216        HealthCheckServer::new(HealthCheckHandler)
217    }
218
219    pub fn create_reflection_service(&self) -> ServerReflectionServer<impl ServerReflection> {
220        tonic_reflection::server::Builder::configure()
221            .register_encoded_file_descriptor_set(api::v1::GREPTIME_GRPC_DESC)
222            .with_service_name("greptime.v1.GreptimeDatabase")
223            .with_service_name("greptime.v1.HealthCheck")
224            .with_service_name("greptime.v1.RegionServer")
225            .build_v1()
226            .inspect_err(|e| {
227                common_telemetry::error!(e; "Failed to build gRPC reflection server");
228            })
229            .unwrap()
230    }
231
232    pub async fn wait_for_serve(&self) -> Result<()> {
233        let mut serve_state = self.serve_state.lock().await;
234        let rx = serve_state.take().context(InternalSnafu {
235            err_msg: "gRPC serving state is unknown, maybe the server is not started, \
236                      or we have already waited for the serve result before.",
237        })?;
238        let Ok(result) = rx.await else {
239            warn!("Background gRPC serving task is quited before we can receive the serve result.");
240            return Ok(());
241        };
242        if let Err(e) = result {
243            error!(e; "GRPC serve error");
244        }
245        Ok(())
246    }
247}
248
/// Stateless handler backing the `HealthCheck` gRPC service mounted by [`GrpcServer`].
pub struct HealthCheckHandler;
250
251#[async_trait]
252impl HealthCheck for HealthCheckHandler {
253    async fn health_check(
254        &self,
255        _req: Request<HealthCheckRequest>,
256    ) -> TonicResult<Response<HealthCheckResponse>> {
257        Ok(Response::new(HealthCheckResponse {}))
258    }
259}
260
/// Name reported by [`GrpcServer`] through `Server::name`.
pub const GRPC_SERVER: &str = "GRPC_SERVER";
262
263#[async_trait]
264impl Server for GrpcServer {
265    async fn shutdown(&self) -> Result<()> {
266        let mut shutdown_tx = self.shutdown_tx.lock().await;
267        if let Some(tx) = shutdown_tx.take() {
268            if tx.send(()).is_err() {
269                info!("Receiver dropped, the grpc server has already existed");
270            }
271        }
272        info!("Shutdown grpc server");
273
274        Ok(())
275    }
276
277    async fn start(&mut self, addr: SocketAddr) -> Result<()> {
278        let routes = {
279            let mut routes = self.routes.lock().await;
280            let Some(routes) = routes.take() else {
281                return AlreadyStartedSnafu {
282                    server: self.name(),
283                }
284                .fail();
285            };
286            routes
287        };
288
289        let (tx, rx) = oneshot::channel();
290        let (incoming, addr) = {
291            let mut shutdown_tx = self.shutdown_tx.lock().await;
292            ensure!(
293                shutdown_tx.is_none(),
294                AlreadyStartedSnafu { server: "gRPC" }
295            );
296
297            let listener = TcpListener::bind(addr)
298                .await
299                .context(TcpBindSnafu { addr })?;
300            let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
301            let incoming =
302                TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
303            info!("gRPC server is bound to {}", addr);
304
305            *shutdown_tx = Some(tx);
306
307            (incoming, addr)
308        };
309
310        let metrics_layer = tower::ServiceBuilder::new()
311            .layer(MetricsMiddlewareLayer)
312            .into_inner();
313
314        let mut builder = tonic::transport::Server::builder().layer(metrics_layer);
315        if let Some(tls_config) = self.tls_config.clone() {
316            builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
317        }
318
319        let mut builder = builder
320            .add_routes(routes)
321            .add_service(self.create_healthcheck_service())
322            .add_service(self.create_reflection_service());
323
324        if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
325            builder = builder.add_service(otel_arrow_service);
326        }
327
328        let (serve_state_tx, serve_state_rx) = oneshot::channel();
329        let mut serve_state = self.serve_state.lock().await;
330        *serve_state = Some(serve_state_rx);
331
332        let _handle = common_runtime::spawn_global(async move {
333            let result = builder
334                .serve_with_incoming_shutdown(incoming, rx.map(drop))
335                .await
336                .context(StartGrpcSnafu);
337            serve_state_tx.send(result)
338        });
339
340        self.bind_addr = Some(addr);
341        Ok(())
342    }
343
344    fn name(&self) -> &str {
345        GRPC_SERVER
346    }
347
348    fn bind_addr(&self) -> Option<SocketAddr> {
349        self.bind_addr
350    }
351}