servers/
grpc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16mod cancellation;
17pub mod context_auth;
18mod database;
19pub mod flight;
20pub mod frontend_grpc_handler;
21pub mod greptime_handler;
22pub mod memory_limit;
23pub mod prom_query_gateway;
24pub mod region_server;
25
26use std::any::Any;
27use std::net::SocketAddr;
28use std::time::Duration;
29
30use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
31use api::v1::{HealthCheckRequest, HealthCheckResponse};
32use async_trait::async_trait;
33use common_base::readable_size::ReadableSize;
34use common_grpc::channel_manager::{
35    DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
36};
37use common_telemetry::{error, info, warn};
38use futures::FutureExt;
39use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
40use serde::{Deserialize, Serialize};
41use snafu::{OptionExt, ResultExt, ensure};
42use tokio::net::TcpListener;
43use tokio::sync::Mutex;
44use tokio::sync::oneshot::{self, Receiver, Sender};
45use tonic::service::Routes;
46use tonic::service::interceptor::InterceptedService;
47use tonic::transport::ServerTlsConfig;
48use tonic::transport::server::TcpIncoming;
49use tonic::{Request, Response, Status};
50use tonic_reflection::server::v1::{ServerReflection, ServerReflectionServer};
51
52use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
53use crate::metrics::MetricsMiddlewareLayer;
54use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
55use crate::query_handler::OpenTelemetryProtocolHandlerRef;
56use crate::server::Server;
57use crate::tls::TlsOption;
58
/// Result type returned by tonic service handlers, with a gRPC [Status] as the error.
type TonicResult<T> = std::result::Result<T, Status>;
60
/// Options of a gRPC server.
///
/// Fields missing from the serialized form take their values from
/// [GrpcOptions::default] (`#[serde(default)]`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct GrpcOptions {
    /// The address to bind the gRPC server.
    pub bind_addr: String,
    /// The address to advertise to clients.
    /// If left empty, [GrpcOptions::detect_server_addr] tries to fill it with the local IP.
    pub server_addr: String,
    /// Max gRPC receiving(decoding) message size
    pub max_recv_message_size: ReadableSize,
    /// Max gRPC sending(encoding) message size
    pub max_send_message_size: ReadableSize,
    /// Compression mode in Arrow Flight service.
    pub flight_compression: FlightCompression,
    /// Runtime size of the gRPC server.
    // NOTE(review): not read in this file; presumably a worker thread count — confirm at the use site.
    pub runtime_size: usize,
    /// TLS options of the gRPC server.
    #[serde(default = "Default::default")]
    pub tls: TlsOption,
    /// Maximum time that a channel may exist.
    /// Useful when the server wants to control the reconnection of its clients.
    /// Default to `None`, means infinite.
    #[serde(with = "humantime_serde")]
    pub max_connection_age: Option<Duration>,
    /// The HTTP/2 keep-alive interval.
    // NOTE(review): not forwarded by `as_config()`; confirm it is applied elsewhere.
    #[serde(with = "humantime_serde")]
    pub http2_keep_alive_interval: Duration,
    /// The HTTP/2 keep-alive timeout.
    #[serde(with = "humantime_serde")]
    pub http2_keep_alive_timeout: Duration,
}
89
90impl GrpcOptions {
91    /// Detect the server address.
92    #[cfg(not(target_os = "android"))]
93    pub fn detect_server_addr(&mut self) {
94        if self.server_addr.is_empty() {
95            match local_ip_address::local_ip() {
96                Ok(ip) => {
97                    let detected_addr = format!(
98                        "{}:{}",
99                        ip,
100                        self.bind_addr
101                            .split(':')
102                            .nth(1)
103                            .unwrap_or(DEFAULT_GRPC_ADDR_PORT)
104                    );
105                    info!("Using detected: {} as server address", detected_addr);
106                    self.server_addr = detected_addr;
107                }
108                Err(e) => {
109                    error!("Failed to detect local ip address: {}", e);
110                }
111            }
112        }
113    }
114
115    #[cfg(target_os = "android")]
116    pub fn detect_server_addr(&mut self) {
117        if self.server_addr.is_empty() {
118            common_telemetry::debug!("detect local IP is not supported on Android");
119        }
120    }
121
122    /// Create a [GrpcServerConfig] from self's options.
123    pub fn as_config(&self) -> GrpcServerConfig {
124        GrpcServerConfig {
125            max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
126            max_send_message_size: self.max_send_message_size.as_bytes() as usize,
127            tls: self.tls.clone(),
128            max_connection_age: self.max_connection_age,
129        }
130    }
131}
132
/// Default port of the public gRPC server. `GrpcOptions::detect_server_addr` also falls
/// back to it when `bind_addr` carries no port.
const DEFAULT_GRPC_ADDR_PORT: &str = "4001";

