1pub mod builder;
16mod cancellation;
17pub mod context_auth;
18mod database;
19pub mod flight;
20pub mod frontend_grpc_handler;
21pub mod greptime_handler;
22pub mod memory_limit;
23pub mod prom_query_gateway;
24pub mod region_server;
25
26use std::net::SocketAddr;
27use std::time::Duration;
28
29use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
30use api::v1::{HealthCheckRequest, HealthCheckResponse};
31use async_trait::async_trait;
32use common_base::readable_size::ReadableSize;
33use common_grpc::channel_manager::{
34 DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
35};
36use common_telemetry::{error, info, warn};
37use futures::FutureExt;
38use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
39use serde::{Deserialize, Serialize};
40use snafu::{OptionExt, ResultExt, ensure};
41use tokio::net::TcpListener;
42use tokio::sync::Mutex;
43use tokio::sync::oneshot::{self, Receiver, Sender};
44use tonic::service::Routes;
45use tonic::service::interceptor::InterceptedService;
46use tonic::transport::ServerTlsConfig;
47use tonic::transport::server::TcpIncoming;
48use tonic::{Request, Response, Status};
49use tonic_reflection::server::v1::{ServerReflection, ServerReflectionServer};
50
51use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
52use crate::metrics::MetricsMiddlewareLayer;
53use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
54use crate::query_handler::OpenTelemetryProtocolHandlerRef;
55use crate::request_limiter::RequestMemoryLimiter;
56use crate::server::Server;
57use crate::tls::TlsOption;
58
59type TonicResult<T> = std::result::Result<T, Status>;
60
61#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
62#[serde(default)]
63pub struct GrpcOptions {
64 pub bind_addr: String,
66 pub server_addr: String,
68 pub max_recv_message_size: ReadableSize,
70 pub max_send_message_size: ReadableSize,
72 pub max_total_message_memory: ReadableSize,
74 pub flight_compression: FlightCompression,
76 pub runtime_size: usize,
77 #[serde(default = "Default::default")]
78 pub tls: TlsOption,
79 #[serde(with = "humantime_serde")]
83 pub max_connection_age: Option<Duration>,
84}
85
86impl GrpcOptions {
87 #[cfg(not(target_os = "android"))]
89 pub fn detect_server_addr(&mut self) {
90 if self.server_addr.is_empty() {
91 match local_ip_address::local_ip() {
92 Ok(ip) => {
93 let detected_addr = format!(
94 "{}:{}",
95 ip,
96 self.bind_addr
97 .split(':')
98 .nth(1)
99 .unwrap_or(DEFAULT_GRPC_ADDR_PORT)
100 );
101 info!("Using detected: {} as server address", detected_addr);
102 self.server_addr = detected_addr;
103 }
104 Err(e) => {
105 error!("Failed to detect local ip address: {}", e);
106 }
107 }
108 }
109 }
110
111 #[cfg(target_os = "android")]
112 pub fn detect_server_addr(&mut self) {
113 if self.server_addr.is_empty() {
114 common_telemetry::debug!("detect local IP is not supported on Android");
115 }
116 }
117
118 pub fn as_config(&self) -> GrpcServerConfig {
120 GrpcServerConfig {
121 max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
122 max_send_message_size: self.max_send_message_size.as_bytes() as usize,
123 max_total_message_memory: self.max_total_message_memory.as_bytes() as usize,
124 tls: self.tls.clone(),
125 max_connection_age: self.max_connection_age,
126 }
127 }
128}
129
/// Port of the default gRPC bind address; also the fallback port when the server address is
/// detected and `bind_addr` carries no port.
const DEFAULT_GRPC_ADDR_PORT: &str = "4001";

