servers/
grpc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod authorize;
16pub mod builder;
17mod cancellation;
18mod database;
19pub mod flight;
20pub mod frontend_grpc_handler;
21pub mod greptime_handler;
22pub mod prom_query_gateway;
23pub mod region_server;
24
25use std::net::SocketAddr;
26
27use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
28use api::v1::{HealthCheckRequest, HealthCheckResponse};
29use async_trait::async_trait;
30use common_base::readable_size::ReadableSize;
31use common_grpc::channel_manager::{
32    DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
33};
34use common_telemetry::{error, info, warn};
35use futures::FutureExt;
36use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
37use serde::{Deserialize, Serialize};
38use snafu::{ensure, OptionExt, ResultExt};
39use tokio::net::TcpListener;
40use tokio::sync::oneshot::{self, Receiver, Sender};
41use tokio::sync::Mutex;
42use tonic::service::interceptor::InterceptedService;
43use tonic::service::Routes;
44use tonic::transport::server::TcpIncoming;
45use tonic::transport::ServerTlsConfig;
46use tonic::{Request, Response, Status};
47use tonic_reflection::server::v1::{ServerReflection, ServerReflectionServer};
48
49use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
50use crate::metrics::MetricsMiddlewareLayer;
51use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
52use crate::query_handler::OpenTelemetryProtocolHandlerRef;
53use crate::server::Server;
54use crate::tls::TlsOption;
55
/// Shorthand for a `Result` whose error type is a tonic [`Status`]; used as the
/// return type of the gRPC handler methods implemented in this module.
type TonicResult<T> = std::result::Result<T, Status>;
57
58#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
59pub struct GrpcOptions {
60    /// The address to bind the gRPC server.
61    pub bind_addr: String,
62    /// The address to advertise to clients.
63    pub server_addr: String,
64    /// Max gRPC receiving(decoding) message size
65    pub max_recv_message_size: ReadableSize,
66    /// Max gRPC sending(encoding) message size
67    pub max_send_message_size: ReadableSize,
68    /// Compression mode in Arrow Flight service.
69    pub flight_compression: FlightCompression,
70    pub runtime_size: usize,
71    #[serde(default = "Default::default")]
72    pub tls: TlsOption,
73}
74
75impl GrpcOptions {
76    /// Detect the server address.
77    #[cfg(not(target_os = "android"))]
78    pub fn detect_server_addr(&mut self) {
79        if self.server_addr.is_empty() {
80            match local_ip_address::local_ip() {
81                Ok(ip) => {
82                    let detected_addr = format!(
83                        "{}:{}",
84                        ip,
85                        self.bind_addr
86                            .split(':')
87                            .nth(1)
88                            .unwrap_or(DEFAULT_GRPC_ADDR_PORT)
89                    );
90                    info!("Using detected: {} as server address", detected_addr);
91                    self.server_addr = detected_addr;
92                }
93                Err(e) => {
94                    error!("Failed to detect local ip address: {}", e);
95                }
96            }
97        }
98    }
99
100    #[cfg(target_os = "android")]
101    pub fn detect_server_addr(&mut self) {
102        if self.server_addr.is_empty() {
103            common_telemetry::debug!("detect local IP is not supported on Android");
104        }
105    }
106
107    /// Create a [GrpcServerConfig] from self's options.
108    pub fn as_config(&self) -> GrpcServerConfig {
109        GrpcServerConfig {
110            max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
111            max_send_message_size: self.max_send_message_size.as_bytes() as usize,
112            tls: self.tls.clone(),
113        }
114    }
115}
116
/// Port of the default gRPC bind address; also the fallback port when the
/// advertised server address is detected and `bind_addr` carries no port.
const DEFAULT_GRPC_ADDR_PORT: &str = "4001";

