1pub mod builder;
16mod cancellation;
17pub mod context_auth;
18mod database;
19pub mod flight;
20pub mod frontend_grpc_handler;
21pub mod greptime_handler;
22pub mod memory_limit;
23pub mod prom_query_gateway;
24pub mod region_server;
25
26use std::any::Any;
27use std::net::{IpAddr, SocketAddr};
28use std::time::Duration;
29
30use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
31use api::v1::{HealthCheckRequest, HealthCheckResponse};
32use async_trait::async_trait;
33use common_base::readable_size::ReadableSize;
34use common_grpc::channel_manager::{
35 DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
36};
37use common_telemetry::{error, info, warn};
38use futures::FutureExt;
39use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
40use serde::{Deserialize, Serialize};
41use snafu::{OptionExt, ResultExt, ensure};
42use tokio::net::TcpListener;
43use tokio::sync::Mutex;
44use tokio::sync::oneshot::{self, Receiver, Sender};
45use tonic::service::Routes;
46use tonic::service::interceptor::InterceptedService;
47use tonic::transport::ServerTlsConfig;
48use tonic::transport::server::TcpIncoming;
49use tonic::{Request, Response, Status};
50use tonic_reflection::server::v1::{ServerReflection, ServerReflectionServer};
51
52use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
53use crate::install_default_crypto_provider;
54use crate::metrics::MetricsMiddlewareLayer;
55use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
56use crate::query_handler::OpenTelemetryProtocolHandlerRef;
57use crate::server::Server;
58use crate::tls::TlsOption;
59
60type TonicResult<T> = std::result::Result<T, Status>;
61
62#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
63#[serde(default)]
64pub struct GrpcOptions {
65 pub bind_addr: String,
67 pub server_addr: String,
69 pub max_recv_message_size: ReadableSize,
71 pub max_send_message_size: ReadableSize,
73 pub flight_compression: FlightCompression,
75 pub runtime_size: usize,
76 #[serde(default = "Default::default")]
77 pub tls: TlsOption,
78 #[serde(with = "humantime_serde")]
82 pub max_connection_age: Option<Duration>,
83 #[serde(with = "humantime_serde")]
85 pub http2_keep_alive_interval: Duration,
86 #[serde(with = "humantime_serde")]
88 pub http2_keep_alive_timeout: Duration,
89}
90
91impl GrpcOptions {
92 #[cfg(not(target_os = "android"))]
94 pub fn detect_server_addr(&mut self) {
95 if self.server_addr.is_empty() {
96 match local_ip_address::local_ip() {
97 Ok(ip) => {
98 let port = port_from_bind_addr(&self.bind_addr);
99 let detected_addr = format_server_addr(ip, port);
100 info!("Using detected: {} as server address", detected_addr);
101 self.server_addr = detected_addr;
102 }
103 Err(e) => {
104 error!("Failed to detect local ip address: {}", e);
105 }
106 }
107 }
108 }
109
110 #[cfg(target_os = "android")]
111 pub fn detect_server_addr(&mut self) {
112 if self.server_addr.is_empty() {
113 common_telemetry::debug!("detect local IP is not supported on Android");
114 }
115 }
116
117 pub fn as_config(&self) -> GrpcServerConfig {
119 GrpcServerConfig {
120 max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
121 max_send_message_size: self.max_send_message_size.as_bytes() as usize,
122 tls: self.tls.clone(),
123 max_connection_age: self.max_connection_age,
124 }
125 }
126}
127
/// Default gRPC port. It is also the fallback when a bind address has no usable port.
const DEFAULT_GRPC_ADDR_PORT: u16 = 4001;