/// Port used for both the bind address and the server address of
/// [`GrpcOptions::internal_default`].
const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
133
134impl Default for GrpcOptions {
135 fn default() -> Self {
136 Self {
137 bind_addr: format!("127.0.0.1:{}", DEFAULT_GRPC_ADDR_PORT),
138 server_addr: String::new(),
140 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
141 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
142 max_total_message_memory: ReadableSize(0),
143 flight_compression: FlightCompression::ArrowIpc,
144 runtime_size: 8,
145 tls: TlsOption::default(),
146 max_connection_age: None,
147 }
148 }
149}
150
151impl GrpcOptions {
152 pub fn internal_default() -> Self {
156 Self {
157 bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
158 server_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
160 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
161 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
162 max_total_message_memory: ReadableSize(0),
163 flight_compression: FlightCompression::ArrowIpc,
164 runtime_size: 8,
165 tls: TlsOption::default(),
166 max_connection_age: None,
167 }
168 }
169
170 pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
171 self.bind_addr = bind_addr.to_string();
172 self
173 }
174
175 pub fn with_server_addr(mut self, server_addr: &str) -> Self {
176 self.server_addr = server_addr.to_string();
177 self
178 }
179}
180
181#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
182#[serde(rename_all = "snake_case")]
183pub enum FlightCompression {
184 #[default]
186 None,
187 Transport,
189 ArrowIpc,
191 All,
193}
194
195impl FlightCompression {
196 pub fn transport_compression(&self) -> bool {
197 self == &FlightCompression::Transport || self == &FlightCompression::All
198 }
199
200 pub fn arrow_compression(&self) -> bool {
201 self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
202 }
203}
204
205pub struct GrpcServer {
206 shutdown_tx: Mutex<Option<Sender<()>>>,
208 serve_state: Mutex<Option<Receiver<Result<()>>>>,
211 routes: Mutex<Option<Routes>>,
213 tls_config: Option<ServerTlsConfig>,
215 otel_arrow_service: Mutex<
217 Option<
218 InterceptedService<
219 ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
220 HeaderInterceptor,
221 >,
222 >,
223 >,
224 bind_addr: Option<SocketAddr>,
225 name: Option<String>,
226 config: GrpcServerConfig,
227 memory_limiter: RequestMemoryLimiter,
228}
229
230#[derive(Debug, Clone)]
232pub struct GrpcServerConfig {
233 pub max_recv_message_size: usize,
235 pub max_send_message_size: usize,
237 pub max_total_message_memory: usize,
239 pub tls: TlsOption,
240 pub max_connection_age: Option<Duration>,
244}
245
246impl Default for GrpcServerConfig {
247 fn default() -> Self {
248 Self {
249 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
250 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
251 max_total_message_memory: 0,
252 tls: TlsOption::default(),
253 max_connection_age: None,
254 }
255 }
256}
257
258impl GrpcServer {
259 pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
260 HealthCheckServer::new(HealthCheckHandler)
261 }
262
263 pub fn create_reflection_service(&self) -> ServerReflectionServer<impl ServerReflection> {
264 tonic_reflection::server::Builder::configure()
265 .register_encoded_file_descriptor_set(api::v1::GREPTIME_GRPC_DESC)
266 .with_service_name("greptime.v1.GreptimeDatabase")
267 .with_service_name("greptime.v1.HealthCheck")
268 .with_service_name("greptime.v1.RegionServer")
269 .build_v1()
270 .inspect_err(|e| {
271 common_telemetry::error!(e; "Failed to build gRPC reflection server");
272 })
273 .unwrap()
274 }
275
276 pub async fn wait_for_serve(&self) -> Result<()> {
277 let mut serve_state = self.serve_state.lock().await;
278 let rx = serve_state.take().context(InternalSnafu {
279 err_msg: "gRPC serving state is unknown, maybe the server is not started, \
280 or we have already waited for the serve result before.",
281 })?;
282 let Ok(result) = rx.await else {
283 warn!("Background gRPC serving task is quited before we can receive the serve result.");
284 return Ok(());
285 };
286 if let Err(e) = result {
287 error!(e; "GRPC serve error");
288 }
289 Ok(())
290 }
291
292 pub fn memory_limiter(&self) -> &RequestMemoryLimiter {
294 &self.memory_limiter
295 }
296}
297
298pub struct HealthCheckHandler;
299
300#[async_trait]
301impl HealthCheck for HealthCheckHandler {
302 async fn health_check(
303 &self,
304 _req: Request<HealthCheckRequest>,
305 ) -> TonicResult<Response<HealthCheckResponse>> {
306 Ok(Response::new(HealthCheckResponse {}))
307 }
308}
309
/// Name reported by a [`GrpcServer`] that was not given an explicit name.
pub const GRPC_SERVER: &str = "GRPC_SERVER";
311
312#[async_trait]
313impl Server for GrpcServer {
314 async fn shutdown(&self) -> Result<()> {
315 let mut shutdown_tx = self.shutdown_tx.lock().await;
316 if let Some(tx) = shutdown_tx.take()
317 && tx.send(()).is_err()
318 {
319 info!("Receiver dropped, the grpc server has already exited");
320 }
321 info!("Shutdown grpc server");
322
323 Ok(())
324 }
325
326 async fn start(&mut self, addr: SocketAddr) -> Result<()> {
327 let routes = {
328 let mut routes = self.routes.lock().await;
329 let Some(routes) = routes.take() else {
330 return AlreadyStartedSnafu {
331 server: self.name(),
332 }
333 .fail();
334 };
335 routes
336 };
337
338 let (tx, rx) = oneshot::channel();
339 let (incoming, addr) = {
340 let mut shutdown_tx = self.shutdown_tx.lock().await;
341 ensure!(
342 shutdown_tx.is_none(),
343 AlreadyStartedSnafu { server: "gRPC" }
344 );
345
346 let listener = TcpListener::bind(addr)
347 .await
348 .context(TcpBindSnafu { addr })?;
349 let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
350 let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
351 info!("gRPC server(name={}) is bound to {}", self.name(), addr);
352
353 *shutdown_tx = Some(tx);
354
355 (incoming, addr)
356 };
357
358 let metrics_layer = tower::ServiceBuilder::new()
359 .layer(MetricsMiddlewareLayer)
360 .into_inner();
361
362 let mut builder = tonic::transport::Server::builder().layer(metrics_layer);
363 if let Some(tls_config) = self.tls_config.clone() {
364 builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
365 }
366
367 if let Some(max_connection_age) = self.config.max_connection_age {
368 builder = builder.max_connection_age(max_connection_age);
369 }
370
371 let mut builder = builder
372 .add_routes(routes)
373 .add_service(self.create_healthcheck_service())
374 .add_service(self.create_reflection_service());
375
376 if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
377 builder = builder.add_service(otel_arrow_service);
378 }
379
380 let (serve_state_tx, serve_state_rx) = oneshot::channel();
381 let mut serve_state = self.serve_state.lock().await;
382 *serve_state = Some(serve_state_rx);
383
384 let _handle = common_runtime::spawn_global(async move {
385 let result = builder
386 .serve_with_incoming_shutdown(incoming, rx.map(drop))
387 .await
388 .context(StartGrpcSnafu);
389 serve_state_tx.send(result)
390 });
391
392 self.bind_addr = Some(addr);
393 Ok(())
394 }
395
396 fn name(&self) -> &str {
397 if let Some(name) = &self.name {
398 name
399 } else {
400 GRPC_SERVER
401 }
402 }
403
404 fn bind_addr(&self) -> Option<SocketAddr> {
405 self.bind_addr
406 }
407}