/// Default port of the internal (cluster-only) gRPC server, see `GrpcOptions::internal_default`.
const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
136
137impl Default for GrpcOptions {
138    fn default() -> Self {
139        Self {
140            bind_addr: format!("127.0.0.1:{}", DEFAULT_GRPC_ADDR_PORT),
141            // If hostname is not set, the server will use the local ip address as the hostname.
142            server_addr: String::new(),
143            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
144            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
145            flight_compression: FlightCompression::ArrowIpc,
146            runtime_size: 8,
147            tls: TlsOption::default(),
148            max_connection_age: None,
149            http2_keep_alive_interval: Duration::from_secs(10),
150            http2_keep_alive_timeout: Duration::from_secs(3),
151        }
152    }
153}
154
155impl GrpcOptions {
156    /// Default options for internal gRPC server.
157    /// The internal gRPC server is used for communication between different nodes in cluster.
158    /// It is not exposed to the outside world.
159    pub fn internal_default() -> Self {
160        Self {
161            bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
162            // If hostname is not set, the server will use the local ip address as the hostname.
163            server_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
164            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
165            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
166            flight_compression: FlightCompression::ArrowIpc,
167            runtime_size: 8,
168            tls: TlsOption::default(),
169            max_connection_age: None,
170            http2_keep_alive_interval: Duration::from_secs(10),
171            http2_keep_alive_timeout: Duration::from_secs(3),
172        }
173    }
174
175    pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
176        self.bind_addr = bind_addr.to_string();
177        self
178    }
179
180    pub fn with_server_addr(mut self, server_addr: &str) -> Self {
181        self.server_addr = server_addr.to_string();
182        self
183    }
184}
185
/// Compression mode of the Arrow Flight service.
///
/// (De)serialized as snake_case strings: `none`, `transport`, `arrow_ipc`, `all`.
/// Note that `FlightCompression::default()` is `None`, whereas `GrpcOptions::default()`
/// selects `ArrowIpc`.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum FlightCompression {
    /// Disable all compression in Arrow Flight service.
    #[default]
    None,
    /// Enable only transport layer compression (zstd).
    Transport,
    /// Enable only payload compression (lz4).
    ArrowIpc,
    /// Enable all compression.
    All,
}
199
200impl FlightCompression {
201    pub fn transport_compression(&self) -> bool {
202        self == &FlightCompression::Transport || self == &FlightCompression::All
203    }
204
205    pub fn arrow_compression(&self) -> bool {
206        self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
207    }
208}
209
/// A gRPC server serving the registered routes together with the built-in health check
/// and reflection services and, if configured, the OTel Arrow metrics service.
///
/// The `Mutex<Option<_>>` fields hold one-shot state that is consumed when the server is
/// started, shut down, or waited on.
pub struct GrpcServer {
    // states
    /// Sender used to signal the serving task to stop; set by `start`, taken by `shutdown`.
    shutdown_tx: Mutex<Option<Sender<()>>>,
    /// gRPC serving state receiver. Only present if the gRPC server is started.
    /// Used to wait for the server to stop, performing the old blocking fashion.
    serve_state: Mutex<Option<Receiver<Result<()>>>>,
    // handlers
    /// Routes to serve; taken when the server starts.
    routes: Mutex<Option<Routes>>,
    // tls config
    /// TLS configuration applied to the server, if any.
    tls_config: Option<ServerTlsConfig>,
    // Otel arrow service
    /// Optional OTel Arrow metrics service; taken when the server starts.
    otel_arrow_service: Mutex<
        Option<
            InterceptedService<
                ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
                HeaderInterceptor,
            >,
        >,
    >,
    /// The address actually bound by `start` (resolved from the listener).
    bind_addr: Option<SocketAddr>,
    /// Custom server name; `GRPC_SERVER` is reported when unset.
    name: Option<String>,
    /// Server configuration (message limits, TLS options, connection age).
    config: GrpcServerConfig,
}
233
/// Grpc Server configuration
///
/// Can be built from [GrpcOptions] via [GrpcOptions::as_config].
#[derive(Debug, Clone)]
pub struct GrpcServerConfig {
    /// Max gRPC receiving(decoding) message size, in bytes.
    pub max_recv_message_size: usize,
    /// Max gRPC sending(encoding) message size, in bytes.
    pub max_send_message_size: usize,
    /// TLS options.
    pub tls: TlsOption,
    /// Maximum time that a channel may exist.
    /// Useful when the server wants to control the reconnection of its clients.
    /// Default to `None`, means infinite.
    pub max_connection_age: Option<Duration>,
}
247
248impl Default for GrpcServerConfig {
249    fn default() -> Self {
250        Self {
251            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
252            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
253            tls: TlsOption::default(),
254            max_connection_age: None,
255        }
256    }
257}
258
259impl GrpcServer {
260    pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
261        HealthCheckServer::new(HealthCheckHandler)
262    }
263
264    pub fn create_reflection_service(&self) -> ServerReflectionServer<impl ServerReflection> {
265        tonic_reflection::server::Builder::configure()
266            .register_encoded_file_descriptor_set(api::v1::GREPTIME_GRPC_DESC)
267            .with_service_name("greptime.v1.GreptimeDatabase")
268            .with_service_name("greptime.v1.HealthCheck")
269            .with_service_name("greptime.v1.RegionServer")
270            .build_v1()
271            .inspect_err(|e| {
272                common_telemetry::error!(e; "Failed to build gRPC reflection server");
273            })
274            .unwrap()
275    }
276
277    pub async fn wait_for_serve(&self) -> Result<()> {
278        let mut serve_state = self.serve_state.lock().await;
279        let rx = serve_state.take().context(InternalSnafu {
280            err_msg: "gRPC serving state is unknown, maybe the server is not started, \
281                      or we have already waited for the serve result before.",
282        })?;
283        let Ok(result) = rx.await else {
284            warn!("Background gRPC serving task is quited before we can receive the serve result.");
285            return Ok(());
286        };
287        if let Err(e) = result {
288            error!(e; "GRPC serve error");
289        }
290        Ok(())
291    }
292}
293
294pub struct HealthCheckHandler;
295
296#[async_trait]
297impl HealthCheck for HealthCheckHandler {
298    async fn health_check(
299        &self,
300        _req: Request<HealthCheckRequest>,
301    ) -> TonicResult<Response<HealthCheckResponse>> {
302        Ok(Response::new(HealthCheckResponse {}))
303    }
304}
305
/// Name reported by `GrpcServer::name` when no custom name is configured.
pub const GRPC_SERVER: &str = "GRPC_SERVER";
307
308#[async_trait]
309impl Server for GrpcServer {
310    async fn shutdown(&self) -> Result<()> {
311        let mut shutdown_tx = self.shutdown_tx.lock().await;
312        if let Some(tx) = shutdown_tx.take()
313            && tx.send(()).is_err()
314        {
315            info!("Receiver dropped, the grpc server has already exited");
316        }
317        info!("Shutdown grpc server");
318
319        Ok(())
320    }
321
322    async fn start(&mut self, addr: SocketAddr) -> Result<()> {
323        let routes = {
324            let mut routes = self.routes.lock().await;
325            let Some(routes) = routes.take() else {
326                return AlreadyStartedSnafu {
327                    server: self.name(),
328                }
329                .fail();
330            };
331            routes
332        };
333
334        let (tx, rx) = oneshot::channel();
335        let (incoming, addr) = {
336            let mut shutdown_tx = self.shutdown_tx.lock().await;
337            ensure!(
338                shutdown_tx.is_none(),
339                AlreadyStartedSnafu { server: "gRPC" }
340            );
341
342            let listener = TcpListener::bind(addr)
343                .await
344                .context(TcpBindSnafu { addr })?;
345            let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
346            let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
347            info!("gRPC server(name={}) is bound to {}", self.name(), addr);
348
349            *shutdown_tx = Some(tx);
350
351            (incoming, addr)
352        };
353
354        let metrics_layer = tower::ServiceBuilder::new()
355            .layer(MetricsMiddlewareLayer)
356            .into_inner();
357
358        let mut builder = tonic::transport::Server::builder().layer(metrics_layer);
359        if let Some(tls_config) = self.tls_config.clone() {
360            builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
361        }
362
363        if let Some(max_connection_age) = self.config.max_connection_age {
364            builder = builder.max_connection_age(max_connection_age);
365        }
366
367        let mut builder = builder
368            .add_routes(routes)
369            .add_service(self.create_healthcheck_service())
370            .add_service(self.create_reflection_service());
371
372        if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
373            builder = builder.add_service(otel_arrow_service);
374        }
375
376        let (serve_state_tx, serve_state_rx) = oneshot::channel();
377        let mut serve_state = self.serve_state.lock().await;
378        *serve_state = Some(serve_state_rx);
379
380        let _handle = common_runtime::spawn_global(async move {
381            let result = builder
382                .serve_with_incoming_shutdown(incoming, rx.map(drop))
383                .await
384                .context(StartGrpcSnafu);
385            serve_state_tx.send(result)
386        });
387
388        self.bind_addr = Some(addr);
389        Ok(())
390    }
391
392    fn name(&self) -> &str {
393        if let Some(name) = &self.name {
394            name
395        } else {
396            GRPC_SERVER
397        }
398    }
399
400    fn bind_addr(&self) -> Option<SocketAddr> {
401        self.bind_addr
402    }
403
404    fn as_any(&self) -> &dyn Any {
405        self
406    }
407}