1pub mod builder;
16mod cancellation;
17pub mod context_auth;
18mod database;
19pub mod flight;
20pub mod frontend_grpc_handler;
21pub mod greptime_handler;
22pub mod prom_query_gateway;
23pub mod region_server;
24
25use std::net::SocketAddr;
26use std::time::Duration;
27
28use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
29use api::v1::{HealthCheckRequest, HealthCheckResponse};
30use async_trait::async_trait;
31use common_base::readable_size::ReadableSize;
32use common_grpc::channel_manager::{
33 DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
34};
35use common_telemetry::{error, info, warn};
36use futures::FutureExt;
37use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
38use serde::{Deserialize, Serialize};
39use snafu::{OptionExt, ResultExt, ensure};
40use tokio::net::TcpListener;
41use tokio::sync::Mutex;
42use tokio::sync::oneshot::{self, Receiver, Sender};
43use tonic::service::Routes;
44use tonic::service::interceptor::InterceptedService;
45use tonic::transport::ServerTlsConfig;
46use tonic::transport::server::TcpIncoming;
47use tonic::{Request, Response, Status};
48use tonic_reflection::server::v1::{ServerReflection, ServerReflectionServer};
49
50use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
51use crate::metrics::MetricsMiddlewareLayer;
52use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
53use crate::query_handler::OpenTelemetryProtocolHandlerRef;
54use crate::server::Server;
55use crate::tls::TlsOption;
56
57type TonicResult<T> = std::result::Result<T, Status>;
58
59#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
60#[serde(default)]
61pub struct GrpcOptions {
62 pub bind_addr: String,
64 pub server_addr: String,
66 pub max_recv_message_size: ReadableSize,
68 pub max_send_message_size: ReadableSize,
70 pub flight_compression: FlightCompression,
72 pub runtime_size: usize,
73 #[serde(default = "Default::default")]
74 pub tls: TlsOption,
75 #[serde(with = "humantime_serde")]
79 pub max_connection_age: Option<Duration>,
80}
81
82impl GrpcOptions {
83 #[cfg(not(target_os = "android"))]
85 pub fn detect_server_addr(&mut self) {
86 if self.server_addr.is_empty() {
87 match local_ip_address::local_ip() {
88 Ok(ip) => {
89 let detected_addr = format!(
90 "{}:{}",
91 ip,
92 self.bind_addr
93 .split(':')
94 .nth(1)
95 .unwrap_or(DEFAULT_GRPC_ADDR_PORT)
96 );
97 info!("Using detected: {} as server address", detected_addr);
98 self.server_addr = detected_addr;
99 }
100 Err(e) => {
101 error!("Failed to detect local ip address: {}", e);
102 }
103 }
104 }
105 }
106
107 #[cfg(target_os = "android")]
108 pub fn detect_server_addr(&mut self) {
109 if self.server_addr.is_empty() {
110 common_telemetry::debug!("detect local IP is not supported on Android");
111 }
112 }
113
114 pub fn as_config(&self) -> GrpcServerConfig {
116 GrpcServerConfig {
117 max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
118 max_send_message_size: self.max_send_message_size.as_bytes() as usize,
119 tls: self.tls.clone(),
120 max_connection_age: self.max_connection_age,
121 }
122 }
123}
124
125const DEFAULT_GRPC_ADDR_PORT: &str = "4001";
126
127const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
128
129impl Default for GrpcOptions {
130 fn default() -> Self {
131 Self {
132 bind_addr: format!("127.0.0.1:{}", DEFAULT_GRPC_ADDR_PORT),
133 server_addr: String::new(),
135 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
136 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
137 flight_compression: FlightCompression::ArrowIpc,
138 runtime_size: 8,
139 tls: TlsOption::default(),
140 max_connection_age: None,
141 }
142 }
143}
144
145impl GrpcOptions {
146 pub fn internal_default() -> Self {
150 Self {
151 bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
152 server_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
154 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
155 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
156 flight_compression: FlightCompression::ArrowIpc,
157 runtime_size: 8,
158 tls: TlsOption::default(),
159 max_connection_age: None,
160 }
161 }
162
163 pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
164 self.bind_addr = bind_addr.to_string();
165 self
166 }
167
168 pub fn with_server_addr(mut self, server_addr: &str) -> Self {
169 self.server_addr = server_addr.to_string();
170 self
171 }
172}
173
174#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
175#[serde(rename_all = "snake_case")]
176pub enum FlightCompression {
177 #[default]
179 None,
180 Transport,
182 ArrowIpc,
184 All,
186}
187
188impl FlightCompression {
189 pub fn transport_compression(&self) -> bool {
190 self == &FlightCompression::Transport || self == &FlightCompression::All
191 }
192
193 pub fn arrow_compression(&self) -> bool {
194 self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
195 }
196}
197
198pub struct GrpcServer {
199 shutdown_tx: Mutex<Option<Sender<()>>>,
201 serve_state: Mutex<Option<Receiver<Result<()>>>>,
204 routes: Mutex<Option<Routes>>,
206 tls_config: Option<ServerTlsConfig>,
208 otel_arrow_service: Mutex<
210 Option<
211 InterceptedService<
212 ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
213 HeaderInterceptor,
214 >,
215 >,
216 >,
217 bind_addr: Option<SocketAddr>,
218 name: Option<String>,
219 config: GrpcServerConfig,
220}
221
222#[derive(Debug, Clone)]
224pub struct GrpcServerConfig {
225 pub max_recv_message_size: usize,
227 pub max_send_message_size: usize,
229 pub tls: TlsOption,
230 pub max_connection_age: Option<Duration>,
234}
235
236impl Default for GrpcServerConfig {
237 fn default() -> Self {
238 Self {
239 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
240 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
241 tls: TlsOption::default(),
242 max_connection_age: None,
243 }
244 }
245}
246
247impl GrpcServer {
248 pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
249 HealthCheckServer::new(HealthCheckHandler)
250 }
251
252 pub fn create_reflection_service(&self) -> ServerReflectionServer<impl ServerReflection> {
253 tonic_reflection::server::Builder::configure()
254 .register_encoded_file_descriptor_set(api::v1::GREPTIME_GRPC_DESC)
255 .with_service_name("greptime.v1.GreptimeDatabase")
256 .with_service_name("greptime.v1.HealthCheck")
257 .with_service_name("greptime.v1.RegionServer")
258 .build_v1()
259 .inspect_err(|e| {
260 common_telemetry::error!(e; "Failed to build gRPC reflection server");
261 })
262 .unwrap()
263 }
264
265 pub async fn wait_for_serve(&self) -> Result<()> {
266 let mut serve_state = self.serve_state.lock().await;
267 let rx = serve_state.take().context(InternalSnafu {
268 err_msg: "gRPC serving state is unknown, maybe the server is not started, \
269 or we have already waited for the serve result before.",
270 })?;
271 let Ok(result) = rx.await else {
272 warn!("Background gRPC serving task is quited before we can receive the serve result.");
273 return Ok(());
274 };
275 if let Err(e) = result {
276 error!(e; "GRPC serve error");
277 }
278 Ok(())
279 }
280}
281
282pub struct HealthCheckHandler;
283
284#[async_trait]
285impl HealthCheck for HealthCheckHandler {
286 async fn health_check(
287 &self,
288 _req: Request<HealthCheckRequest>,
289 ) -> TonicResult<Response<HealthCheckResponse>> {
290 Ok(Response::new(HealthCheckResponse {}))
291 }
292}
293
294pub const GRPC_SERVER: &str = "GRPC_SERVER";
295
296#[async_trait]
297impl Server for GrpcServer {
298 async fn shutdown(&self) -> Result<()> {
299 let mut shutdown_tx = self.shutdown_tx.lock().await;
300 if let Some(tx) = shutdown_tx.take()
301 && tx.send(()).is_err()
302 {
303 info!("Receiver dropped, the grpc server has already exited");
304 }
305 info!("Shutdown grpc server");
306
307 Ok(())
308 }
309
310 async fn start(&mut self, addr: SocketAddr) -> Result<()> {
311 let routes = {
312 let mut routes = self.routes.lock().await;
313 let Some(routes) = routes.take() else {
314 return AlreadyStartedSnafu {
315 server: self.name(),
316 }
317 .fail();
318 };
319 routes
320 };
321
322 let (tx, rx) = oneshot::channel();
323 let (incoming, addr) = {
324 let mut shutdown_tx = self.shutdown_tx.lock().await;
325 ensure!(
326 shutdown_tx.is_none(),
327 AlreadyStartedSnafu { server: "gRPC" }
328 );
329
330 let listener = TcpListener::bind(addr)
331 .await
332 .context(TcpBindSnafu { addr })?;
333 let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
334 let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
335 info!("gRPC server(name={}) is bound to {}", self.name(), addr);
336
337 *shutdown_tx = Some(tx);
338
339 (incoming, addr)
340 };
341
342 let metrics_layer = tower::ServiceBuilder::new()
343 .layer(MetricsMiddlewareLayer)
344 .into_inner();
345
346 let mut builder = tonic::transport::Server::builder().layer(metrics_layer);
347 if let Some(tls_config) = self.tls_config.clone() {
348 builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
349 }
350
351 if let Some(max_connection_age) = self.config.max_connection_age {
352 builder = builder.max_connection_age(max_connection_age);
353 }
354
355 let mut builder = builder
356 .add_routes(routes)
357 .add_service(self.create_healthcheck_service())
358 .add_service(self.create_reflection_service());
359
360 if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
361 builder = builder.add_service(otel_arrow_service);
362 }
363
364 let (serve_state_tx, serve_state_rx) = oneshot::channel();
365 let mut serve_state = self.serve_state.lock().await;
366 *serve_state = Some(serve_state_rx);
367
368 let _handle = common_runtime::spawn_global(async move {
369 let result = builder
370 .serve_with_incoming_shutdown(incoming, rx.map(drop))
371 .await
372 .context(StartGrpcSnafu);
373 serve_state_tx.send(result)
374 });
375
376 self.bind_addr = Some(addr);
377 Ok(())
378 }
379
380 fn name(&self) -> &str {
381 if let Some(name) = &self.name {
382 name
383 } else {
384 GRPC_SERVER
385 }
386 }
387
388 fn bind_addr(&self) -> Option<SocketAddr> {
389 self.bind_addr
390 }
391}