servers/
grpc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16mod cancellation;
17pub mod context_auth;
18mod database;
19pub mod flight;
20pub mod frontend_grpc_handler;
21pub mod greptime_handler;
22pub mod prom_query_gateway;
23pub mod region_server;
24
25use std::net::SocketAddr;
26use std::time::Duration;
27
28use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
29use api::v1::{HealthCheckRequest, HealthCheckResponse};
30use async_trait::async_trait;
31use common_base::readable_size::ReadableSize;
32use common_grpc::channel_manager::{
33    DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
34};
35use common_telemetry::{error, info, warn};
36use futures::FutureExt;
37use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
38use serde::{Deserialize, Serialize};
39use snafu::{OptionExt, ResultExt, ensure};
40use tokio::net::TcpListener;
41use tokio::sync::Mutex;
42use tokio::sync::oneshot::{self, Receiver, Sender};
43use tonic::service::Routes;
44use tonic::service::interceptor::InterceptedService;
45use tonic::transport::ServerTlsConfig;
46use tonic::transport::server::TcpIncoming;
47use tonic::{Request, Response, Status};
48use tonic_reflection::server::v1::{ServerReflection, ServerReflectionServer};
49
50use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
51use crate::metrics::MetricsMiddlewareLayer;
52use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
53use crate::query_handler::OpenTelemetryProtocolHandlerRef;
54use crate::server::Server;
55use crate::tls::TlsOption;
56
/// Result type returned by tonic service handlers, with [Status] as the error type.
type TonicResult<T> = std::result::Result<T, Status>;
58
/// Options of a gRPC server, as read from configuration.
///
/// Because of `#[serde(default)]`, fields missing from the configuration take their
/// values from [GrpcOptions::default].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct GrpcOptions {
    /// The address to bind the gRPC server.
    pub bind_addr: String,
    /// The address to advertise to clients.
    /// When left empty, `detect_server_addr` fills it in from the local IP and the bind port.
    pub server_addr: String,
    /// Max gRPC receiving (decoding) message size.
    pub max_recv_message_size: ReadableSize,
    /// Max gRPC sending (encoding) message size.
    pub max_send_message_size: ReadableSize,
    /// Compression mode in Arrow Flight service.
    pub flight_compression: FlightCompression,
    /// Runtime size; defaults to 8.
    /// NOTE(review): not read in this file — presumably a worker thread count; confirm.
    pub runtime_size: usize,
    /// TLS settings; copied into [GrpcServerConfig] by `as_config`.
    #[serde(default = "Default::default")]
    pub tls: TlsOption,
    /// Maximum time that a channel may exist.
    /// Useful when the server wants to control the reconnection of its clients.
    /// Defaults to `None`, which means infinite.
    /// (De)serialized with `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub max_connection_age: Option<Duration>,
}
81
82impl GrpcOptions {
83    /// Detect the server address.
84    #[cfg(not(target_os = "android"))]
85    pub fn detect_server_addr(&mut self) {
86        if self.server_addr.is_empty() {
87            match local_ip_address::local_ip() {
88                Ok(ip) => {
89                    let detected_addr = format!(
90                        "{}:{}",
91                        ip,
92                        self.bind_addr
93                            .split(':')
94                            .nth(1)
95                            .unwrap_or(DEFAULT_GRPC_ADDR_PORT)
96                    );
97                    info!("Using detected: {} as server address", detected_addr);
98                    self.server_addr = detected_addr;
99                }
100                Err(e) => {
101                    error!("Failed to detect local ip address: {}", e);
102                }
103            }
104        }
105    }
106
107    #[cfg(target_os = "android")]
108    pub fn detect_server_addr(&mut self) {
109        if self.server_addr.is_empty() {
110            common_telemetry::debug!("detect local IP is not supported on Android");
111        }
112    }
113
114    /// Create a [GrpcServerConfig] from self's options.
115    pub fn as_config(&self) -> GrpcServerConfig {
116        GrpcServerConfig {
117            max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
118            max_send_message_size: self.max_send_message_size.as_bytes() as usize,
119            tls: self.tls.clone(),
120            max_connection_age: self.max_connection_age,
121        }
122    }
123}
124
/// Port of the default public bind address (see [GrpcOptions::default]); also the fallback
/// port used by `detect_server_addr` when the bind address does not provide one.
const DEFAULT_GRPC_ADDR_PORT: &str = "4001";

