1use api::v1::frontend::frontend_server::FrontendServer;
16use api::v1::greptime_database_server::GreptimeDatabaseServer;
17use api::v1::prometheus_gateway_server::PrometheusGatewayServer;
18use api::v1::region::region_server::RegionServer;
19use arrow_flight::flight_service_server::FlightServiceServer;
20use auth::UserProviderRef;
21use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result};
22use common_runtime::Runtime;
23use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
24use snafu::ResultExt;
25use tokio::sync::Mutex;
26use tonic::codec::CompressionEncoding;
27use tonic::service::RoutesBuilder;
28use tonic::service::interceptor::InterceptedService;
29use tonic::transport::{Identity, ServerTlsConfig};
30
31use crate::grpc::database::DatabaseService;
32use crate::grpc::flight::{FlightCraftRef, FlightCraftWrapper};
33use crate::grpc::frontend_grpc_handler::FrontendGrpcHandler;
34use crate::grpc::greptime_handler::GreptimeRequestHandler;
35use crate::grpc::prom_query_gateway::PrometheusGatewayService;
36use crate::grpc::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
37use crate::grpc::{GrpcServer, GrpcServerConfig};
38use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
39use crate::prometheus_handler::PrometheusHandlerRef;
40use crate::query_handler::OpenTelemetryProtocolHandlerRef;
41use crate::request_limiter::RequestMemoryLimiter;
42use crate::tls::TlsOption;
43
/// Registers a tonic service on a gRPC server builder.
///
/// `$builder` must be an identifier bound to a value that exposes `config()`,
/// `memory_limiter()` and `routes_builder_mut()` (for example a `GrpcServerBuilder`).
/// `$service` is the generated tonic server value to register.
///
/// The service is configured as follows before being added to the routes builder:
/// - max decoding / encoding message sizes are taken from
///   `config().max_recv_message_size` / `config().max_send_message_size`;
/// - gzip and zstd are enabled both for accepted and for sent messages;
/// - it is wrapped in a `MemoryLimiterExtensionLayer` that carries a clone of the
///   builder's memory limiter.
///
/// Helper types are referenced by path instead of being `use`d: `use` items are not
/// hygienic in `macro_rules!`, so importing them in the expansion would leak the names
/// into the caller's scope and make a second invocation in the same scope fail with a
/// duplicate-import error.
#[macro_export]
macro_rules! add_service {
    ($builder: ident, $service: expr) => {
        let max_recv_message_size = $builder.config().max_recv_message_size;
        let max_send_message_size = $builder.config().max_send_message_size;

        let service_builder = $service
            .max_decoding_message_size(max_recv_message_size)
            .max_encoding_message_size(max_send_message_size)
            .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
            .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
            .send_compressed(tonic::codec::CompressionEncoding::Gzip)
            .send_compressed(tonic::codec::CompressionEncoding::Zstd);

        // Attach the shared memory limiter to the service via a tower layer.
        let service_with_limiter = $crate::tower::ServiceBuilder::new()
            .layer(
                $crate::grpc::memory_limit::MemoryLimiterExtensionLayer::new(
                    $builder.memory_limiter().clone(),
                ),
            )
            .service(service_builder);

        $builder
            .routes_builder_mut()
            .add_service(service_with_limiter);
    };
}
74
75pub struct GrpcServerBuilder {
76 name: Option<String>,
77 config: GrpcServerConfig,
78 runtime: Runtime,
79 routes_builder: RoutesBuilder,
80 tls_config: Option<ServerTlsConfig>,
81 otel_arrow_service: Option<
82 InterceptedService<
83 ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
84 HeaderInterceptor,
85 >,
86 >,
87 memory_limiter: RequestMemoryLimiter,
88}
89
90impl GrpcServerBuilder {
91 pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
92 let memory_limiter = RequestMemoryLimiter::new(config.max_total_message_memory);
93 Self {
94 name: None,
95 config,
96 runtime,
97 routes_builder: RoutesBuilder::default(),
98 tls_config: None,
99 otel_arrow_service: None,
100 memory_limiter,
101 }
102 }
103
104 pub fn config(&self) -> &GrpcServerConfig {
105 &self.config
106 }
107
108 pub fn runtime(&self) -> &Runtime {
109 &self.runtime
110 }
111
112 pub fn memory_limiter(&self) -> &RequestMemoryLimiter {
113 &self.memory_limiter
114 }
115
116 pub fn name(self, name: Option<String>) -> Self {
117 Self { name, ..self }
118 }
119
120 pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
122 add_service!(
123 self,
124 GreptimeDatabaseServer::new(DatabaseService::new(database_handler))
125 );
126 self
127 }
128
129 pub fn prometheus_handler(
131 mut self,
132 prometheus_handler: PrometheusHandlerRef,
133 user_provider: Option<UserProviderRef>,
134 ) -> Self {
135 add_service!(
136 self,
137 PrometheusGatewayServer::new(PrometheusGatewayService::new(
138 prometheus_handler,
139 user_provider,
140 ))
141 );
142 self
143 }
144
145 pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self {
147 add_service!(
148 self,
149 FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
150 );
151 self
152 }
153
154 pub fn frontend_grpc_handler(mut self, handler: FrontendGrpcHandler) -> Self {
156 add_service!(self, FrontendServer::new(handler));
157 self
158 }
159
160 pub fn otel_arrow_handler(
162 mut self,
163 handler: OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>,
164 ) -> Self {
165 let mut server = ArrowMetricsServiceServer::new(handler);
166 server = server
167 .max_decoding_message_size(self.config.max_recv_message_size)
168 .max_encoding_message_size(self.config.max_send_message_size)
169 .accept_compressed(CompressionEncoding::Zstd)
170 .send_compressed(CompressionEncoding::Zstd);
171 let svc = InterceptedService::new(server, HeaderInterceptor {});
172 self.otel_arrow_service = Some(svc);
173 self
174 }
175
176 pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self {
178 let handler = RegionServerRequestHandler::new(region_server_handler, self.runtime.clone());
179 add_service!(self, RegionServer::new(handler));
180 self
181 }
182
183 pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
184 &mut self.routes_builder
185 }
186
187 pub fn with_tls_config(mut self, tls_option: TlsOption) -> Result<Self> {
188 if tls_option.watch {
191 return Err(Error::NotSupported {
192 feat: "Certificates watch and reloading for gRPC is not supported at the moment"
193 .to_string(),
194 });
195 }
196 self.tls_config = if tls_option.should_force_tls() {
197 let cert = std::fs::read_to_string(tls_option.cert_path)
198 .context(InvalidConfigFilePathSnafu)?;
199 let key =
200 std::fs::read_to_string(tls_option.key_path).context(InvalidConfigFilePathSnafu)?;
201 let identity = Identity::from_pem(cert, key);
202 Some(ServerTlsConfig::new().identity(identity))
203 } else {
204 None
205 };
206 Ok(self)
207 }
208
209 pub fn build(self) -> GrpcServer {
210 GrpcServer {
211 routes: Mutex::new(Some(self.routes_builder.routes())),
212 shutdown_tx: Mutex::new(None),
213 serve_state: Mutex::new(None),
214 tls_config: self.tls_config,
215 otel_arrow_service: Mutex::new(self.otel_arrow_service),
216 bind_addr: None,
217 name: self.name,
218 config: self.config,
219 memory_limiter: self.memory_limiter,
220 }
221 }
222}