1use std::convert::Infallible;
16
17use api::v1::frontend::frontend_server::FrontendServer;
18use api::v1::greptime_database_server::GreptimeDatabaseServer;
19use api::v1::prometheus_gateway_server::PrometheusGatewayServer;
20use api::v1::region::region_server::RegionServer;
21use arrow_flight::flight_service_server::FlightServiceServer;
22use auth::UserProviderRef;
23use axum::extract::Request;
24use axum::response::IntoResponse;
25use axum::routing::Route;
26use common_grpc::error::{InvalidConfigFilePathSnafu, Result};
27use common_runtime::Runtime;
28use common_telemetry::warn;
29use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
30use snafu::ResultExt;
31use tokio::sync::Mutex;
32use tonic::codec::CompressionEncoding;
33use tonic::codegen::Service;
34use tonic::service::RoutesBuilder;
35use tonic::service::interceptor::InterceptedService;
36use tonic::transport::{Identity, ServerTlsConfig};
37use tower::Layer;
38
39use crate::grpc::database::DatabaseService;
40use crate::grpc::flight::{FlightCraftRef, FlightCraftWrapper};
41use crate::grpc::frontend_grpc_handler::FrontendGrpcHandler;
42use crate::grpc::greptime_handler::GreptimeRequestHandler;
43use crate::grpc::prom_query_gateway::PrometheusGatewayService;
44use crate::grpc::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
45use crate::grpc::{GrpcServer, GrpcServerConfig};
46use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
47use crate::prometheus_handler::PrometheusHandlerRef;
48use crate::query_handler::OpenTelemetryProtocolHandlerRef;
49use crate::request_memory_limiter::ServerMemoryLimiter;
50use crate::tls::TlsOption;
51
/// Registers a gRPC service on a server builder.
///
/// `$builder` must be an identifier bound to a value that exposes `config()`,
/// `memory_limiter()` and `routes_builder_mut()`. `$service` is the generated
/// tonic server wrapper to register.
///
/// The expansion:
/// 1. applies the builder's `max_recv_message_size` / `max_send_message_size`
///    as the decoding / encoding limits of the service,
/// 2. enables gzip and zstd for both accepted and sent compression,
/// 3. wraps the service in a `MemoryLimiterExtensionLayer` built from a clone
///    of the builder's memory limiter,
/// 4. adds the wrapped service to the builder's routes.
///
/// All paths are written fully qualified on purpose: `macro_rules!` does not
/// make `use` items hygienic, so importing names inside the expansion would
/// leak them into the caller's scope and could collide when the macro is
/// invoked more than once in the same block.
#[macro_export]
macro_rules! add_service {
    ($builder: ident, $service: expr) => {
        let max_recv_message_size = $builder.config().max_recv_message_size;
        let max_send_message_size = $builder.config().max_send_message_size;

        let service_builder = $service
            .max_decoding_message_size(max_recv_message_size)
            .max_encoding_message_size(max_send_message_size)
            .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
            .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
            .send_compressed(tonic::codec::CompressionEncoding::Gzip)
            .send_compressed(tonic::codec::CompressionEncoding::Zstd);

        let service_with_limiter = $crate::tower::ServiceBuilder::new()
            .layer(
                $crate::grpc::memory_limit::MemoryLimiterExtensionLayer::new(
                    $builder.memory_limiter().clone(),
                ),
            )
            .service(service_builder);

        $builder
            .routes_builder_mut()
            .add_service(service_with_limiter);
    };
}
82
83pub struct GrpcServerBuilder {
84 name: Option<String>,
85 config: GrpcServerConfig,
86 runtime: Runtime,
87 routes_builder: RoutesBuilder,
88 tls_config: Option<ServerTlsConfig>,
89 otel_arrow_service: Option<
90 InterceptedService<
91 ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
92 HeaderInterceptor,
93 >,
94 >,
95 memory_limiter: ServerMemoryLimiter,
96}
97
98impl GrpcServerBuilder {
99 pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
100 let memory_limiter = ServerMemoryLimiter::default();
102
103 Self {
104 name: None,
105 config,
106 runtime,
107 routes_builder: RoutesBuilder::default(),
108 tls_config: None,
109 otel_arrow_service: None,
110 memory_limiter,
111 }
112 }
113
114 pub fn with_memory_limiter(mut self, limiter: ServerMemoryLimiter) -> Self {
116 self.memory_limiter = limiter;
117 self
118 }
119
120 pub fn config(&self) -> &GrpcServerConfig {
121 &self.config
122 }
123
124 pub fn runtime(&self) -> &Runtime {
125 &self.runtime
126 }
127
128 pub fn memory_limiter(&self) -> &ServerMemoryLimiter {
129 &self.memory_limiter
130 }
131
132 pub fn name(self, name: Option<String>) -> Self {
133 Self { name, ..self }
134 }
135
136 pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
138 add_service!(
139 self,
140 GreptimeDatabaseServer::new(DatabaseService::new(database_handler))
141 );
142 self
143 }
144
145 pub fn prometheus_handler(
147 mut self,
148 prometheus_handler: PrometheusHandlerRef,
149 user_provider: Option<UserProviderRef>,
150 ) -> Self {
151 add_service!(
152 self,
153 PrometheusGatewayServer::new(PrometheusGatewayService::new(
154 prometheus_handler,
155 user_provider,
156 ))
157 );
158 self
159 }
160
161 pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self {
163 add_service!(
164 self,
165 FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
166 );
167 self
168 }
169
170 pub fn frontend_grpc_handler(mut self, handler: FrontendGrpcHandler) -> Self {
172 add_service!(self, FrontendServer::new(handler));
173 self
174 }
175
176 pub fn otel_arrow_handler(
178 mut self,
179 handler: OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>,
180 ) -> Self {
181 let mut server = ArrowMetricsServiceServer::new(handler);
182 server = server
183 .max_decoding_message_size(self.config.max_recv_message_size)
184 .max_encoding_message_size(self.config.max_send_message_size)
185 .accept_compressed(CompressionEncoding::Zstd)
186 .send_compressed(CompressionEncoding::Zstd);
187 let svc = InterceptedService::new(server, HeaderInterceptor {});
188 self.otel_arrow_service = Some(svc);
189 self
190 }
191
192 pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self {
194 let handler = RegionServerRequestHandler::new(region_server_handler, self.runtime.clone());
195 add_service!(self, RegionServer::new(handler));
196 self
197 }
198
199 pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
200 &mut self.routes_builder
201 }
202
203 pub fn with_tls_config(mut self, tls_option: TlsOption) -> Result<Self> {
204 if tls_option.watch {
207 warn!("Certificates watch and reloading for gRPC is NOT supported at the moment");
208 }
209 self.tls_config = if tls_option.should_force_tls() {
210 let cert = std::fs::read_to_string(tls_option.cert_path)
211 .context(InvalidConfigFilePathSnafu)?;
212 let key =
213 std::fs::read_to_string(tls_option.key_path).context(InvalidConfigFilePathSnafu)?;
214 let identity = Identity::from_pem(cert, key);
215 Some(ServerTlsConfig::new().identity(identity))
216 } else {
217 None
218 };
219 Ok(self)
220 }
221
222 pub fn add_layer<L>(self, layer: L) -> Self
223 where
224 L: Layer<Route> + Clone + Send + Sync + 'static,
225 L::Service: Service<Request> + Clone + Send + Sync + 'static,
226 <L::Service as Service<Request>>::Response: IntoResponse + 'static,
227 <L::Service as Service<Request>>::Error: Into<Infallible> + 'static,
228 <L::Service as Service<Request>>::Future: Send + 'static,
229 {
230 let routes = self.routes_builder.routes();
231 let router = routes.into_axum_router();
232 let router = router.layer(layer);
233 Self {
234 routes_builder: RoutesBuilder::from(router),
235 ..self
236 }
237 }
238
239 pub fn build(self) -> GrpcServer {
240 GrpcServer {
241 routes: Mutex::new(Some(self.routes_builder.routes())),
242 shutdown_tx: Mutex::new(None),
243 serve_state: Mutex::new(None),
244 tls_config: self.tls_config,
245 otel_arrow_service: Mutex::new(self.otel_arrow_service),
246 bind_addr: None,
247 name: self.name,
248 config: self.config,
249 }
250 }
251}