/// Port of the default bind address of the internal gRPC server
/// (see `GrpcOptions::internal_default`).
const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
120
121impl Default for GrpcOptions {
122    fn default() -> Self {
123        Self {
124            bind_addr: format!("127.0.0.1:{}", DEFAULT_GRPC_ADDR_PORT),
125            // If hostname is not set, the server will use the local ip address as the hostname.
126            server_addr: String::new(),
127            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
128            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
129            flight_compression: FlightCompression::ArrowIpc,
130            runtime_size: 8,
131            tls: TlsOption::default(),
132        }
133    }
134}
135
136impl GrpcOptions {
137    /// Default options for internal gRPC server.
138    /// The internal gRPC server is used for communication between different nodes in cluster.
139    /// It is not exposed to the outside world.
140    pub fn internal_default() -> Self {
141        Self {
142            bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
143            // If hostname is not set, the server will use the local ip address as the hostname.
144            server_addr: String::new(),
145            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
146            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
147            flight_compression: FlightCompression::ArrowIpc,
148            runtime_size: 8,
149            tls: TlsOption::default(),
150        }
151    }
152
153    pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
154        self.bind_addr = bind_addr.to_string();
155        self
156    }
157
158    pub fn with_server_addr(mut self, server_addr: &str) -> Self {
159        self.server_addr = server_addr.to_string();
160        self
161    }
162}
163
/// Compression mode of the Arrow Flight service.
///
/// Serialized in `snake_case` (e.g. `arrow_ipc`). Note that the derived
/// `Default` is [`FlightCompression::None`], whereas the `GrpcOptions`
/// constructors in this file explicitly pick [`FlightCompression::ArrowIpc`].
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum FlightCompression {
    /// Disable all compression in Arrow Flight service.
    #[default]
    None,
    /// Enable only transport layer compression (zstd).
    Transport,
    /// Enable only payload compression (lz4)
    ArrowIpc,
    /// Enable all compression.
    All,
}
177
178impl FlightCompression {
179    pub fn transport_compression(&self) -> bool {
180        self == &FlightCompression::Transport || self == &FlightCompression::All
181    }
182
183    pub fn arrow_compression(&self) -> bool {
184        self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
185    }
186}
187
/// A gRPC server that serves a set of pre-built routes together with the
/// health check and reflection services, and optionally the OTel Arrow service.
pub struct GrpcServer {
    // states
    /// Sender half of the shutdown signal. Stored by `start`; taken by
    /// `shutdown`, which sends `()` through it to stop serving.
    shutdown_tx: Mutex<Option<Sender<()>>>,
    /// gRPC serving state receiver. Only present if the gRPC server is started.
    /// Used to wait for the server to stop, performing the old blocking fashion.
    serve_state: Mutex<Option<Receiver<Result<()>>>>,
    // handlers
    /// Routes to serve. Taken (left as `None`) when the server is started.
    routes: Mutex<Option<Routes>>,
    // tls config
    /// TLS configuration applied to the tonic server in `start` when present.
    tls_config: Option<ServerTlsConfig>,
    // Otel arrow service
    /// Optional OTel Arrow metrics service; taken and registered in `start`.
    otel_arrow_service: Mutex<
        Option<
            InterceptedService<
                ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
                HeaderInterceptor,
            >,
        >,
    >,
    /// Local address of the listener; set once `start` has bound it.
    bind_addr: Option<SocketAddr>,
    /// Optional server name; `name()` falls back to `GRPC_SERVER` when unset.
    name: Option<String>,
}
210
/// Grpc Server configuration
///
/// Holds the message size limits as plain byte counts plus the TLS options;
/// see `GrpcOptions::as_config` for how it is derived from [`GrpcOptions`].
#[derive(Debug, Clone)]
pub struct GrpcServerConfig {
    /// Max gRPC receiving(decoding) message size, in bytes.
    pub max_recv_message_size: usize,
    /// Max gRPC sending(encoding) message size, in bytes.
    pub max_send_message_size: usize,
    /// TLS options of the server.
    pub tls: TlsOption,
}
220
221impl Default for GrpcServerConfig {
222    fn default() -> Self {
223        Self {
224            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
225            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
226            tls: TlsOption::default(),
227        }
228    }
229}
230
231impl GrpcServer {
232    pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
233        HealthCheckServer::new(HealthCheckHandler)
234    }
235
236    pub fn create_reflection_service(&self) -> ServerReflectionServer<impl ServerReflection> {
237        tonic_reflection::server::Builder::configure()
238            .register_encoded_file_descriptor_set(api::v1::GREPTIME_GRPC_DESC)
239            .with_service_name("greptime.v1.GreptimeDatabase")
240            .with_service_name("greptime.v1.HealthCheck")
241            .with_service_name("greptime.v1.RegionServer")
242            .build_v1()
243            .inspect_err(|e| {
244                common_telemetry::error!(e; "Failed to build gRPC reflection server");
245            })
246            .unwrap()
247    }
248
249    pub async fn wait_for_serve(&self) -> Result<()> {
250        let mut serve_state = self.serve_state.lock().await;
251        let rx = serve_state.take().context(InternalSnafu {
252            err_msg: "gRPC serving state is unknown, maybe the server is not started, \
253                      or we have already waited for the serve result before.",
254        })?;
255        let Ok(result) = rx.await else {
256            warn!("Background gRPC serving task is quited before we can receive the serve result.");
257            return Ok(());
258        };
259        if let Err(e) = result {
260            error!(e; "GRPC serve error");
261        }
262        Ok(())
263    }
264}
265
266pub struct HealthCheckHandler;
267
268#[async_trait]
269impl HealthCheck for HealthCheckHandler {
270    async fn health_check(
271        &self,
272        _req: Request<HealthCheckRequest>,
273    ) -> TonicResult<Response<HealthCheckResponse>> {
274        Ok(Response::new(HealthCheckResponse {}))
275    }
276}
277
/// Name reported by a `GrpcServer` that has no explicit name configured.
pub const GRPC_SERVER: &str = "GRPC_SERVER";
279
280#[async_trait]
281impl Server for GrpcServer {
282    async fn shutdown(&self) -> Result<()> {
283        let mut shutdown_tx = self.shutdown_tx.lock().await;
284        if let Some(tx) = shutdown_tx.take() {
285            if tx.send(()).is_err() {
286                info!("Receiver dropped, the grpc server has already exited");
287            }
288        }
289        info!("Shutdown grpc server");
290
291        Ok(())
292    }
293
294    async fn start(&mut self, addr: SocketAddr) -> Result<()> {
295        let routes = {
296            let mut routes = self.routes.lock().await;
297            let Some(routes) = routes.take() else {
298                return AlreadyStartedSnafu {
299                    server: self.name(),
300                }
301                .fail();
302            };
303            routes
304        };
305
306        let (tx, rx) = oneshot::channel();
307        let (incoming, addr) = {
308            let mut shutdown_tx = self.shutdown_tx.lock().await;
309            ensure!(
310                shutdown_tx.is_none(),
311                AlreadyStartedSnafu { server: "gRPC" }
312            );
313
314            let listener = TcpListener::bind(addr)
315                .await
316                .context(TcpBindSnafu { addr })?;
317            let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
318            let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
319            info!("gRPC server(name={}) is bound to {}", self.name(), addr);
320
321            *shutdown_tx = Some(tx);
322
323            (incoming, addr)
324        };
325
326        let metrics_layer = tower::ServiceBuilder::new()
327            .layer(MetricsMiddlewareLayer)
328            .into_inner();
329
330        let mut builder = tonic::transport::Server::builder().layer(metrics_layer);
331        if let Some(tls_config) = self.tls_config.clone() {
332            builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
333        }
334
335        let mut builder = builder
336            .add_routes(routes)
337            .add_service(self.create_healthcheck_service())
338            .add_service(self.create_reflection_service());
339
340        if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
341            builder = builder.add_service(otel_arrow_service);
342        }
343
344        let (serve_state_tx, serve_state_rx) = oneshot::channel();
345        let mut serve_state = self.serve_state.lock().await;
346        *serve_state = Some(serve_state_rx);
347
348        let _handle = common_runtime::spawn_global(async move {
349            let result = builder
350                .serve_with_incoming_shutdown(incoming, rx.map(drop))
351                .await
352                .context(StartGrpcSnafu);
353            serve_state_tx.send(result)
354        });
355
356        self.bind_addr = Some(addr);
357        Ok(())
358    }
359
360    fn name(&self) -> &str {
361        if let Some(name) = &self.name {
362            name
363        } else {
364            GRPC_SERVER
365        }
366    }
367
368    fn bind_addr(&self) -> Option<SocketAddr> {
369        self.bind_addr
370    }
371}