Skip to main content

servers/
grpc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16mod cancellation;
17pub mod context_auth;
18mod database;
19pub mod flight;
20pub mod frontend_grpc_handler;
21pub mod greptime_handler;
22pub mod memory_limit;
23pub mod prom_query_gateway;
24pub mod region_server;
25
26use std::any::Any;
27use std::net::SocketAddr;
28use std::time::Duration;
29
30use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
31use api::v1::{HealthCheckRequest, HealthCheckResponse};
32use async_trait::async_trait;
33use common_base::readable_size::ReadableSize;
34use common_grpc::channel_manager::{
35    DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
36};
37use common_telemetry::{error, info, warn};
38use futures::FutureExt;
39use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
40use serde::{Deserialize, Serialize};
41use snafu::{OptionExt, ResultExt, ensure};
42use tokio::net::TcpListener;
43use tokio::sync::Mutex;
44use tokio::sync::oneshot::{self, Receiver, Sender};
45use tonic::service::Routes;
46use tonic::service::interceptor::InterceptedService;
47use tonic::transport::ServerTlsConfig;
48use tonic::transport::server::TcpIncoming;
49use tonic::{Request, Response, Status};
50use tonic_reflection::server::v1::{ServerReflection, ServerReflectionServer};
51
52use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
53use crate::install_default_crypto_provider;
54use crate::metrics::MetricsMiddlewareLayer;
55use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
56use crate::query_handler::OpenTelemetryProtocolHandlerRef;
57use crate::server::Server;
58use crate::tls::TlsOption;
59
60type TonicResult<T> = std::result::Result<T, Status>;
61
62#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
63#[serde(default)]
64pub struct GrpcOptions {
65    /// The address to bind the gRPC server.
66    pub bind_addr: String,
67    /// The address to advertise to clients.
68    pub server_addr: String,
69    /// Max gRPC receiving(decoding) message size
70    pub max_recv_message_size: ReadableSize,
71    /// Max gRPC sending(encoding) message size
72    pub max_send_message_size: ReadableSize,
73    /// Compression mode in Arrow Flight service.
74    pub flight_compression: FlightCompression,
75    pub runtime_size: usize,
76    #[serde(default = "Default::default")]
77    pub tls: TlsOption,
78    /// Maximum time that a channel may exist.
79    /// Useful when the server wants to control the reconnection of its clients.
80    /// Default to `None`, means infinite.
81    #[serde(with = "humantime_serde")]
82    pub max_connection_age: Option<Duration>,
83    /// The HTTP/2 keep-alive interval.
84    #[serde(with = "humantime_serde")]
85    pub http2_keep_alive_interval: Duration,
86    /// The HTTP/2 keep-alive timeout.
87    #[serde(with = "humantime_serde")]
88    pub http2_keep_alive_timeout: Duration,
89}
90
91impl GrpcOptions {
92    /// Detect the server address.
93    #[cfg(not(target_os = "android"))]
94    pub fn detect_server_addr(&mut self) {
95        if self.server_addr.is_empty() {
96            match local_ip_address::local_ip() {
97                Ok(ip) => {
98                    let detected_addr = format!(
99                        "{}:{}",
100                        ip,
101                        self.bind_addr
102                            .split(':')
103                            .nth(1)
104                            .unwrap_or(DEFAULT_GRPC_ADDR_PORT)
105                    );
106                    info!("Using detected: {} as server address", detected_addr);
107                    self.server_addr = detected_addr;
108                }
109                Err(e) => {
110                    error!("Failed to detect local ip address: {}", e);
111                }
112            }
113        }
114    }
115
116    #[cfg(target_os = "android")]
117    pub fn detect_server_addr(&mut self) {
118        if self.server_addr.is_empty() {
119            common_telemetry::debug!("detect local IP is not supported on Android");
120        }
121    }
122
123    /// Create a [GrpcServerConfig] from self's options.
124    pub fn as_config(&self) -> GrpcServerConfig {
125        GrpcServerConfig {
126            max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
127            max_send_message_size: self.max_send_message_size.as_bytes() as usize,
128            tls: self.tls.clone(),
129            max_connection_age: self.max_connection_age,
130        }
131    }
132}
133
/// Port of the default gRPC bind address; also the fallback port used by
/// `GrpcOptions::detect_server_addr` when the bind address carries none.
const DEFAULT_GRPC_ADDR_PORT: &str = "4001";

