servers/
grpc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16mod cancellation;
17pub mod context_auth;
18mod database;
19pub mod flight;
20pub mod frontend_grpc_handler;
21pub mod greptime_handler;
22pub mod memory_limit;
23pub mod prom_query_gateway;
24pub mod region_server;
25
26use std::net::SocketAddr;
27use std::time::Duration;
28
29use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
30use api::v1::{HealthCheckRequest, HealthCheckResponse};
31use async_trait::async_trait;
32use common_base::readable_size::ReadableSize;
33use common_grpc::channel_manager::{
34    DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
35};
36use common_telemetry::{error, info, warn};
37use futures::FutureExt;
38use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
39use serde::{Deserialize, Serialize};
40use snafu::{OptionExt, ResultExt, ensure};
41use tokio::net::TcpListener;
42use tokio::sync::Mutex;
43use tokio::sync::oneshot::{self, Receiver, Sender};
44use tonic::service::Routes;
45use tonic::service::interceptor::InterceptedService;
46use tonic::transport::ServerTlsConfig;
47use tonic::transport::server::TcpIncoming;
48use tonic::{Request, Response, Status};
49use tonic_reflection::server::v1::{ServerReflection, ServerReflectionServer};
50
51use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
52use crate::metrics::MetricsMiddlewareLayer;
53use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
54use crate::query_handler::OpenTelemetryProtocolHandlerRef;
55use crate::request_limiter::RequestMemoryLimiter;
56use crate::server::Server;
57use crate::tls::TlsOption;
58
59type TonicResult<T> = std::result::Result<T, Status>;
60
/// User-facing options of a gRPC server.
///
/// Because of `#[serde(default)]`, any field missing from the deserialized input takes its
/// value from the `Default` implementation of this struct.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct GrpcOptions {
    /// The address to bind the gRPC server.
    pub bind_addr: String,
    /// The address to advertise to clients.
    /// When left empty, `detect_server_addr` tries to fill it in from the local IP address.
    pub server_addr: String,
    /// Max gRPC receiving(decoding) message size
    pub max_recv_message_size: ReadableSize,
    /// Max gRPC sending(encoding) message size
    pub max_send_message_size: ReadableSize,
    /// Maximum total memory for all concurrent gRPC request messages. 0 disables the limit.
    pub max_total_message_memory: ReadableSize,
    /// Compression mode in Arrow Flight service.
    pub flight_compression: FlightCompression,
    /// NOTE(review): presumably the number of threads of the runtime serving gRPC requests;
    /// its consumer is not visible in this file — confirm.
    pub runtime_size: usize,
    /// TLS options of the server; copied into `GrpcServerConfig` by `as_config`.
    #[serde(default = "Default::default")]
    pub tls: TlsOption,
    /// Maximum time that a channel may exist.
    /// Useful when the server wants to control the reconnection of its clients.
    /// Default to `None`, means infinite.
    #[serde(with = "humantime_serde")]
    pub max_connection_age: Option<Duration>,
}
85
86impl GrpcOptions {
87    /// Detect the server address.
88    #[cfg(not(target_os = "android"))]
89    pub fn detect_server_addr(&mut self) {
90        if self.server_addr.is_empty() {
91            match local_ip_address::local_ip() {
92                Ok(ip) => {
93                    let detected_addr = format!(
94                        "{}:{}",
95                        ip,
96                        self.bind_addr
97                            .split(':')
98                            .nth(1)
99                            .unwrap_or(DEFAULT_GRPC_ADDR_PORT)
100                    );
101                    info!("Using detected: {} as server address", detected_addr);
102                    self.server_addr = detected_addr;
103                }
104                Err(e) => {
105                    error!("Failed to detect local ip address: {}", e);
106                }
107            }
108        }
109    }
110
111    #[cfg(target_os = "android")]
112    pub fn detect_server_addr(&mut self) {
113        if self.server_addr.is_empty() {
114            common_telemetry::debug!("detect local IP is not supported on Android");
115        }
116    }
117
118    /// Create a [GrpcServerConfig] from self's options.
119    pub fn as_config(&self) -> GrpcServerConfig {
120        GrpcServerConfig {
121            max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
122            max_send_message_size: self.max_send_message_size.as_bytes() as usize,
123            max_total_message_memory: self.max_total_message_memory.as_bytes() as usize,
124            tls: self.tls.clone(),
125            max_connection_age: self.max_connection_age,
126        }
127    }
128}
129
/// Port of the default gRPC bind address. Also used as the fallback port when the
/// advertised server address is detected and `bind_addr` carries no port.
const DEFAULT_GRPC_ADDR_PORT: &str = "4001";

