1use std::convert::Infallible;
16
17use api::v1::frontend::frontend_server::FrontendServer;
18use api::v1::greptime_database_server::GreptimeDatabaseServer;
19use api::v1::prometheus_gateway_server::PrometheusGatewayServer;
20use api::v1::region::region_server::RegionServer;
21use arrow_flight::flight_service_server::FlightServiceServer;
22use auth::UserProviderRef;
23use axum::extract::Request;
24use axum::response::IntoResponse;
25use axum::routing::Route;
26use common_grpc::error::{InvalidConfigFilePathSnafu, Result};
27use common_runtime::Runtime;
28use common_telemetry::warn;
29use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
30use snafu::ResultExt;
31use tokio::sync::Mutex;
32use tonic::codec::CompressionEncoding;
33use tonic::codegen::Service;
34use tonic::service::RoutesBuilder;
35use tonic::service::interceptor::InterceptedService;
36use tonic::transport::{Identity, ServerTlsConfig};
37use tower::Layer;
38
39use crate::grpc::database::DatabaseService;
40use crate::grpc::flight::{FlightCraftRef, FlightCraftWrapper};
41use crate::grpc::frontend_grpc_handler::FrontendGrpcHandler;
42use crate::grpc::greptime_handler::GreptimeRequestHandler;
43use crate::grpc::prom_query_gateway::PrometheusGatewayService;
44use crate::grpc::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
45use crate::grpc::{GrpcServer, GrpcServerConfig};
46use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
47use crate::prometheus_handler::PrometheusHandlerRef;
48use crate::query_handler::OpenTelemetryProtocolHandlerRef;
49use crate::request_limiter::RequestMemoryLimiter;
50use crate::tls::TlsOption;
51
52#[macro_export]
55macro_rules! add_service {
56 ($builder: ident, $service: expr) => {
57 let max_recv_message_size = $builder.config().max_recv_message_size;
58 let max_send_message_size = $builder.config().max_send_message_size;
59
60 use tonic::codec::CompressionEncoding;
61 let service_builder = $service
62 .max_decoding_message_size(max_recv_message_size)
63 .max_encoding_message_size(max_send_message_size)
64 .accept_compressed(CompressionEncoding::Gzip)
65 .accept_compressed(CompressionEncoding::Zstd)
66 .send_compressed(CompressionEncoding::Gzip)
67 .send_compressed(CompressionEncoding::Zstd);
68
69 use $crate::grpc::memory_limit::MemoryLimiterExtensionLayer;
71 let service_with_limiter = $crate::tower::ServiceBuilder::new()
72 .layer(MemoryLimiterExtensionLayer::new(
73 $builder.memory_limiter().clone(),
74 ))
75 .service(service_builder);
76
77 $builder
78 .routes_builder_mut()
79 .add_service(service_with_limiter);
80 };
81}
82
83pub struct GrpcServerBuilder {
84 name: Option<String>,
85 config: GrpcServerConfig,
86 runtime: Runtime,
87 routes_builder: RoutesBuilder,
88 tls_config: Option<ServerTlsConfig>,
89 otel_arrow_service: Option<
90 InterceptedService<
91 ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
92 HeaderInterceptor,
93 >,
94 >,
95 memory_limiter: RequestMemoryLimiter,
96}
97
98impl GrpcServerBuilder {
99 pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
100 let memory_limiter = RequestMemoryLimiter::new(config.max_total_message_memory);
101 Self {
102 name: None,
103 config,
104 runtime,
105 routes_builder: RoutesBuilder::default(),
106 tls_config: None,
107 otel_arrow_service: None,
108 memory_limiter,
109 }
110 }
111
112 pub fn config(&self) -> &GrpcServerConfig {
113 &self.config
114 }
115
116 pub fn runtime(&self) -> &Runtime {
117 &self.runtime
118 }
119
120 pub fn memory_limiter(&self) -> &RequestMemoryLimiter {
121 &self.memory_limiter
122 }
123
124 pub fn name(self, name: Option<String>) -> Self {
125 Self { name, ..self }
126 }
127
128 pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
130 add_service!(
131 self,
132 GreptimeDatabaseServer::new(DatabaseService::new(database_handler))
133 );
134 self
135 }
136
137 pub fn prometheus_handler(
139 mut self,
140 prometheus_handler: PrometheusHandlerRef,
141 user_provider: Option<UserProviderRef>,
142 ) -> Self {
143 add_service!(
144 self,
145 PrometheusGatewayServer::new(PrometheusGatewayService::new(
146 prometheus_handler,
147 user_provider,
148 ))
149 );
150 self
151 }
152
153 pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self {
155 add_service!(
156 self,
157 FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
158 );
159 self
160 }
161
162 pub fn frontend_grpc_handler(mut self, handler: FrontendGrpcHandler) -> Self {
164 add_service!(self, FrontendServer::new(handler));
165 self
166 }
167
168 pub fn otel_arrow_handler(
170 mut self,
171 handler: OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>,
172 ) -> Self {
173 let mut server = ArrowMetricsServiceServer::new(handler);
174 server = server
175 .max_decoding_message_size(self.config.max_recv_message_size)
176 .max_encoding_message_size(self.config.max_send_message_size)
177 .accept_compressed(CompressionEncoding::Zstd)
178 .send_compressed(CompressionEncoding::Zstd);
179 let svc = InterceptedService::new(server, HeaderInterceptor {});
180 self.otel_arrow_service = Some(svc);
181 self
182 }
183
184 pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self {
186 let handler = RegionServerRequestHandler::new(region_server_handler, self.runtime.clone());
187 add_service!(self, RegionServer::new(handler));
188 self
189 }
190
191 pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
192 &mut self.routes_builder
193 }
194
195 pub fn with_tls_config(mut self, tls_option: TlsOption) -> Result<Self> {
196 if tls_option.watch {
199 warn!("Certificates watch and reloading for gRPC is NOT supported at the moment");
200 }
201 self.tls_config = if tls_option.should_force_tls() {
202 let cert = std::fs::read_to_string(tls_option.cert_path)
203 .context(InvalidConfigFilePathSnafu)?;
204 let key =
205 std::fs::read_to_string(tls_option.key_path).context(InvalidConfigFilePathSnafu)?;
206 let identity = Identity::from_pem(cert, key);
207 Some(ServerTlsConfig::new().identity(identity))
208 } else {
209 None
210 };
211 Ok(self)
212 }
213
214 pub fn add_layer<L>(self, layer: L) -> Self
215 where
216 L: Layer<Route> + Clone + Send + Sync + 'static,
217 L::Service: Service<Request> + Clone + Send + Sync + 'static,
218 <L::Service as Service<Request>>::Response: IntoResponse + 'static,
219 <L::Service as Service<Request>>::Error: Into<Infallible> + 'static,
220 <L::Service as Service<Request>>::Future: Send + 'static,
221 {
222 let routes = self.routes_builder.routes();
223 let router = routes.into_axum_router();
224 let router = router.layer(layer);
225 Self {
226 routes_builder: RoutesBuilder::from(router),
227 ..self
228 }
229 }
230
231 pub fn build(self) -> GrpcServer {
232 GrpcServer {
233 routes: Mutex::new(Some(self.routes_builder.routes())),
234 shutdown_tx: Mutex::new(None),
235 serve_state: Mutex::new(None),
236 tls_config: self.tls_config,
237 otel_arrow_service: Mutex::new(self.otel_arrow_service),
238 bind_addr: None,
239 name: self.name,
240 config: self.config,
241 memory_limiter: self.memory_limiter,
242 }
243 }
244}