1pub mod builder;
16mod cancellation;
17pub mod context_auth;
18mod database;
19pub mod flight;
20pub mod frontend_grpc_handler;
21pub mod greptime_handler;
22pub mod memory_limit;
23pub mod prom_query_gateway;
24pub mod region_server;
25
26use std::any::Any;
27use std::net::SocketAddr;
28use std::time::Duration;
29
30use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
31use api::v1::{HealthCheckRequest, HealthCheckResponse};
32use async_trait::async_trait;
33use common_base::readable_size::ReadableSize;
34use common_grpc::channel_manager::{
35 DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
36};
37use common_telemetry::{error, info, warn};
38use futures::FutureExt;
39use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
40use serde::{Deserialize, Serialize};
41use snafu::{OptionExt, ResultExt, ensure};
42use tokio::net::TcpListener;
43use tokio::sync::Mutex;
44use tokio::sync::oneshot::{self, Receiver, Sender};
45use tonic::service::Routes;
46use tonic::service::interceptor::InterceptedService;
47use tonic::transport::ServerTlsConfig;
48use tonic::transport::server::TcpIncoming;
49use tonic::{Request, Response, Status};
50use tonic_reflection::server::v1::{ServerReflection, ServerReflectionServer};
51
52use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
53use crate::install_default_crypto_provider;
54use crate::metrics::MetricsMiddlewareLayer;
55use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
56use crate::query_handler::OpenTelemetryProtocolHandlerRef;
57use crate::server::Server;
58use crate::tls::TlsOption;
59
/// Result type returned by the tonic service handlers in this module; errors are gRPC
/// [`Status`] values.
type TonicResult<T> = std::result::Result<T, Status>;
61
/// User-facing options of a gRPC server.
///
/// Missing fields fall back to [`GrpcOptions::default`] when deserialized (`#[serde(default)]`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct GrpcOptions {
    /// Address the server binds to, e.g. `127.0.0.1:4001` (the default).
    pub bind_addr: String,
    /// Address reported for this server. When empty, `detect_server_addr` fills it with the
    /// local IP plus the port of `bind_addr`.
    /// NOTE(review): presumably the address other nodes use to reach this server — confirm.
    pub server_addr: String,
    /// Maximum size of a received message; converted to bytes by `as_config`.
    pub max_recv_message_size: ReadableSize,
    /// Maximum size of a sent message; converted to bytes by `as_config`.
    pub max_send_message_size: ReadableSize,
    /// Compression mode for Arrow Flight payloads (defaults to `ArrowIpc` here).
    pub flight_compression: FlightCompression,
    /// Defaults to 8. Not read in this file.
    /// NOTE(review): presumably the size of the runtime serving gRPC requests — confirm.
    pub runtime_size: usize,
    /// TLS settings, copied into [`GrpcServerConfig`] by `as_config`.
    #[serde(default = "Default::default")]
    pub tls: TlsOption,
    /// Optional maximum connection age passed to the tonic server builder; `None` disables it.
    /// (De)serialized in humantime format.
    #[serde(with = "humantime_serde")]
    pub max_connection_age: Option<Duration>,
    /// HTTP/2 keep-alive interval (default 10s), in humantime format. Not read in this file.
    #[serde(with = "humantime_serde")]
    pub http2_keep_alive_interval: Duration,
    /// HTTP/2 keep-alive timeout (default 3s), in humantime format. Not read in this file.
    #[serde(with = "humantime_serde")]
    pub http2_keep_alive_timeout: Duration,
}
90
91impl GrpcOptions {
92 #[cfg(not(target_os = "android"))]
94 pub fn detect_server_addr(&mut self) {
95 if self.server_addr.is_empty() {
96 match local_ip_address::local_ip() {
97 Ok(ip) => {
98 let detected_addr = format!(
99 "{}:{}",
100 ip,
101 self.bind_addr
102 .split(':')
103 .nth(1)
104 .unwrap_or(DEFAULT_GRPC_ADDR_PORT)
105 );
106 info!("Using detected: {} as server address", detected_addr);
107 self.server_addr = detected_addr;
108 }
109 Err(e) => {
110 error!("Failed to detect local ip address: {}", e);
111 }
112 }
113 }
114 }
115
116 #[cfg(target_os = "android")]
117 pub fn detect_server_addr(&mut self) {
118 if self.server_addr.is_empty() {
119 common_telemetry::debug!("detect local IP is not supported on Android");
120 }
121 }
122
123 pub fn as_config(&self) -> GrpcServerConfig {
125 GrpcServerConfig {
126 max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
127 max_send_message_size: self.max_send_message_size.as_bytes() as usize,
128 tls: self.tls.clone(),
129 max_connection_age: self.max_connection_age,
130 }
131 }
132}
133
/// Port used by [`GrpcOptions::default`]'s bind address.
///
/// Also the fallback port in `detect_server_addr` when `bind_addr` carries no port.
const DEFAULT_GRPC_ADDR_PORT: &str = "4001";