/// Port used for both the bind and the server address of the internal gRPC server
/// (see `GrpcOptions::internal_default`).
const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
133
134impl Default for GrpcOptions {
135    fn default() -> Self {
136        Self {
137            bind_addr: format!("127.0.0.1:{}", DEFAULT_GRPC_ADDR_PORT),
138            // If hostname is not set, the server will use the local ip address as the hostname.
139            server_addr: String::new(),
140            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
141            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
142            max_total_message_memory: ReadableSize(0),
143            flight_compression: FlightCompression::ArrowIpc,
144            runtime_size: 8,
145            tls: TlsOption::default(),
146            max_connection_age: None,
147        }
148    }
149}
150
151impl GrpcOptions {
152    /// Default options for internal gRPC server.
153    /// The internal gRPC server is used for communication between different nodes in cluster.
154    /// It is not exposed to the outside world.
155    pub fn internal_default() -> Self {
156        Self {
157            bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
158            // If hostname is not set, the server will use the local ip address as the hostname.
159            server_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
160            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
161            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
162            max_total_message_memory: ReadableSize(0),
163            flight_compression: FlightCompression::ArrowIpc,
164            runtime_size: 8,
165            tls: TlsOption::default(),
166            max_connection_age: None,
167        }
168    }
169
170    pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
171        self.bind_addr = bind_addr.to_string();
172        self
173    }
174
175    pub fn with_server_addr(mut self, server_addr: &str) -> Self {
176        self.server_addr = server_addr.to_string();
177        self
178    }
179}
180
/// Compression mode of the Arrow Flight service.
///
/// Serialized in `snake_case` (e.g. `arrow_ipc`). Note that the derived `Default` of this
/// enum is `None`, while `GrpcOptions::default()` selects `ArrowIpc`.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum FlightCompression {
    /// Disable all compression in Arrow Flight service.
    #[default]
    None,
    /// Enable only transport layer compression (zstd).
    Transport,
    /// Enable only payload compression (lz4)
    ArrowIpc,
    /// Enable all compression.
    All,
}
194
195impl FlightCompression {
196    pub fn transport_compression(&self) -> bool {
197        self == &FlightCompression::Transport || self == &FlightCompression::All
198    }
199
200    pub fn arrow_compression(&self) -> bool {
201        self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
202    }
203}
204
/// A gRPC server built on tonic.
///
/// The pieces that are consumed when the server starts (routes, the optional OTel Arrow
/// service) are kept in `Mutex<Option<_>>` slots so they can be taken out exactly once.
pub struct GrpcServer {
    // states
    /// Sender of the shutdown signal. Set by `start`, taken (and fired) by `shutdown`.
    shutdown_tx: Mutex<Option<Sender<()>>>,
    /// gRPC serving state receiver. Only present if the gRPC server is started.
    /// Used to wait for the server to stop, performing the old blocking fashion.
    serve_state: Mutex<Option<Receiver<Result<()>>>>,
    // handlers
    /// Routes served by the server. Taken out by `start`; `None` afterwards.
    routes: Mutex<Option<Routes>>,
    // tls config
    /// TLS configuration applied to the tonic server builder on start, if any.
    tls_config: Option<ServerTlsConfig>,
    // Otel arrow service
    /// Optional OTel Arrow metrics service; added to the server on start when present.
    otel_arrow_service: Mutex<
        Option<
            InterceptedService<
                ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
                HeaderInterceptor,
            >,
        >,
    >,
    /// The address the listener is actually bound to; set once `start` succeeds.
    bind_addr: Option<SocketAddr>,
    /// Optional server name; `name()` falls back to `GRPC_SERVER` when unset.
    name: Option<String>,
    /// Server configuration (message size limits, TLS options, max connection age).
    config: GrpcServerConfig,
    /// Limiter exposed through `memory_limiter()` for monitoring request memory usage.
    memory_limiter: RequestMemoryLimiter,
}
229
/// Grpc Server configuration
///
/// This is the resolved form of `GrpcOptions` (see `GrpcOptions::as_config`), with size
/// limits expressed as plain byte counts.
#[derive(Debug, Clone)]
pub struct GrpcServerConfig {
    /// Max gRPC receiving(decoding) message size, in bytes.
    pub max_recv_message_size: usize,
    /// Max gRPC sending(encoding) message size, in bytes.
    pub max_send_message_size: usize,
    /// Maximum total memory for all concurrent gRPC request messages. 0 disables the limit.
    pub max_total_message_memory: usize,
    /// TLS options of the server.
    pub tls: TlsOption,
    /// Maximum time that a channel may exist.
    /// Useful when the server wants to control the reconnection of its clients.
    /// Default to `None`, means infinite.
    pub max_connection_age: Option<Duration>,
}
245
246impl Default for GrpcServerConfig {
247    fn default() -> Self {
248        Self {
249            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
250            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
251            max_total_message_memory: 0,
252            tls: TlsOption::default(),
253            max_connection_age: None,
254        }
255    }
256}
257
258impl GrpcServer {
259    pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
260        HealthCheckServer::new(HealthCheckHandler)
261    }
262
263    pub fn create_reflection_service(&self) -> ServerReflectionServer<impl ServerReflection> {
264        tonic_reflection::server::Builder::configure()
265            .register_encoded_file_descriptor_set(api::v1::GREPTIME_GRPC_DESC)
266            .with_service_name("greptime.v1.GreptimeDatabase")
267            .with_service_name("greptime.v1.HealthCheck")
268            .with_service_name("greptime.v1.RegionServer")
269            .build_v1()
270            .inspect_err(|e| {
271                common_telemetry::error!(e; "Failed to build gRPC reflection server");
272            })
273            .unwrap()
274    }
275
276    pub async fn wait_for_serve(&self) -> Result<()> {
277        let mut serve_state = self.serve_state.lock().await;
278        let rx = serve_state.take().context(InternalSnafu {
279            err_msg: "gRPC serving state is unknown, maybe the server is not started, \
280                      or we have already waited for the serve result before.",
281        })?;
282        let Ok(result) = rx.await else {
283            warn!("Background gRPC serving task is quited before we can receive the serve result.");
284            return Ok(());
285        };
286        if let Err(e) = result {
287            error!(e; "GRPC serve error");
288        }
289        Ok(())
290    }
291
292    /// Get the memory limiter for monitoring current memory usage
293    pub fn memory_limiter(&self) -> &RequestMemoryLimiter {
294        &self.memory_limiter
295    }
296}
297
/// Stateless handler of the `HealthCheck` gRPC service; it answers every request with an
/// empty response.
pub struct HealthCheckHandler;
299
300#[async_trait]
301impl HealthCheck for HealthCheckHandler {
302    async fn health_check(
303        &self,
304        _req: Request<HealthCheckRequest>,
305    ) -> TonicResult<Response<HealthCheckResponse>> {
306        Ok(Response::new(HealthCheckResponse {}))
307    }
308}
309
/// Default server name, reported by `GrpcServer`'s `name()` when no explicit name is set.
pub const GRPC_SERVER: &str = "GRPC_SERVER";
311
312#[async_trait]
313impl Server for GrpcServer {
314    async fn shutdown(&self) -> Result<()> {
315        let mut shutdown_tx = self.shutdown_tx.lock().await;
316        if let Some(tx) = shutdown_tx.take()
317            && tx.send(()).is_err()
318        {
319            info!("Receiver dropped, the grpc server has already exited");
320        }
321        info!("Shutdown grpc server");
322
323        Ok(())
324    }
325
326    async fn start(&mut self, addr: SocketAddr) -> Result<()> {
327        let routes = {
328            let mut routes = self.routes.lock().await;
329            let Some(routes) = routes.take() else {
330                return AlreadyStartedSnafu {
331                    server: self.name(),
332                }
333                .fail();
334            };
335            routes
336        };
337
338        let (tx, rx) = oneshot::channel();
339        let (incoming, addr) = {
340            let mut shutdown_tx = self.shutdown_tx.lock().await;
341            ensure!(
342                shutdown_tx.is_none(),
343                AlreadyStartedSnafu { server: "gRPC" }
344            );
345
346            let listener = TcpListener::bind(addr)
347                .await
348                .context(TcpBindSnafu { addr })?;
349            let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
350            let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
351            info!("gRPC server(name={}) is bound to {}", self.name(), addr);
352
353            *shutdown_tx = Some(tx);
354
355            (incoming, addr)
356        };
357
358        let metrics_layer = tower::ServiceBuilder::new()
359            .layer(MetricsMiddlewareLayer)
360            .into_inner();
361
362        let mut builder = tonic::transport::Server::builder().layer(metrics_layer);
363        if let Some(tls_config) = self.tls_config.clone() {
364            builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
365        }
366
367        if let Some(max_connection_age) = self.config.max_connection_age {
368            builder = builder.max_connection_age(max_connection_age);
369        }
370
371        let mut builder = builder
372            .add_routes(routes)
373            .add_service(self.create_healthcheck_service())
374            .add_service(self.create_reflection_service());
375
376        if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
377            builder = builder.add_service(otel_arrow_service);
378        }
379
380        let (serve_state_tx, serve_state_rx) = oneshot::channel();
381        let mut serve_state = self.serve_state.lock().await;
382        *serve_state = Some(serve_state_rx);
383
384        let _handle = common_runtime::spawn_global(async move {
385            let result = builder
386                .serve_with_incoming_shutdown(incoming, rx.map(drop))
387                .await
388                .context(StartGrpcSnafu);
389            serve_state_tx.send(result)
390        });
391
392        self.bind_addr = Some(addr);
393        Ok(())
394    }
395
396    fn name(&self) -> &str {
397        if let Some(name) = &self.name {
398            name
399        } else {
400            GRPC_SERVER
401        }
402    }
403
404    fn bind_addr(&self) -> Option<SocketAddr> {
405        self.bind_addr
406    }
407}