1pub mod builder;
16mod cancellation;
17pub mod context_auth;
18mod database;
19pub mod flight;
20pub mod frontend_grpc_handler;
21pub mod greptime_handler;
22pub mod memory_limit;
23pub mod prom_query_gateway;
24pub mod region_server;
25
26use std::any::Any;
27use std::net::SocketAddr;
28use std::time::Duration;
29
30use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
31use api::v1::{HealthCheckRequest, HealthCheckResponse};
32use async_trait::async_trait;
33use common_base::readable_size::ReadableSize;
34use common_grpc::channel_manager::{
35 DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
36};
37use common_telemetry::{error, info, warn};
38use futures::FutureExt;
39use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
40use serde::{Deserialize, Serialize};
41use snafu::{OptionExt, ResultExt, ensure};
42use tokio::net::TcpListener;
43use tokio::sync::Mutex;
44use tokio::sync::oneshot::{self, Receiver, Sender};
45use tonic::service::Routes;
46use tonic::service::interceptor::InterceptedService;
47use tonic::transport::ServerTlsConfig;
48use tonic::transport::server::TcpIncoming;
49use tonic::{Request, Response, Status};
50use tonic_reflection::server::v1::{ServerReflection, ServerReflectionServer};
51
52use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
53use crate::metrics::MetricsMiddlewareLayer;
54use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
55use crate::query_handler::OpenTelemetryProtocolHandlerRef;
56use crate::server::Server;
57use crate::tls::TlsOption;
58
/// Shorthand for the result type of tonic service methods, whose error is a gRPC [`Status`].
type TonicResult<T> = std::result::Result<T, Status>;
60
/// User-facing options of the gRPC server.
///
/// The struct is (de)serializable with serde. Because of `#[serde(default)]`, any field
/// missing from the input is filled in from [`GrpcOptions::default`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct GrpcOptions {
    /// Address the server binds to, in `host:port` form (defaults to `127.0.0.1:4001`).
    pub bind_addr: String,
    /// Address of this server. When left empty, `detect_server_addr` fills it in from the
    /// detected local IP and the port of `bind_addr`.
    /// NOTE(review): presumably the address advertised to peers/clients — confirm.
    pub server_addr: String,
    /// Maximum size of a received gRPC message; `as_config` converts it to a byte count.
    pub max_recv_message_size: ReadableSize,
    /// Maximum size of a sent gRPC message; `as_config` converts it to a byte count.
    pub max_send_message_size: ReadableSize,
    /// Compression mode, see [`FlightCompression`] (defaults to `ArrowIpc` here).
    pub flight_compression: FlightCompression,
    /// Defaults to 8.
    /// NOTE(review): presumably the size of the runtime serving gRPC requests — it is not
    /// consumed in this part of the file, confirm against the caller.
    pub runtime_size: usize,
    /// TLS options; falls back to `TlsOption::default()` when absent from the input.
    #[serde(default = "Default::default")]
    pub tls: TlsOption,
    /// Optional maximum connection age, (de)serialized through `humantime_serde`.
    /// When set, it ends up in tonic's `max_connection_age` server setting
    /// (via `as_config` and `GrpcServer::start`).
    #[serde(with = "humantime_serde")]
    pub max_connection_age: Option<Duration>,
    /// HTTP/2 keep-alive interval (defaults to 10 seconds), (de)serialized through
    /// `humantime_serde`. Not consumed in this part of the file.
    #[serde(with = "humantime_serde")]
    pub http2_keep_alive_interval: Duration,
    /// HTTP/2 keep-alive timeout (defaults to 3 seconds), (de)serialized through
    /// `humantime_serde`. Not consumed in this part of the file.
    #[serde(with = "humantime_serde")]
    pub http2_keep_alive_timeout: Duration,
}
89
90impl GrpcOptions {
91 #[cfg(not(target_os = "android"))]
93 pub fn detect_server_addr(&mut self) {
94 if self.server_addr.is_empty() {
95 match local_ip_address::local_ip() {
96 Ok(ip) => {
97 let detected_addr = format!(
98 "{}:{}",
99 ip,
100 self.bind_addr
101 .split(':')
102 .nth(1)
103 .unwrap_or(DEFAULT_GRPC_ADDR_PORT)
104 );
105 info!("Using detected: {} as server address", detected_addr);
106 self.server_addr = detected_addr;
107 }
108 Err(e) => {
109 error!("Failed to detect local ip address: {}", e);
110 }
111 }
112 }
113 }
114
115 #[cfg(target_os = "android")]
116 pub fn detect_server_addr(&mut self) {
117 if self.server_addr.is_empty() {
118 common_telemetry::debug!("detect local IP is not supported on Android");
119 }
120 }
121
122 pub fn as_config(&self) -> GrpcServerConfig {
124 GrpcServerConfig {
125 max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
126 max_send_message_size: self.max_send_message_size.as_bytes() as usize,
127 tls: self.tls.clone(),
128 max_connection_age: self.max_connection_age,
129 }
130 }
131}
132
/// Port of the default gRPC bind address; also the fallback port used by
/// `GrpcOptions::detect_server_addr` when `bind_addr` carries no port.
const DEFAULT_GRPC_ADDR_PORT: &str = "4001";

