1mod authorize;
16pub mod builder;
17mod cancellation;
18mod database;
19pub mod flight;
20pub mod greptime_handler;
21pub mod prom_query_gateway;
22pub mod region_server;
23
24use std::net::SocketAddr;
25
26use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
27use api::v1::{HealthCheckRequest, HealthCheckResponse};
28use async_trait::async_trait;
29use common_base::readable_size::ReadableSize;
30use common_grpc::channel_manager::{
31 DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
32};
33use common_telemetry::{error, info, warn};
34use futures::FutureExt;
35use otel_arrow_rust::opentelemetry::ArrowMetricsServiceServer;
36use serde::{Deserialize, Serialize};
37use snafu::{ensure, OptionExt, ResultExt};
38use tokio::net::TcpListener;
39use tokio::sync::oneshot::{self, Receiver, Sender};
40use tokio::sync::Mutex;
41use tonic::service::interceptor::InterceptedService;
42use tonic::service::Routes;
43use tonic::transport::server::TcpIncoming;
44use tonic::transport::ServerTlsConfig;
45use tonic::{Request, Response, Status};
46use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
47
48use crate::error::{
49 AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu,
50};
51use crate::metrics::MetricsMiddlewareLayer;
52use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
53use crate::query_handler::OpenTelemetryProtocolHandlerRef;
54use crate::server::Server;
55use crate::tls::TlsOption;
56
57type TonicResult<T> = std::result::Result<T, Status>;
58
59#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
60pub struct GrpcOptions {
61 pub bind_addr: String,
63 pub server_addr: String,
65 pub max_recv_message_size: ReadableSize,
67 pub max_send_message_size: ReadableSize,
69 pub runtime_size: usize,
70 #[serde(default = "Default::default")]
71 pub tls: TlsOption,
72}
73
74impl GrpcOptions {
75 #[cfg(not(target_os = "android"))]
77 pub fn detect_server_addr(&mut self) {
78 if self.server_addr.is_empty() {
79 match local_ip_address::local_ip() {
80 Ok(ip) => {
81 let detected_addr = format!(
82 "{}:{}",
83 ip,
84 self.bind_addr
85 .split(':')
86 .nth(1)
87 .unwrap_or(DEFAULT_GRPC_ADDR_PORT)
88 );
89 info!("Using detected: {} as server address", detected_addr);
90 self.server_addr = detected_addr;
91 }
92 Err(e) => {
93 error!("Failed to detect local ip address: {}", e);
94 }
95 }
96 }
97 }
98
99 #[cfg(target_os = "android")]
100 pub fn detect_server_addr(&mut self) {
101 if self.server_addr.is_empty() {
102 common_telemetry::debug!("detect local IP is not supported on Android");
103 }
104 }
105}
106
107const DEFAULT_GRPC_ADDR_PORT: &str = "4001";
108
109impl Default for GrpcOptions {
110 fn default() -> Self {
111 Self {
112 bind_addr: format!("127.0.0.1:{}", DEFAULT_GRPC_ADDR_PORT),
113 server_addr: String::new(),
115 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
116 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
117 runtime_size: 8,
118 tls: TlsOption::default(),
119 }
120 }
121}
122
123impl GrpcOptions {
124 pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
125 self.bind_addr = bind_addr.to_string();
126 self
127 }
128
129 pub fn with_server_addr(mut self, server_addr: &str) -> Self {
130 self.server_addr = server_addr.to_string();
131 self
132 }
133}
134
135pub struct GrpcServer {
136 shutdown_tx: Mutex<Option<Sender<()>>>,
138 serve_state: Mutex<Option<Receiver<Result<()>>>>,
141 routes: Mutex<Option<Routes>>,
143 tls_config: Option<ServerTlsConfig>,
145 otel_arrow_service: Mutex<
147 Option<
148 InterceptedService<
149 ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
150 HeaderInterceptor,
151 >,
152 >,
153 >,
154 bind_addr: Option<SocketAddr>,
155}
156
157#[derive(Debug, Clone)]
159pub struct GrpcServerConfig {
160 pub max_recv_message_size: usize,
162 pub max_send_message_size: usize,
164 pub tls: TlsOption,
165}
166
167impl Default for GrpcServerConfig {
168 fn default() -> Self {
169 Self {
170 max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
171 max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
172 tls: TlsOption::default(),
173 }
174 }
175}
176
177impl GrpcServer {
178 pub fn create_healthcheck_service(&self) -> HealthCheckServer<impl HealthCheck> {
179 HealthCheckServer::new(HealthCheckHandler)
180 }
181
182 pub fn create_reflection_service(&self) -> ServerReflectionServer<impl ServerReflection> {
183 tonic_reflection::server::Builder::configure()
184 .register_encoded_file_descriptor_set(api::v1::GREPTIME_GRPC_DESC)
185 .with_service_name("greptime.v1.GreptimeDatabase")
186 .with_service_name("greptime.v1.HealthCheck")
187 .with_service_name("greptime.v1.RegionServer")
188 .build_v1()
189 .inspect_err(|e| {
190 common_telemetry::error!(e; "Failed to build gRPC reflection server");
191 })
192 .unwrap()
193 }
194
195 pub async fn wait_for_serve(&self) -> Result<()> {
196 let mut serve_state = self.serve_state.lock().await;
197 let rx = serve_state.take().context(InternalSnafu {
198 err_msg: "gRPC serving state is unknown, maybe the server is not started, \
199 or we have already waited for the serve result before.",
200 })?;
201 let Ok(result) = rx.await else {
202 warn!("Background gRPC serving task is quited before we can receive the serve result.");
203 return Ok(());
204 };
205 if let Err(e) = result {
206 error!(e; "GRPC serve error");
207 }
208 Ok(())
209 }
210}
211
212pub struct HealthCheckHandler;
213
214#[async_trait]
215impl HealthCheck for HealthCheckHandler {
216 async fn health_check(
217 &self,
218 _req: Request<HealthCheckRequest>,
219 ) -> TonicResult<Response<HealthCheckResponse>> {
220 Ok(Response::new(HealthCheckResponse {}))
221 }
222}
223
224pub const GRPC_SERVER: &str = "GRPC_SERVER";
225
226#[async_trait]
227impl Server for GrpcServer {
228 async fn shutdown(&self) -> Result<()> {
229 let mut shutdown_tx = self.shutdown_tx.lock().await;
230 if let Some(tx) = shutdown_tx.take() {
231 if tx.send(()).is_err() {
232 info!("Receiver dropped, the grpc server has already existed");
233 }
234 }
235 info!("Shutdown grpc server");
236
237 Ok(())
238 }
239
240 async fn start(&mut self, addr: SocketAddr) -> Result<()> {
241 let routes = {
242 let mut routes = self.routes.lock().await;
243 let Some(routes) = routes.take() else {
244 return AlreadyStartedSnafu {
245 server: self.name(),
246 }
247 .fail();
248 };
249 routes
250 };
251
252 let (tx, rx) = oneshot::channel();
253 let (incoming, addr) = {
254 let mut shutdown_tx = self.shutdown_tx.lock().await;
255 ensure!(
256 shutdown_tx.is_none(),
257 AlreadyStartedSnafu { server: "gRPC" }
258 );
259
260 let listener = TcpListener::bind(addr)
261 .await
262 .context(TcpBindSnafu { addr })?;
263 let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
264 let incoming =
265 TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
266 info!("gRPC server is bound to {}", addr);
267
268 *shutdown_tx = Some(tx);
269
270 (incoming, addr)
271 };
272
273 let metrics_layer = tower::ServiceBuilder::new()
274 .layer(MetricsMiddlewareLayer)
275 .into_inner();
276
277 let mut builder = tonic::transport::Server::builder().layer(metrics_layer);
278 if let Some(tls_config) = self.tls_config.clone() {
279 builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
280 }
281
282 let mut builder = builder
283 .add_routes(routes)
284 .add_service(self.create_healthcheck_service())
285 .add_service(self.create_reflection_service());
286
287 if let Some(otel_arrow_service) = self.otel_arrow_service.lock().await.take() {
288 builder = builder.add_service(otel_arrow_service);
289 }
290
291 let (serve_state_tx, serve_state_rx) = oneshot::channel();
292 let mut serve_state = self.serve_state.lock().await;
293 *serve_state = Some(serve_state_rx);
294
295 let _handle = common_runtime::spawn_global(async move {
296 let result = builder
297 .serve_with_incoming_shutdown(incoming, rx.map(drop))
298 .await
299 .context(StartGrpcSnafu);
300 serve_state_tx.send(result)
301 });
302
303 self.bind_addr = Some(addr);
304 Ok(())
305 }
306
307 fn name(&self) -> &str {
308 GRPC_SERVER
309 }
310
311 fn bind_addr(&self) -> Option<SocketAddr> {
312 self.bind_addr
313 }
314}