/// Port used for both addresses of `GrpcOptions::internal_default`.
const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
137
138impl Default for GrpcOptions {
139    fn default() -> Self {
140        Self {
141            bind_addr: format!("127.0.0.1:{}", DEFAULT_GRPC_ADDR_PORT),
142            // If hostname is not set, the server will use the local ip address as the hostname.
143            server_addr: String::new(),
144            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
145            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
146            flight_compression: FlightCompression::ArrowIpc,
147            runtime_size: 8,
148            tls: TlsOption::default(),
149            max_connection_age: None,
150            http2_keep_alive_interval: Duration::from_secs(10),
151            http2_keep_alive_timeout: Duration::from_secs(3),
152        }
153    }
154}
155
156impl GrpcOptions {
157    /// Default options for internal gRPC server.
158    /// The internal gRPC server is used for communication between different nodes in cluster.
159    /// It is not exposed to the outside world.
160    pub fn internal_default() -> Self {
161        Self {
162            bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
163            // If hostname is not set, the server will use the local ip address as the hostname.
164            server_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
165            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
166            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
167            flight_compression: FlightCompression::ArrowIpc,
168            runtime_size: 8,
169            tls: TlsOption::default(),
170            max_connection_age: None,
171            http2_keep_alive_interval: Duration::from_secs(10),
172            http2_keep_alive_timeout: Duration::from_secs(3),
173        }
174    }
175
176    pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
177        self.bind_addr = bind_addr.to_string();
178        self
179    }
180
181    pub fn with_server_addr(mut self, server_addr: &str) -> Self {
182        self.server_addr = server_addr.to_string();
183        self
184    }
185}
186
187#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
188#[serde(rename_all = "snake_case")]
189pub enum FlightCompression {
190    /// Disable all compression in Arrow Flight service.
191    #[default]
192    None,
193    /// Enable only transport layer compression (zstd).
194    Transport,
195    /// Enable only payload compression (lz4)
196    ArrowIpc,
197    /// Enable all compression.
198    All,
199}
200
201impl FlightCompression {
202    pub fn transport_compression(&self) -> bool {
203        self == &FlightCompression::Transport || self == &FlightCompression::All
204    }
205
206    pub fn arrow_compression(&self) -> bool {
207        self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
208    }
209}
210
211pub struct GrpcServer {
212    // states
213    shutdown_tx: Mutex<Option<Sender<()>>>,
214    /// gRPC serving state receiver. Only present if the gRPC server is started.
215    /// Used to wait for the server to stop, performing the old blocking fashion.
216    serve_state: Mutex<Option<Receiver<Result<()>>>>,
217    // handlers
218    routes: Mutex<Option<Routes>>,
219    // tls config
220    tls_config: Option<ServerTlsConfig>,
221    // Otel arrow service
222    otel_arrow_service: Mutex<
223        Option<
224            InterceptedService<
225                ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
226                HeaderInterceptor,
227            >,
228        >,
229    >,
230    bind_addr: Option<SocketAddr>,
231    name: Option<String>,
232    config: GrpcServerConfig,
233}
234
235/// Grpc Server configuration
236#[derive(Debug, Clone)]
237pub struct GrpcServerConfig {
238    // Max gRPC receiving(decoding) message size
239    pub max_recv_message_size: usize,
240    // Max gRPC sending(encoding) message size
241    pub max_send_message_size: usize,
242    pub tls: TlsOption,
243    /// Maximum time that a channel may exist.
244    /// Useful when the server wants to control the reconnection of its clients.
245    /// Default to `None`, means infinite.
246    pub max_connection_age: Option<Duration>,
247}
248
249impl Default for GrpcServerConfig {
250    fn default() -> Self {
251        Self {
252            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
253            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
254            tls: TlsOption::default(),
255            max_connection_age: None,
256        }
257    }
258}
259
260impl GrpcServer {
261    pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
262        HealthCheckServer::new(HealthCheckHandler)
263    }
264
265    pub fn create_reflection_service(&self) -> ServerReflectionServer<impl ServerReflection> {
266        tonic_reflection::server::Builder::configure()
267            .register_encoded_file_descriptor_set(api::v1::GREPTIME_GRPC_DESC)
268            .with_service_name("greptime.v1.GreptimeDatabase")
269            .with_service_name("greptime.v1.HealthCheck")
270            .with_service_name("greptime.v1.RegionServer")
271            .build_v1()
272            .inspect_err(|e| {
273                common_telemetry::error!(e; "Failed to build gRPC reflection server");
274            })
275            .unwrap()
276    }
277
278    pub async fn wait_for_serve(&self) -> Result<()> {
279        let mut serve_state = self.serve_state.lock().await;
280        let rx = serve_state.take().context(InternalSnafu {
281            err_msg: "gRPC serving state is unknown, maybe the server is not started, \
282                      or we have already waited for the serve result before.",
283        })?;
284        let Ok(result) = rx.await else {
285            warn!("Background gRPC serving task is quited before we can receive the serve result.");
286            return Ok(());
287        };
288        if let Err(e) = result {
289            error!(e; "GRPC serve error");
290        }
291        Ok(())
292    }
293}
294
295pub struct HealthCheckHandler;
296
297#[async_trait]
298impl HealthCheck for HealthCheckHandler {
299    async fn health_check(
300        &self,
301        _req: Request<HealthCheckRequest>,
302    ) -> TonicResult<Response<HealthCheckResponse>> {
303        Ok(Response::new(HealthCheckResponse {}))
304    }
305}
306
/// Name reported by [`GrpcServer`] when no explicit name was configured.
pub const GRPC_SERVER: &str = "GRPC_SERVER";
308
309#[async_trait]
310impl Server for GrpcServer {
311    async fn shutdown(&self) -> Result<()> {
312        let mut shutdown_tx = self.shutdown_tx.lock().await;
313        if let Some(tx) = shutdown_tx.take()
314            && tx.send(()).is_err()
315        {
316            info!("Receiver dropped, the grpc server has already exited");
317        }
318        info!("Shutdown grpc server");
319
320        Ok(())
321    }
322
323    async fn start(&mut self, addr: SocketAddr) -> Result<()> {
324        let routes = {
325            let mut routes = self.routes.lock().await;
326            let Some(routes) = routes.take() else {
327                return AlreadyStartedSnafu {
328                    server: self.name(),
329                }
330                .fail();
331            };
332            routes
333        };
334
335        let (tx, rx) = oneshot::channel();
336        let (incoming, addr) = {
337            let mut shutdown_tx = self.shutdown_tx.lock().await;
338            ensure!(
339                shutdown_tx.is_none(),
340                AlreadyStartedSnafu { server: "gRPC" }
341            );
342
343            let listener = TcpListener::bind(addr)
344                .await
345                .context(TcpBindSnafu { addr })?;
346            let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
347            let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
348            info!("gRPC server(name={}) is bound to {}", self.name(), addr);
349
350            *shutdown_tx = Some(tx);
351
352            (incoming, addr)
353        };
354
355        let metrics_layer = tower::ServiceBuilder::new()
356            .layer(MetricsMiddlewareLayer)
357            .into_inner();
358
359        let mut builder = tonic::transport::Server::builder()
360            .accept_http1(true)
361            .layer(metrics_layer)
362            .layer(tonic_web::GrpcWebLayer::new());
363
364        if let Some(tls_config) = self.tls_config.clone() {
365            // tonic builds the underlying rustls server config here, which requires a
366            // process-level crypto provider to be installed first.
367            if let Err(err) = install_default_crypto_provider() {
368                warn!("Failed to install default rustls crypto provider: {err}");
369            }
370            builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
371        }
372
373        if let Some(max_connection_age) = self.config.max_connection_age {
374            builder = builder.max_connection_age(max_connection_age);
375        }
376
377        let mut builder = builder
378            .add_routes(routes)
379            .add_service(self.create_healthcheck_service())
380            .add_service(self.create_reflection_service());
381
382        if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
383            builder = builder.add_service(otel_arrow_service);
384        }
385
386        let (serve_state_tx, serve_state_rx) = oneshot::channel();
387        let mut serve_state = self.serve_state.lock().await;
388        *serve_state = Some(serve_state_rx);
389
390        let _handle = common_runtime::spawn_global(async move {
391            let result = builder
392                .serve_with_incoming_shutdown(incoming, rx.map(drop))
393                .await
394                .context(StartGrpcSnafu);
395            serve_state_tx.send(result)
396        });
397
398        self.bind_addr = Some(addr);
399        Ok(())
400    }
401
402    fn name(&self) -> &str {
403        if let Some(name) = &self.name {
404            name
405        } else {
406            GRPC_SERVER
407        }
408    }
409
410    fn bind_addr(&self) -> Option<SocketAddr> {
411        self.bind_addr
412    }
413
414    fn as_any(&self) -> &dyn Any {
415        self
416    }
417}