/// Returns the port of `bind_addr`, i.e. the text after its last `:` parsed as `u16`.
///
/// Falls back to [`DEFAULT_GRPC_ADDR_PORT`] when there is no `:` at all or when the
/// suffix is not a valid port number.
fn port_from_bind_addr(bind_addr: &str) -> u16 {
    match bind_addr.rsplit_once(':') {
        Some((_host, port)) => port.parse::<u16>().unwrap_or(DEFAULT_GRPC_ADDR_PORT),
        None => DEFAULT_GRPC_ADDR_PORT,
    }
}
136
/// Renders `ip` and `port` as a socket-address string. IPv6 addresses are
/// bracketed, e.g. `[::1]:3002`.
fn format_server_addr(ip: IpAddr, port: u16) -> String {
    let socket_addr: SocketAddr = (ip, port).into();
    socket_addr.to_string()
}
140
/// Port used by [`GrpcOptions::internal_default`] for both the bind and server address.
const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
142
143impl Default for GrpcOptions {
144 fn default() -> Self {
145 Self {
146 bind_addr: format!("127.0.0.1:{}", DEFAULT_GRPC_ADDR_PORT),
147 server_addr: String::new(),
149 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
150 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
151 flight_compression: FlightCompression::ArrowIpc,
152 runtime_size: 8,
153 tls: TlsOption::default(),
154 max_connection_age: None,
155 http2_keep_alive_interval: Duration::from_secs(10),
156 http2_keep_alive_timeout: Duration::from_secs(3),
157 }
158 }
159}
160
161impl GrpcOptions {
162 pub fn internal_default() -> Self {
166 Self {
167 bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
168 server_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
170 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
171 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
172 flight_compression: FlightCompression::ArrowIpc,
173 runtime_size: 8,
174 tls: TlsOption::default(),
175 max_connection_age: None,
176 http2_keep_alive_interval: Duration::from_secs(10),
177 http2_keep_alive_timeout: Duration::from_secs(3),
178 }
179 }
180
181 pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
182 self.bind_addr = bind_addr.to_string();
183 self
184 }
185
186 pub fn with_server_addr(mut self, server_addr: &str) -> Self {
187 self.server_addr = server_addr.to_string();
188 self
189 }
190}
191
192#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
193#[serde(rename_all = "snake_case")]
194pub enum FlightCompression {
195 #[default]
197 None,
198 Transport,
200 ArrowIpc,
202 All,
204}
205
206impl FlightCompression {
207 pub fn transport_compression(&self) -> bool {
208 self == &FlightCompression::Transport || self == &FlightCompression::All
209 }
210
211 pub fn arrow_compression(&self) -> bool {
212 self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
213 }
214}
215
216pub struct GrpcServer {
217 shutdown_tx: Mutex<Option<Sender<()>>>,
219 serve_state: Mutex<Option<Receiver<Result<()>>>>,
222 routes: Mutex<Option<Routes>>,
224 tls_config: Option<ServerTlsConfig>,
226 otel_arrow_service: Mutex<
228 Option<
229 InterceptedService<
230 ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
231 HeaderInterceptor,
232 >,
233 >,
234 >,
235 bind_addr: Option<SocketAddr>,
236 name: Option<String>,
237 config: GrpcServerConfig,
238}
239
240#[derive(Debug, Clone)]
242pub struct GrpcServerConfig {
243 pub max_recv_message_size: usize,
245 pub max_send_message_size: usize,
247 pub tls: TlsOption,
248 pub max_connection_age: Option<Duration>,
252}
253
254impl Default for GrpcServerConfig {
255 fn default() -> Self {
256 Self {
257 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
258 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
259 tls: TlsOption::default(),
260 max_connection_age: None,
261 }
262 }
263}
264
265impl GrpcServer {
266 pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
267 HealthCheckServer::new(HealthCheckHandler)
268 }
269
270 pub fn create_reflection_service(&self) -> ServerReflectionServer<impl ServerReflection> {
271 tonic_reflection::server::Builder::configure()
272 .register_encoded_file_descriptor_set(api::v1::GREPTIME_GRPC_DESC)
273 .with_service_name("greptime.v1.GreptimeDatabase")
274 .with_service_name("greptime.v1.HealthCheck")
275 .with_service_name("greptime.v1.RegionServer")
276 .build_v1()
277 .inspect_err(|e| {
278 common_telemetry::error!(e; "Failed to build gRPC reflection server");
279 })
280 .unwrap()
281 }
282
283 pub async fn wait_for_serve(&self) -> Result<()> {
284 let mut serve_state = self.serve_state.lock().await;
285 let rx = serve_state.take().context(InternalSnafu {
286 err_msg: "gRPC serving state is unknown, maybe the server is not started, \
287 or we have already waited for the serve result before.",
288 })?;
289 let Ok(result) = rx.await else {
290 warn!("Background gRPC serving task is quited before we can receive the serve result.");
291 return Ok(());
292 };
293 if let Err(e) = result {
294 error!(e; "GRPC serve error");
295 }
296 Ok(())
297 }
298}
299
300pub struct HealthCheckHandler;
301
302#[async_trait]
303impl HealthCheck for HealthCheckHandler {
304 async fn health_check(
305 &self,
306 _req: Request<HealthCheckRequest>,
307 ) -> TonicResult<Response<HealthCheckResponse>> {
308 Ok(Response::new(HealthCheckResponse {}))
309 }
310}
311
/// Name reported by a [`GrpcServer`] that was not given an explicit name.
pub const GRPC_SERVER: &str = "GRPC_SERVER";
313
314#[async_trait]
315impl Server for GrpcServer {
316 async fn shutdown(&self) -> Result<()> {
317 let mut shutdown_tx = self.shutdown_tx.lock().await;
318 if let Some(tx) = shutdown_tx.take()
319 && tx.send(()).is_err()
320 {
321 info!("Receiver dropped, the grpc server has already exited");
322 }
323 info!("Shutdown grpc server");
324
325 Ok(())
326 }
327
328 async fn start(&mut self, addr: SocketAddr) -> Result<()> {
329 let routes = {
330 let mut routes = self.routes.lock().await;
331 let Some(routes) = routes.take() else {
332 return AlreadyStartedSnafu {
333 server: self.name(),
334 }
335 .fail();
336 };
337 routes
338 };
339
340 let (tx, rx) = oneshot::channel();
341 let (incoming, addr) = {
342 let mut shutdown_tx = self.shutdown_tx.lock().await;
343 ensure!(
344 shutdown_tx.is_none(),
345 AlreadyStartedSnafu { server: "gRPC" }
346 );
347
348 let listener = TcpListener::bind(addr)
349 .await
350 .context(TcpBindSnafu { addr })?;
351 let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
352 let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
353 info!("gRPC server(name={}) is bound to {}", self.name(), addr);
354
355 *shutdown_tx = Some(tx);
356
357 (incoming, addr)
358 };
359
360 let metrics_layer = tower::ServiceBuilder::new()
361 .layer(MetricsMiddlewareLayer)
362 .into_inner();
363
364 let mut builder = tonic::transport::Server::builder()
365 .accept_http1(true)
366 .layer(metrics_layer)
367 .layer(tonic_web::GrpcWebLayer::new());
368
369 if let Some(tls_config) = self.tls_config.clone() {
370 if let Err(err) = install_default_crypto_provider() {
373 warn!("Failed to install default rustls crypto provider: {err}");
374 }
375 builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
376 }
377
378 if let Some(max_connection_age) = self.config.max_connection_age {
379 builder = builder.max_connection_age(max_connection_age);
380 }
381
382 let mut builder = builder
383 .add_routes(routes)
384 .add_service(self.create_healthcheck_service())
385 .add_service(self.create_reflection_service());
386
387 if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
388 builder = builder.add_service(otel_arrow_service);
389 }
390
391 let (serve_state_tx, serve_state_rx) = oneshot::channel();
392 let mut serve_state = self.serve_state.lock().await;
393 *serve_state = Some(serve_state_rx);
394
395 let _handle = common_runtime::spawn_global(async move {
396 let result = builder
397 .serve_with_incoming_shutdown(incoming, rx.map(drop))
398 .await
399 .context(StartGrpcSnafu);
400 serve_state_tx.send(result)
401 });
402
403 self.bind_addr = Some(addr);
404 Ok(())
405 }
406
407 fn name(&self) -> &str {
408 if let Some(name) = &self.name {
409 name
410 } else {
411 GRPC_SERVER
412 }
413 }
414
415 fn bind_addr(&self) -> Option<SocketAddr> {
416 self.bind_addr
417 }
418
419 fn as_any(&self) -> &dyn Any {
420 self
421 }
422}
423
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

    use super::{DEFAULT_GRPC_ADDR_PORT, format_server_addr, port_from_bind_addr};

    #[test]
    fn test_port_from_bind_addr() {
        let cases = [
            ("127.0.0.1:3002", 3002),
            ("[::]:3002", 3002),
            ("greptimedb-metasrv.default.svc.cluster.local:3002", 3002),
            // Inputs without a parsable port fall back to the default port.
            ("invalid-bind-addr", DEFAULT_GRPC_ADDR_PORT),
            ("", DEFAULT_GRPC_ADDR_PORT),
            ("0.0.0.0:", DEFAULT_GRPC_ADDR_PORT),
            ("[::]", DEFAULT_GRPC_ADDR_PORT),
            ("127.0.0.1:70000", DEFAULT_GRPC_ADDR_PORT),
        ];

        for (bind_addr, expected) in cases {
            assert_eq!(
                expected,
                port_from_bind_addr(bind_addr),
                "bind_addr = {bind_addr:?}"
            );
        }
    }

    #[test]
    fn test_format_server_addr() {
        assert_eq!(
            "127.0.0.1:3002",
            format_server_addr(IpAddr::V4(Ipv4Addr::LOCALHOST), 3002)
        );
        assert_eq!(
            "[::1]:3002",
            format_server_addr(IpAddr::V6(Ipv6Addr::LOCALHOST), 3002)
        );
    }
}