1use api::v1::frontend::frontend_server::FrontendServer;
16use api::v1::greptime_database_server::GreptimeDatabaseServer;
17use api::v1::prometheus_gateway_server::PrometheusGatewayServer;
18use api::v1::region::region_server::RegionServer;
19use arrow_flight::flight_service_server::FlightServiceServer;
20use auth::UserProviderRef;
21use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result};
22use common_runtime::Runtime;
23use otel_arrow_rust::opentelemetry::ArrowMetricsServiceServer;
24use snafu::ResultExt;
25use tokio::sync::Mutex;
26use tonic::codec::CompressionEncoding;
27use tonic::service::interceptor::InterceptedService;
28use tonic::service::RoutesBuilder;
29use tonic::transport::{Identity, ServerTlsConfig};
30
31use crate::grpc::database::DatabaseService;
32use crate::grpc::flight::{FlightCraftRef, FlightCraftWrapper};
33use crate::grpc::frontend_grpc_handler::FrontendGrpcHandler;
34use crate::grpc::greptime_handler::GreptimeRequestHandler;
35use crate::grpc::prom_query_gateway::PrometheusGatewayService;
36use crate::grpc::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
37use crate::grpc::{GrpcServer, GrpcServerConfig};
38use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
39use crate::prometheus_handler::PrometheusHandlerRef;
40use crate::query_handler::OpenTelemetryProtocolHandlerRef;
41use crate::tls::TlsOption;
42
/// Registers a service on a gRPC server builder, applying the builder's
/// message size limits and enabling Gzip and Zstd compression in both
/// directions.
///
/// `$builder` must be an identifier whose value provides `config()` (exposing
/// `max_recv_message_size` and `max_send_message_size`) and
/// `routes_builder_mut()`. `$service` is an expression producing the service
/// to configure and register.
///
/// The macro expands to statements, so invoke it in statement position:
/// `add_service!(builder, service);`. It can be invoked any number of times
/// within the same scope.
#[macro_export]
macro_rules! add_service {
    ($builder: ident, $service: expr) => {
        let max_recv_message_size = $builder.config().max_recv_message_size;
        let max_send_message_size = $builder.config().max_send_message_size;

        // The encodings are written with their full path instead of being
        // brought in with `use`: imports emitted by `macro_rules!` are not
        // hygienic, so a `use` here would land in the caller's scope and make
        // a second invocation in the same block re-import the same name.
        let service_builder = $service
            .max_decoding_message_size(max_recv_message_size)
            .max_encoding_message_size(max_send_message_size)
            .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
            .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
            .send_compressed(tonic::codec::CompressionEncoding::Gzip)
            .send_compressed(tonic::codec::CompressionEncoding::Zstd);

        $builder.routes_builder_mut().add_service(service_builder);
    };
}
63
64pub struct GrpcServerBuilder {
65 config: GrpcServerConfig,
66 runtime: Runtime,
67 routes_builder: RoutesBuilder,
68 tls_config: Option<ServerTlsConfig>,
69 otel_arrow_service: Option<
70 InterceptedService<
71 ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
72 HeaderInterceptor,
73 >,
74 >,
75}
76
77impl GrpcServerBuilder {
78 pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
79 Self {
80 config,
81 runtime,
82 routes_builder: RoutesBuilder::default(),
83 tls_config: None,
84 otel_arrow_service: None,
85 }
86 }
87
88 pub fn config(&self) -> &GrpcServerConfig {
89 &self.config
90 }
91
92 pub fn runtime(&self) -> &Runtime {
93 &self.runtime
94 }
95
96 pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
98 add_service!(
99 self,
100 GreptimeDatabaseServer::new(DatabaseService::new(database_handler))
101 );
102 self
103 }
104
105 pub fn prometheus_handler(
107 mut self,
108 prometheus_handler: PrometheusHandlerRef,
109 user_provider: Option<UserProviderRef>,
110 ) -> Self {
111 add_service!(
112 self,
113 PrometheusGatewayServer::new(PrometheusGatewayService::new(
114 prometheus_handler,
115 user_provider,
116 ))
117 );
118 self
119 }
120
121 pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self {
123 add_service!(
124 self,
125 FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
126 );
127 self
128 }
129
130 pub fn frontend_grpc_handler(mut self, handler: FrontendGrpcHandler) -> Self {
132 add_service!(self, FrontendServer::new(handler));
133 self
134 }
135
136 pub fn otel_arrow_handler(
138 mut self,
139 handler: OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>,
140 ) -> Self {
141 let mut server = ArrowMetricsServiceServer::new(handler);
142 server = server
143 .max_decoding_message_size(self.config.max_recv_message_size)
144 .max_encoding_message_size(self.config.max_send_message_size)
145 .accept_compressed(CompressionEncoding::Zstd)
146 .send_compressed(CompressionEncoding::Zstd);
147 let svc = InterceptedService::new(server, HeaderInterceptor {});
148 self.otel_arrow_service = Some(svc);
149 self
150 }
151
152 pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self {
154 let handler = RegionServerRequestHandler::new(region_server_handler, self.runtime.clone());
155 add_service!(self, RegionServer::new(handler));
156 self
157 }
158
159 pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
160 &mut self.routes_builder
161 }
162
163 pub fn with_tls_config(mut self, tls_option: TlsOption) -> Result<Self> {
164 if tls_option.watch {
167 return Err(Error::NotSupported {
168 feat: "Certificates watch and reloading for gRPC is not supported at the moment"
169 .to_string(),
170 });
171 }
172 self.tls_config = if tls_option.should_force_tls() {
173 let cert = std::fs::read_to_string(tls_option.cert_path)
174 .context(InvalidConfigFilePathSnafu)?;
175 let key =
176 std::fs::read_to_string(tls_option.key_path).context(InvalidConfigFilePathSnafu)?;
177 let identity = Identity::from_pem(cert, key);
178 Some(ServerTlsConfig::new().identity(identity))
179 } else {
180 None
181 };
182 Ok(self)
183 }
184
185 pub fn build(self) -> GrpcServer {
186 GrpcServer {
187 routes: Mutex::new(Some(self.routes_builder.routes())),
188 shutdown_tx: Mutex::new(None),
189 serve_state: Mutex::new(None),
190 tls_config: self.tls_config,
191 otel_arrow_service: Mutex::new(self.otel_arrow_service),
192 bind_addr: None,
193 }
194 }
195}