servers/
grpc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16mod cancellation;
17pub mod context_auth;
18mod database;
19pub mod flight;
20pub mod frontend_grpc_handler;
21pub mod greptime_handler;
22pub mod prom_query_gateway;
23pub mod region_server;
24
25use std::net::SocketAddr;
26
27use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
28use api::v1::{HealthCheckRequest, HealthCheckResponse};
29use async_trait::async_trait;
30use common_base::readable_size::ReadableSize;
31use common_grpc::channel_manager::{
32    DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
33};
34use common_telemetry::{error, info, warn};
35use futures::FutureExt;
36use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
37use serde::{Deserialize, Serialize};
38use snafu::{OptionExt, ResultExt, ensure};
39use tokio::net::TcpListener;
40use tokio::sync::Mutex;
41use tokio::sync::oneshot::{self, Receiver, Sender};
42use tonic::service::Routes;
43use tonic::service::interceptor::InterceptedService;
44use tonic::transport::ServerTlsConfig;
45use tonic::transport::server::TcpIncoming;
46use tonic::{Request, Response, Status};
47use tonic_reflection::server::v1::{ServerReflection, ServerReflectionServer};
48
49use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
50use crate::metrics::MetricsMiddlewareLayer;
51use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
52use crate::query_handler::OpenTelemetryProtocolHandlerRef;
53use crate::server::Server;
54use crate::tls::TlsOption;
55
/// Result type returned by tonic service handlers in this module: errors are gRPC [`Status`] values.
type TonicResult<T> = std::result::Result<T, Status>;
57
/// User-facing options of a gRPC server.
///
/// Missing fields fall back to [`GrpcOptions::default`] when deserializing (`#[serde(default)]`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct GrpcOptions {
    /// The address to bind the gRPC server.
    pub bind_addr: String,
    /// The address to advertise to clients. When empty, `detect_server_addr` may fill it in
    /// from the local IP address and the port of `bind_addr`.
    pub server_addr: String,
    /// Max gRPC receiving (decoding) message size.
    pub max_recv_message_size: ReadableSize,
    /// Max gRPC sending (encoding) message size.
    pub max_send_message_size: ReadableSize,
    /// Compression mode in Arrow Flight service.
    pub flight_compression: FlightCompression,
    /// Runtime size for the gRPC server.
    /// NOTE(review): not read anywhere in this file; presumably a worker/thread count — confirm.
    pub runtime_size: usize,
    /// TLS options for the server.
    #[serde(default = "Default::default")]
    pub tls: TlsOption,
}
75
76impl GrpcOptions {
77    /// Detect the server address.
78    #[cfg(not(target_os = "android"))]
79    pub fn detect_server_addr(&mut self) {
80        if self.server_addr.is_empty() {
81            match local_ip_address::local_ip() {
82                Ok(ip) => {
83                    let detected_addr = format!(
84                        "{}:{}",
85                        ip,
86                        self.bind_addr
87                            .split(':')
88                            .nth(1)
89                            .unwrap_or(DEFAULT_GRPC_ADDR_PORT)
90                    );
91                    info!("Using detected: {} as server address", detected_addr);
92                    self.server_addr = detected_addr;
93                }
94                Err(e) => {
95                    error!("Failed to detect local ip address: {}", e);
96                }
97            }
98        }
99    }
100
101    #[cfg(target_os = "android")]
102    pub fn detect_server_addr(&mut self) {
103        if self.server_addr.is_empty() {
104            common_telemetry::debug!("detect local IP is not supported on Android");
105        }
106    }
107
108    /// Create a [GrpcServerConfig] from self's options.
109    pub fn as_config(&self) -> GrpcServerConfig {
110        GrpcServerConfig {
111            max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
112            max_send_message_size: self.max_send_message_size.as_bytes() as usize,
113            tls: self.tls.clone(),
114        }
115    }
116}
117
/// Default port of the public gRPC server; also the fallback port used when detecting the
/// server address from a `bind_addr` without a usable port.
const DEFAULT_GRPC_ADDR_PORT: &str = "4001";