/// Port used by [`GrpcOptions::internal_default`] for both the bind and server addresses.
const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
137
138impl Default for GrpcOptions {
139 fn default() -> Self {
140 Self {
141 bind_addr: format!("127.0.0.1:{}", DEFAULT_GRPC_ADDR_PORT),
142 server_addr: String::new(),
144 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
145 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
146 flight_compression: FlightCompression::ArrowIpc,
147 runtime_size: 8,
148 tls: TlsOption::default(),
149 max_connection_age: None,
150 http2_keep_alive_interval: Duration::from_secs(10),
151 http2_keep_alive_timeout: Duration::from_secs(3),
152 }
153 }
154}
155
156impl GrpcOptions {
157 pub fn internal_default() -> Self {
161 Self {
162 bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
163 server_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
165 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
166 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
167 flight_compression: FlightCompression::ArrowIpc,
168 runtime_size: 8,
169 tls: TlsOption::default(),
170 max_connection_age: None,
171 http2_keep_alive_interval: Duration::from_secs(10),
172 http2_keep_alive_timeout: Duration::from_secs(3),
173 }
174 }
175
176 pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
177 self.bind_addr = bind_addr.to_string();
178 self
179 }
180
181 pub fn with_server_addr(mut self, server_addr: &str) -> Self {
182 self.server_addr = server_addr.to_string();
183 self
184 }
185}
186
/// Compression mode for Arrow Flight payloads; serialized in snake_case (e.g. `arrow_ipc`).
///
/// The enum's own `Default` is `None`, while [`GrpcOptions::default`] explicitly picks
/// `ArrowIpc`.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum FlightCompression {
    /// No compression.
    #[default]
    None,
    /// Compression at the transport level only (`transport_compression()` is true).
    Transport,
    /// Arrow IPC compression only (`arrow_compression()` is true).
    ArrowIpc,
    /// Both transport and Arrow IPC compression.
    All,
}
200
201impl FlightCompression {
202 pub fn transport_compression(&self) -> bool {
203 self == &FlightCompression::Transport || self == &FlightCompression::All
204 }
205
206 pub fn arrow_compression(&self) -> bool {
207 self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
208 }
209}
210
/// A tonic-based gRPC server that is started once and served on a background task.
pub struct GrpcServer {
    /// Sender that tells the serving task to shut down. Set by `start`, taken by `shutdown`.
    shutdown_tx: Mutex<Option<Sender<()>>>,
    /// Receiver of the background serving task's final result. Set by `start` and
    /// consumed (at most once) by `wait_for_serve`.
    serve_state: Mutex<Option<Receiver<Result<()>>>>,
    /// Routes to serve. Taken by `start`, so `None` means the server has been started.
    routes: Mutex<Option<Routes>>,
    /// TLS configuration applied to the server builder in `start`, if any.
    tls_config: Option<ServerTlsConfig>,
    /// Optional OTel Arrow metrics service (wrapped with a header interceptor), taken and
    /// mounted by `start`.
    otel_arrow_service: Mutex<
        Option<
            InterceptedService<
                ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
                HeaderInterceptor,
            >,
        >,
    >,
    /// Address actually bound, recorded once `start` succeeds.
    bind_addr: Option<SocketAddr>,
    /// Custom server name; `GRPC_SERVER` is used when `None`.
    name: Option<String>,
    /// Runtime configuration; `start` applies `max_connection_age` from it.
    config: GrpcServerConfig,
}
234
/// Runtime configuration of a [`GrpcServer`], usually produced by `GrpcOptions::as_config`.
#[derive(Debug, Clone)]
pub struct GrpcServerConfig {
    /// Maximum received message size, in bytes.
    pub max_recv_message_size: usize,
    /// Maximum sent message size, in bytes.
    pub max_send_message_size: usize,
    /// TLS settings.
    pub tls: TlsOption,
    /// Optional maximum connection age; when set, `start` passes it to the tonic builder.
    pub max_connection_age: Option<Duration>,
}
248
249impl Default for GrpcServerConfig {
250 fn default() -> Self {
251 Self {
252 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
253 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
254 tls: TlsOption::default(),
255 max_connection_age: None,
256 }
257 }
258}
259
260impl GrpcServer {
261 pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
262 HealthCheckServer::new(HealthCheckHandler)
263 }
264
265 pub fn create_reflection_service(&self) -> ServerReflectionServer<impl ServerReflection> {
266 tonic_reflection::server::Builder::configure()
267 .register_encoded_file_descriptor_set(api::v1::GREPTIME_GRPC_DESC)
268 .with_service_name("greptime.v1.GreptimeDatabase")
269 .with_service_name("greptime.v1.HealthCheck")
270 .with_service_name("greptime.v1.RegionServer")
271 .build_v1()
272 .inspect_err(|e| {
273 common_telemetry::error!(e; "Failed to build gRPC reflection server");
274 })
275 .unwrap()
276 }
277
278 pub async fn wait_for_serve(&self) -> Result<()> {
279 let mut serve_state = self.serve_state.lock().await;
280 let rx = serve_state.take().context(InternalSnafu {
281 err_msg: "gRPC serving state is unknown, maybe the server is not started, \
282 or we have already waited for the serve result before.",
283 })?;
284 let Ok(result) = rx.await else {
285 warn!("Background gRPC serving task is quited before we can receive the serve result.");
286 return Ok(());
287 };
288 if let Err(e) = result {
289 error!(e; "GRPC serve error");
290 }
291 Ok(())
292 }
293}
294
295pub struct HealthCheckHandler;
296
297#[async_trait]
298impl HealthCheck for HealthCheckHandler {
299 async fn health_check(
300 &self,
301 _req: Request<HealthCheckRequest>,
302 ) -> TonicResult<Response<HealthCheckResponse>> {
303 Ok(Response::new(HealthCheckResponse {}))
304 }
305}
306
/// Name returned by [`GrpcServer`]'s `Server::name` when no custom name is configured.
pub const GRPC_SERVER: &str = "GRPC_SERVER";
308
309#[async_trait]
310impl Server for GrpcServer {
311 async fn shutdown(&self) -> Result<()> {
312 let mut shutdown_tx = self.shutdown_tx.lock().await;
313 if let Some(tx) = shutdown_tx.take()
314 && tx.send(()).is_err()
315 {
316 info!("Receiver dropped, the grpc server has already exited");
317 }
318 info!("Shutdown grpc server");
319
320 Ok(())
321 }
322
323 async fn start(&mut self, addr: SocketAddr) -> Result<()> {
324 let routes = {
325 let mut routes = self.routes.lock().await;
326 let Some(routes) = routes.take() else {
327 return AlreadyStartedSnafu {
328 server: self.name(),
329 }
330 .fail();
331 };
332 routes
333 };
334
335 let (tx, rx) = oneshot::channel();
336 let (incoming, addr) = {
337 let mut shutdown_tx = self.shutdown_tx.lock().await;
338 ensure!(
339 shutdown_tx.is_none(),
340 AlreadyStartedSnafu { server: "gRPC" }
341 );
342
343 let listener = TcpListener::bind(addr)
344 .await
345 .context(TcpBindSnafu { addr })?;
346 let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
347 let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
348 info!("gRPC server(name={}) is bound to {}", self.name(), addr);
349
350 *shutdown_tx = Some(tx);
351
352 (incoming, addr)
353 };
354
355 let metrics_layer = tower::ServiceBuilder::new()
356 .layer(MetricsMiddlewareLayer)
357 .into_inner();
358
359 let mut builder = tonic::transport::Server::builder()
360 .accept_http1(true)
361 .layer(metrics_layer)
362 .layer(tonic_web::GrpcWebLayer::new());
363
364 if let Some(tls_config) = self.tls_config.clone() {
365 if let Err(err) = install_default_crypto_provider() {
368 warn!("Failed to install default rustls crypto provider: {err}");
369 }
370 builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
371 }
372
373 if let Some(max_connection_age) = self.config.max_connection_age {
374 builder = builder.max_connection_age(max_connection_age);
375 }
376
377 let mut builder = builder
378 .add_routes(routes)
379 .add_service(self.create_healthcheck_service())
380 .add_service(self.create_reflection_service());
381
382 if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
383 builder = builder.add_service(otel_arrow_service);
384 }
385
386 let (serve_state_tx, serve_state_rx) = oneshot::channel();
387 let mut serve_state = self.serve_state.lock().await;
388 *serve_state = Some(serve_state_rx);
389
390 let _handle = common_runtime::spawn_global(async move {
391 let result = builder
392 .serve_with_incoming_shutdown(incoming, rx.map(drop))
393 .await
394 .context(StartGrpcSnafu);
395 serve_state_tx.send(result)
396 });
397
398 self.bind_addr = Some(addr);
399 Ok(())
400 }
401
402 fn name(&self) -> &str {
403 if let Some(name) = &self.name {
404 name
405 } else {
406 GRPC_SERVER
407 }
408 }
409
410 fn bind_addr(&self) -> Option<SocketAddr> {
411 self.bind_addr
412 }
413
414 fn as_any(&self) -> &dyn Any {
415 self
416 }
417}