/// Port of the bind/server address produced by `GrpcOptions::internal_default`.
const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
136
137impl Default for GrpcOptions {
138 fn default() -> Self {
139 Self {
140 bind_addr: format!("127.0.0.1:{}", DEFAULT_GRPC_ADDR_PORT),
141 server_addr: String::new(),
143 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
144 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
145 flight_compression: FlightCompression::ArrowIpc,
146 runtime_size: 8,
147 tls: TlsOption::default(),
148 max_connection_age: None,
149 http2_keep_alive_interval: Duration::from_secs(10),
150 http2_keep_alive_timeout: Duration::from_secs(3),
151 }
152 }
153}
154
155impl GrpcOptions {
156 pub fn internal_default() -> Self {
160 Self {
161 bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
162 server_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
164 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
165 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
166 flight_compression: FlightCompression::ArrowIpc,
167 runtime_size: 8,
168 tls: TlsOption::default(),
169 max_connection_age: None,
170 http2_keep_alive_interval: Duration::from_secs(10),
171 http2_keep_alive_timeout: Duration::from_secs(3),
172 }
173 }
174
175 pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
176 self.bind_addr = bind_addr.to_string();
177 self
178 }
179
180 pub fn with_server_addr(mut self, server_addr: &str) -> Self {
181 self.server_addr = server_addr.to_string();
182 self
183 }
184}
185
/// Compression mode selector.
///
/// (De)serialized in `snake_case` (`none`, `transport`, `arrow_ipc`, `all`). Note that the
/// enum's own default is `None`, while `GrpcOptions::default` picks `ArrowIpc`.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum FlightCompression {
    /// Neither `transport_compression()` nor `arrow_compression()` is enabled.
    #[default]
    None,
    /// Only `transport_compression()` is enabled.
    Transport,
    /// Only `arrow_compression()` is enabled.
    ArrowIpc,
    /// Both `transport_compression()` and `arrow_compression()` are enabled.
    All,
}
199
200impl FlightCompression {
201 pub fn transport_compression(&self) -> bool {
202 self == &FlightCompression::Transport || self == &FlightCompression::All
203 }
204
205 pub fn arrow_compression(&self) -> bool {
206 self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
207 }
208}
209
/// A tonic-based gRPC server implementing the crate's [`Server`] trait.
pub struct GrpcServer {
    /// Sender half of the shutdown signal: stored by `start`, fired and cleared by
    /// `shutdown`.
    shutdown_tx: Mutex<Option<Sender<()>>>,
    /// Receiver for the final result of the background serving task: stored by `start`,
    /// consumed (once) by `wait_for_serve`.
    serve_state: Mutex<Option<Receiver<Result<()>>>>,
    /// The gRPC routes to serve. `start` takes them out, so `None` means the server has
    /// already been started.
    routes: Mutex<Option<Routes>>,
    /// TLS configuration applied to the tonic server in `start`, when present.
    tls_config: Option<ServerTlsConfig>,
    /// Optional Arrow metrics service wrapped with a [`HeaderInterceptor`]; taken out and
    /// registered on the server in `start`.
    otel_arrow_service: Mutex<
        Option<
            InterceptedService<
                ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
                HeaderInterceptor,
            >,
        >,
    >,
    /// Address the listener is actually bound to; recorded at the end of `start`.
    bind_addr: Option<SocketAddr>,
    /// Optional custom server name; `name()` falls back to [`GRPC_SERVER`] when unset.
    name: Option<String>,
    /// Server settings. In this part of the file only `max_connection_age` is read
    /// (in `start`).
    config: GrpcServerConfig,
}
233
/// Settings of a [`GrpcServer`], typically derived from `GrpcOptions::as_config`.
#[derive(Debug, Clone)]
pub struct GrpcServerConfig {
    /// Maximum size of a received message, in bytes.
    pub max_recv_message_size: usize,
    /// Maximum size of a sent message, in bytes.
    pub max_send_message_size: usize,
    /// TLS options.
    pub tls: TlsOption,
    /// Optional maximum connection age; when set, `GrpcServer::start` passes it to tonic's
    /// `max_connection_age` server setting.
    pub max_connection_age: Option<Duration>,
}
247
248impl Default for GrpcServerConfig {
249 fn default() -> Self {
250 Self {
251 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
252 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
253 tls: TlsOption::default(),
254 max_connection_age: None,
255 }
256 }
257}
258
259impl GrpcServer {
260 pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
261 HealthCheckServer::new(HealthCheckHandler)
262 }
263
264 pub fn create_reflection_service(&self) -> ServerReflectionServer<impl ServerReflection> {
265 tonic_reflection::server::Builder::configure()
266 .register_encoded_file_descriptor_set(api::v1::GREPTIME_GRPC_DESC)
267 .with_service_name("greptime.v1.GreptimeDatabase")
268 .with_service_name("greptime.v1.HealthCheck")
269 .with_service_name("greptime.v1.RegionServer")
270 .build_v1()
271 .inspect_err(|e| {
272 common_telemetry::error!(e; "Failed to build gRPC reflection server");
273 })
274 .unwrap()
275 }
276
277 pub async fn wait_for_serve(&self) -> Result<()> {
278 let mut serve_state = self.serve_state.lock().await;
279 let rx = serve_state.take().context(InternalSnafu {
280 err_msg: "gRPC serving state is unknown, maybe the server is not started, \
281 or we have already waited for the serve result before.",
282 })?;
283 let Ok(result) = rx.await else {
284 warn!("Background gRPC serving task is quited before we can receive the serve result.");
285 return Ok(());
286 };
287 if let Err(e) = result {
288 error!(e; "GRPC serve error");
289 }
290 Ok(())
291 }
292}
293
/// Stateless handler of the `HealthCheck` gRPC service.
pub struct HealthCheckHandler;
295
296#[async_trait]
297impl HealthCheck for HealthCheckHandler {
298 async fn health_check(
299 &self,
300 _req: Request<HealthCheckRequest>,
301 ) -> TonicResult<Response<HealthCheckResponse>> {
302 Ok(Response::new(HealthCheckResponse {}))
303 }
304}
305
/// Name reported by a [`GrpcServer`] that was not given a custom name.
pub const GRPC_SERVER: &str = "GRPC_SERVER";
307
308#[async_trait]
309impl Server for GrpcServer {
310 async fn shutdown(&self) -> Result<()> {
311 let mut shutdown_tx = self.shutdown_tx.lock().await;
312 if let Some(tx) = shutdown_tx.take()
313 && tx.send(()).is_err()
314 {
315 info!("Receiver dropped, the grpc server has already exited");
316 }
317 info!("Shutdown grpc server");
318
319 Ok(())
320 }
321
322 async fn start(&mut self, addr: SocketAddr) -> Result<()> {
323 let routes = {
324 let mut routes = self.routes.lock().await;
325 let Some(routes) = routes.take() else {
326 return AlreadyStartedSnafu {
327 server: self.name(),
328 }
329 .fail();
330 };
331 routes
332 };
333
334 let (tx, rx) = oneshot::channel();
335 let (incoming, addr) = {
336 let mut shutdown_tx = self.shutdown_tx.lock().await;
337 ensure!(
338 shutdown_tx.is_none(),
339 AlreadyStartedSnafu { server: "gRPC" }
340 );
341
342 let listener = TcpListener::bind(addr)
343 .await
344 .context(TcpBindSnafu { addr })?;
345 let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
346 let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
347 info!("gRPC server(name={}) is bound to {}", self.name(), addr);
348
349 *shutdown_tx = Some(tx);
350
351 (incoming, addr)
352 };
353
354 let metrics_layer = tower::ServiceBuilder::new()
355 .layer(MetricsMiddlewareLayer)
356 .into_inner();
357
358 let mut builder = tonic::transport::Server::builder().layer(metrics_layer);
359 if let Some(tls_config) = self.tls_config.clone() {
360 builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
361 }
362
363 if let Some(max_connection_age) = self.config.max_connection_age {
364 builder = builder.max_connection_age(max_connection_age);
365 }
366
367 let mut builder = builder
368 .add_routes(routes)
369 .add_service(self.create_healthcheck_service())
370 .add_service(self.create_reflection_service());
371
372 if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
373 builder = builder.add_service(otel_arrow_service);
374 }
375
376 let (serve_state_tx, serve_state_rx) = oneshot::channel();
377 let mut serve_state = self.serve_state.lock().await;
378 *serve_state = Some(serve_state_rx);
379
380 let _handle = common_runtime::spawn_global(async move {
381 let result = builder
382 .serve_with_incoming_shutdown(incoming, rx.map(drop))
383 .await
384 .context(StartGrpcSnafu);
385 serve_state_tx.send(result)
386 });
387
388 self.bind_addr = Some(addr);
389 Ok(())
390 }
391
392 fn name(&self) -> &str {
393 if let Some(name) = &self.name {
394 name
395 } else {
396 GRPC_SERVER
397 }
398 }
399
400 fn bind_addr(&self) -> Option<SocketAddr> {
401 self.bind_addr
402 }
403
404 fn as_any(&self) -> &dyn Any {
405 self
406 }
407}