/// Default port of the internal (node-to-node) gRPC server, see `GrpcOptions::internal_default`.
const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
121
122impl Default for GrpcOptions {
123    fn default() -> Self {
124        Self {
125            bind_addr: format!("127.0.0.1:{}", DEFAULT_GRPC_ADDR_PORT),
126            // If hostname is not set, the server will use the local ip address as the hostname.
127            server_addr: String::new(),
128            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
129            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
130            flight_compression: FlightCompression::ArrowIpc,
131            runtime_size: 8,
132            tls: TlsOption::default(),
133        }
134    }
135}
136
137impl GrpcOptions {
138    /// Default options for internal gRPC server.
139    /// The internal gRPC server is used for communication between different nodes in cluster.
140    /// It is not exposed to the outside world.
141    pub fn internal_default() -> Self {
142        Self {
143            bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
144            // If hostname is not set, the server will use the local ip address as the hostname.
145            server_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
146            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
147            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
148            flight_compression: FlightCompression::ArrowIpc,
149            runtime_size: 8,
150            tls: TlsOption::default(),
151        }
152    }
153
154    pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
155        self.bind_addr = bind_addr.to_string();
156        self
157    }
158
159    pub fn with_server_addr(mut self, server_addr: &str) -> Self {
160        self.server_addr = server_addr.to_string();
161        self
162    }
163}
164
/// Compression mode of the Arrow Flight service.
///
/// Serialized in `snake_case` (`none`, `transport`, `arrow_ipc`, `all`). The enum's own
/// `Default` is [`FlightCompression::None`]. Note that `GrpcOptions::default` explicitly
/// selects [`FlightCompression::ArrowIpc`] instead.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum FlightCompression {
    /// Disable all compression in Arrow Flight service.
    #[default]
    None,
    /// Enable only transport layer compression (zstd).
    Transport,
    /// Enable only payload compression (lz4).
    ArrowIpc,
    /// Enable all compression (both transport and payload).
    All,
}
178
179impl FlightCompression {
180    pub fn transport_compression(&self) -> bool {
181        self == &FlightCompression::Transport || self == &FlightCompression::All
182    }
183
184    pub fn arrow_compression(&self) -> bool {
185        self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
186    }
187}
188
/// A gRPC server that serves the configured routes together with the health-check and
/// reflection services, plus an OTel Arrow metrics service when one is configured.
pub struct GrpcServer {
    // states
    /// Shutdown signal sender. Set by `start` and taken by `shutdown`; while it is `Some`,
    /// `start` refuses to run again.
    shutdown_tx: Mutex<Option<Sender<()>>>,
    /// gRPC serving state receiver. Only present if the gRPC server is started.
    /// `wait_for_serve` takes it to block until the serving task finishes (the older,
    /// blocking way of running the server).
    serve_state: Mutex<Option<Receiver<Result<()>>>>,
    // handlers
    /// Service routes. Taken by `start`, so `None` means the server was already started.
    routes: Mutex<Option<Routes>>,
    /// TLS config applied to the tonic server builder when present.
    tls_config: Option<ServerTlsConfig>,
    /// Optional OTel Arrow metrics service (wrapped by `HeaderInterceptor`), taken and
    /// registered by `start`.
    otel_arrow_service: Mutex<
        Option<
            InterceptedService<
                ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
                HeaderInterceptor,
            >,
        >,
    >,
    /// Address the listener is actually bound to (read back from the listener), recorded by
    /// `start`.
    bind_addr: Option<SocketAddr>,
    /// Optional server name. `name()` falls back to `GRPC_SERVER` when this is `None`.
    name: Option<String>,
}
211
/// Grpc Server configuration.
///
/// Typically produced from [`GrpcOptions`] via `GrpcOptions::as_config`, with sizes already
/// converted to byte counts.
#[derive(Debug, Clone)]
pub struct GrpcServerConfig {
    /// Max gRPC receiving (decoding) message size, in bytes.
    pub max_recv_message_size: usize,
    /// Max gRPC sending (encoding) message size, in bytes.
    pub max_send_message_size: usize,
    /// TLS options for the server.
    pub tls: TlsOption,
}
221
222impl Default for GrpcServerConfig {
223    fn default() -> Self {
224        Self {
225            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
226            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
227            tls: TlsOption::default(),
228        }
229    }
230}
231
232impl GrpcServer {
233    pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
234        HealthCheckServer::new(HealthCheckHandler)
235    }
236
237    pub fn create_reflection_service(&self) -> ServerReflectionServer<impl ServerReflection> {
238        tonic_reflection::server::Builder::configure()
239            .register_encoded_file_descriptor_set(api::v1::GREPTIME_GRPC_DESC)
240            .with_service_name("greptime.v1.GreptimeDatabase")
241            .with_service_name("greptime.v1.HealthCheck")
242            .with_service_name("greptime.v1.RegionServer")
243            .build_v1()
244            .inspect_err(|e| {
245                common_telemetry::error!(e; "Failed to build gRPC reflection server");
246            })
247            .unwrap()
248    }
249
250    pub async fn wait_for_serve(&self) -> Result<()> {
251        let mut serve_state = self.serve_state.lock().await;
252        let rx = serve_state.take().context(InternalSnafu {
253            err_msg: "gRPC serving state is unknown, maybe the server is not started, \
254                      or we have already waited for the serve result before.",
255        })?;
256        let Ok(result) = rx.await else {
257            warn!("Background gRPC serving task is quited before we can receive the serve result.");
258            return Ok(());
259        };
260        if let Err(e) = result {
261            error!(e; "GRPC serve error");
262        }
263        Ok(())
264    }
265}
266
267pub struct HealthCheckHandler;
268
269#[async_trait]
270impl HealthCheck for HealthCheckHandler {
271    async fn health_check(
272        &self,
273        _req: Request<HealthCheckRequest>,
274    ) -> TonicResult<Response<HealthCheckResponse>> {
275        Ok(Response::new(HealthCheckResponse {}))
276    }
277}
278
/// Fallback name reported by `GrpcServer::name` when no explicit server name is set.
pub const GRPC_SERVER: &str = "GRPC_SERVER";
280
281#[async_trait]
282impl Server for GrpcServer {
283    async fn shutdown(&self) -> Result<()> {
284        let mut shutdown_tx = self.shutdown_tx.lock().await;
285        if let Some(tx) = shutdown_tx.take()
286            && tx.send(()).is_err()
287        {
288            info!("Receiver dropped, the grpc server has already exited");
289        }
290        info!("Shutdown grpc server");
291
292        Ok(())
293    }
294
295    async fn start(&mut self, addr: SocketAddr) -> Result<()> {
296        let routes = {
297            let mut routes = self.routes.lock().await;
298            let Some(routes) = routes.take() else {
299                return AlreadyStartedSnafu {
300                    server: self.name(),
301                }
302                .fail();
303            };
304            routes
305        };
306
307        let (tx, rx) = oneshot::channel();
308        let (incoming, addr) = {
309            let mut shutdown_tx = self.shutdown_tx.lock().await;
310            ensure!(
311                shutdown_tx.is_none(),
312                AlreadyStartedSnafu { server: "gRPC" }
313            );
314
315            let listener = TcpListener::bind(addr)
316                .await
317                .context(TcpBindSnafu { addr })?;
318            let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
319            let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
320            info!("gRPC server(name={}) is bound to {}", self.name(), addr);
321
322            *shutdown_tx = Some(tx);
323
324            (incoming, addr)
325        };
326
327        let metrics_layer = tower::ServiceBuilder::new()
328            .layer(MetricsMiddlewareLayer)
329            .into_inner();
330
331        let mut builder = tonic::transport::Server::builder().layer(metrics_layer);
332        if let Some(tls_config) = self.tls_config.clone() {
333            builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
334        }
335
336        let mut builder = builder
337            .add_routes(routes)
338            .add_service(self.create_healthcheck_service())
339            .add_service(self.create_reflection_service());
340
341        if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
342            builder = builder.add_service(otel_arrow_service);
343        }
344
345        let (serve_state_tx, serve_state_rx) = oneshot::channel();
346        let mut serve_state = self.serve_state.lock().await;
347        *serve_state = Some(serve_state_rx);
348
349        let _handle = common_runtime::spawn_global(async move {
350            let result = builder
351                .serve_with_incoming_shutdown(incoming, rx.map(drop))
352                .await
353                .context(StartGrpcSnafu);
354            serve_state_tx.send(result)
355        });
356
357        self.bind_addr = Some(addr);
358        Ok(())
359    }
360
361    fn name(&self) -> &str {
362        if let Some(name) = &self.name {
363            name
364        } else {
365            GRPC_SERVER
366        }
367    }
368
369    fn bind_addr(&self) -> Option<SocketAddr> {
370        self.bind_addr
371    }
372}