/// Port of both the bind and the advertised address in [GrpcOptions::internal_default].
const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
128
129impl Default for GrpcOptions {
130    fn default() -> Self {
131        Self {
132            bind_addr: format!("127.0.0.1:{}", DEFAULT_GRPC_ADDR_PORT),
133            // If hostname is not set, the server will use the local ip address as the hostname.
134            server_addr: String::new(),
135            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
136            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
137            flight_compression: FlightCompression::ArrowIpc,
138            runtime_size: 8,
139            tls: TlsOption::default(),
140            max_connection_age: None,
141        }
142    }
143}
144
145impl GrpcOptions {
146    /// Default options for internal gRPC server.
147    /// The internal gRPC server is used for communication between different nodes in cluster.
148    /// It is not exposed to the outside world.
149    pub fn internal_default() -> Self {
150        Self {
151            bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
152            // If hostname is not set, the server will use the local ip address as the hostname.
153            server_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
154            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
155            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
156            flight_compression: FlightCompression::ArrowIpc,
157            runtime_size: 8,
158            tls: TlsOption::default(),
159            max_connection_age: None,
160        }
161    }
162
163    pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
164        self.bind_addr = bind_addr.to_string();
165        self
166    }
167
168    pub fn with_server_addr(mut self, server_addr: &str) -> Self {
169        self.server_addr = server_addr.to_string();
170        self
171    }
172}
173
/// Compression mode of the Arrow Flight service.
///
/// Deserialized from snake_case names (`none`, `transport`, `arrow_ipc`, `all`).
/// Note that `FlightCompression::default()` is `None`, while [GrpcOptions::default]
/// explicitly selects `ArrowIpc`.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum FlightCompression {
    /// Disable all compression in Arrow Flight service.
    #[default]
    None,
    /// Enable only transport layer compression (zstd).
    Transport,
    /// Enable only payload compression (lz4).
    ArrowIpc,
    /// Enable all compression.
    All,
}
187
188impl FlightCompression {
189    pub fn transport_compression(&self) -> bool {
190        self == &FlightCompression::Transport || self == &FlightCompression::All
191    }
192
193    pub fn arrow_compression(&self) -> bool {
194        self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
195    }
196}
197
/// A gRPC server that binds a TCP address and serves its registered routes, together
/// with the health check, reflection and (optionally) OTel Arrow services.
pub struct GrpcServer {
    // states
    /// Sender used to signal the serving task to stop; set by `start`, taken by `shutdown`.
    shutdown_tx: Mutex<Option<Sender<()>>>,
    /// gRPC serving state receiver. Only present if the gRPC server is started.
    /// Used to wait for the server to stop, performing the old blocking fashion.
    serve_state: Mutex<Option<Receiver<Result<()>>>>,
    // handlers
    /// Routes to serve. Taken by `start`; `None` makes `start` fail with `AlreadyStarted`.
    routes: Mutex<Option<Routes>>,
    // tls config
    /// TLS configuration applied to the tonic server builder in `start`, if any.
    tls_config: Option<ServerTlsConfig>,
    // Otel arrow service
    /// OTel Arrow metrics service; taken and registered by `start` when present.
    otel_arrow_service: Mutex<
        Option<
            InterceptedService<
                ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
                HeaderInterceptor,
            >,
        >,
    >,
    /// The listener's local address, recorded by `start`.
    bind_addr: Option<SocketAddr>,
    /// Optional server name; `name()` falls back to [GRPC_SERVER] when unset.
    name: Option<String>,
    /// Server configuration; `start` applies its `max_connection_age`.
    /// NOTE(review): the message size limits are not applied in this file — presumably
    /// by the service constructors elsewhere; confirm.
    config: GrpcServerConfig,
}
221
/// Grpc Server configuration.
///
/// Can be derived from [GrpcOptions] via `GrpcOptions::as_config`.
#[derive(Debug, Clone)]
pub struct GrpcServerConfig {
    /// Max gRPC receiving (decoding) message size, in bytes.
    pub max_recv_message_size: usize,
    /// Max gRPC sending (encoding) message size, in bytes.
    pub max_send_message_size: usize,
    /// TLS settings.
    pub tls: TlsOption,
    /// Maximum time that a channel may exist.
    /// Useful when the server wants to control the reconnection of its clients.
    /// Defaults to `None`, which means infinite.
    pub max_connection_age: Option<Duration>,
}
235
236impl Default for GrpcServerConfig {
237    fn default() -> Self {
238        Self {
239            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
240            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
241            tls: TlsOption::default(),
242            max_connection_age: None,
243        }
244    }
245}
246
247impl GrpcServer {
248    pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
249        HealthCheckServer::new(HealthCheckHandler)
250    }
251
252    pub fn create_reflection_service(&self) -> ServerReflectionServer<impl ServerReflection> {
253        tonic_reflection::server::Builder::configure()
254            .register_encoded_file_descriptor_set(api::v1::GREPTIME_GRPC_DESC)
255            .with_service_name("greptime.v1.GreptimeDatabase")
256            .with_service_name("greptime.v1.HealthCheck")
257            .with_service_name("greptime.v1.RegionServer")
258            .build_v1()
259            .inspect_err(|e| {
260                common_telemetry::error!(e; "Failed to build gRPC reflection server");
261            })
262            .unwrap()
263    }
264
265    pub async fn wait_for_serve(&self) -> Result<()> {
266        let mut serve_state = self.serve_state.lock().await;
267        let rx = serve_state.take().context(InternalSnafu {
268            err_msg: "gRPC serving state is unknown, maybe the server is not started, \
269                      or we have already waited for the serve result before.",
270        })?;
271        let Ok(result) = rx.await else {
272            warn!("Background gRPC serving task is quited before we can receive the serve result.");
273            return Ok(());
274        };
275        if let Err(e) = result {
276            error!(e; "GRPC serve error");
277        }
278        Ok(())
279    }
280}
281
282pub struct HealthCheckHandler;
283
284#[async_trait]
285impl HealthCheck for HealthCheckHandler {
286    async fn health_check(
287        &self,
288        _req: Request<HealthCheckRequest>,
289    ) -> TonicResult<Response<HealthCheckResponse>> {
290        Ok(Response::new(HealthCheckResponse {}))
291    }
292}
293
/// Name reported by a [GrpcServer] that was not given an explicit name.
pub const GRPC_SERVER: &str = "GRPC_SERVER";
295
296#[async_trait]
297impl Server for GrpcServer {
298    async fn shutdown(&self) -> Result<()> {
299        let mut shutdown_tx = self.shutdown_tx.lock().await;
300        if let Some(tx) = shutdown_tx.take()
301            && tx.send(()).is_err()
302        {
303            info!("Receiver dropped, the grpc server has already exited");
304        }
305        info!("Shutdown grpc server");
306
307        Ok(())
308    }
309
310    async fn start(&mut self, addr: SocketAddr) -> Result<()> {
311        let routes = {
312            let mut routes = self.routes.lock().await;
313            let Some(routes) = routes.take() else {
314                return AlreadyStartedSnafu {
315                    server: self.name(),
316                }
317                .fail();
318            };
319            routes
320        };
321
322        let (tx, rx) = oneshot::channel();
323        let (incoming, addr) = {
324            let mut shutdown_tx = self.shutdown_tx.lock().await;
325            ensure!(
326                shutdown_tx.is_none(),
327                AlreadyStartedSnafu { server: "gRPC" }
328            );
329
330            let listener = TcpListener::bind(addr)
331                .await
332                .context(TcpBindSnafu { addr })?;
333            let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
334            let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
335            info!("gRPC server(name={}) is bound to {}", self.name(), addr);
336
337            *shutdown_tx = Some(tx);
338
339            (incoming, addr)
340        };
341
342        let metrics_layer = tower::ServiceBuilder::new()
343            .layer(MetricsMiddlewareLayer)
344            .into_inner();
345
346        let mut builder = tonic::transport::Server::builder().layer(metrics_layer);
347        if let Some(tls_config) = self.tls_config.clone() {
348            builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
349        }
350
351        if let Some(max_connection_age) = self.config.max_connection_age {
352            builder = builder.max_connection_age(max_connection_age);
353        }
354
355        let mut builder = builder
356            .add_routes(routes)
357            .add_service(self.create_healthcheck_service())
358            .add_service(self.create_reflection_service());
359
360        if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
361            builder = builder.add_service(otel_arrow_service);
362        }
363
364        let (serve_state_tx, serve_state_rx) = oneshot::channel();
365        let mut serve_state = self.serve_state.lock().await;
366        *serve_state = Some(serve_state_rx);
367
368        let _handle = common_runtime::spawn_global(async move {
369            let result = builder
370                .serve_with_incoming_shutdown(incoming, rx.map(drop))
371                .await
372                .context(StartGrpcSnafu);
373            serve_state_tx.send(result)
374        });
375
376        self.bind_addr = Some(addr);
377        Ok(())
378    }
379
380    fn name(&self) -> &str {
381        if let Some(name) = &self.name {
382            name
383        } else {
384            GRPC_SERVER
385        }
386    }
387
388    fn bind_addr(&self) -> Option<SocketAddr> {
389        self.bind_addr
390    }
391}