1use api::v1::frontend::frontend_server::FrontendServer;
16use api::v1::greptime_database_server::GreptimeDatabaseServer;
17use api::v1::prometheus_gateway_server::PrometheusGatewayServer;
18use api::v1::region::region_server::RegionServer;
19use arrow_flight::flight_service_server::FlightServiceServer;
20use auth::UserProviderRef;
21use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result};
22use common_runtime::Runtime;
23use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
24use snafu::ResultExt;
25use tokio::sync::Mutex;
26use tonic::codec::CompressionEncoding;
27use tonic::service::interceptor::InterceptedService;
28use tonic::service::RoutesBuilder;
29use tonic::transport::{Identity, ServerTlsConfig};
30
31use crate::grpc::database::DatabaseService;
32use crate::grpc::flight::{FlightCraftRef, FlightCraftWrapper};
33use crate::grpc::frontend_grpc_handler::FrontendGrpcHandler;
34use crate::grpc::greptime_handler::GreptimeRequestHandler;
35use crate::grpc::prom_query_gateway::PrometheusGatewayService;
36use crate::grpc::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
37use crate::grpc::{GrpcServer, GrpcServerConfig};
38use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
39use crate::prometheus_handler::PrometheusHandlerRef;
40use crate::query_handler::OpenTelemetryProtocolHandlerRef;
41use crate::tls::TlsOption;
42
/// Registers the gRPC service `$service` on `$builder`.
///
/// `$builder` must be an identifier naming a value that provides:
/// - `config()`, whose result exposes `max_recv_message_size` and `max_send_message_size`;
/// - `routes_builder_mut()`, whose result accepts the configured service via `add_service`.
///
/// The service is configured with those two message-size limits, and both `Gzip` and `Zstd`
/// are enabled for accepted as well as sent messages.
///
/// The expansion is a single block expression that imports nothing into the caller's scope,
/// so the macro may be invoked any number of times within the same function body.
#[macro_export]
macro_rules! add_service {
    ($builder: ident, $service: expr) => {{
        // Capture the limits first; `$service` is evaluated afterwards.
        let max_recv_message_size = $builder.config().max_recv_message_size;
        let max_send_message_size = $builder.config().max_send_message_size;

        // The enum is named by its full path instead of a `use` item: items emitted by
        // `macro_rules!` are not hygienic, so a `use` here would clash on repeated expansion.
        let service_builder = $service
            .max_decoding_message_size(max_recv_message_size)
            .max_encoding_message_size(max_send_message_size)
            .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
            .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
            .send_compressed(tonic::codec::CompressionEncoding::Gzip)
            .send_compressed(tonic::codec::CompressionEncoding::Zstd);

        $builder.routes_builder_mut().add_service(service_builder);
    }};
}
63
64pub struct GrpcServerBuilder {
65 name: Option<String>,
66 config: GrpcServerConfig,
67 runtime: Runtime,
68 routes_builder: RoutesBuilder,
69 tls_config: Option<ServerTlsConfig>,
70 otel_arrow_service: Option<
71 InterceptedService<
72 ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
73 HeaderInterceptor,
74 >,
75 >,
76}
77
78impl GrpcServerBuilder {
79 pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
80 Self {
81 name: None,
82 config,
83 runtime,
84 routes_builder: RoutesBuilder::default(),
85 tls_config: None,
86 otel_arrow_service: None,
87 }
88 }
89
90 pub fn config(&self) -> &GrpcServerConfig {
91 &self.config
92 }
93
94 pub fn runtime(&self) -> &Runtime {
95 &self.runtime
96 }
97
98 pub fn name(self, name: Option<String>) -> Self {
99 Self { name, ..self }
100 }
101
102 pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
104 add_service!(
105 self,
106 GreptimeDatabaseServer::new(DatabaseService::new(database_handler))
107 );
108 self
109 }
110
111 pub fn prometheus_handler(
113 mut self,
114 prometheus_handler: PrometheusHandlerRef,
115 user_provider: Option<UserProviderRef>,
116 ) -> Self {
117 add_service!(
118 self,
119 PrometheusGatewayServer::new(PrometheusGatewayService::new(
120 prometheus_handler,
121 user_provider,
122 ))
123 );
124 self
125 }
126
127 pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self {
129 add_service!(
130 self,
131 FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
132 );
133 self
134 }
135
136 pub fn frontend_grpc_handler(mut self, handler: FrontendGrpcHandler) -> Self {
138 add_service!(self, FrontendServer::new(handler));
139 self
140 }
141
142 pub fn otel_arrow_handler(
144 mut self,
145 handler: OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>,
146 ) -> Self {
147 let mut server = ArrowMetricsServiceServer::new(handler);
148 server = server
149 .max_decoding_message_size(self.config.max_recv_message_size)
150 .max_encoding_message_size(self.config.max_send_message_size)
151 .accept_compressed(CompressionEncoding::Zstd)
152 .send_compressed(CompressionEncoding::Zstd);
153 let svc = InterceptedService::new(server, HeaderInterceptor {});
154 self.otel_arrow_service = Some(svc);
155 self
156 }
157
158 pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self {
160 let handler = RegionServerRequestHandler::new(region_server_handler, self.runtime.clone());
161 add_service!(self, RegionServer::new(handler));
162 self
163 }
164
165 pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
166 &mut self.routes_builder
167 }
168
169 pub fn with_tls_config(mut self, tls_option: TlsOption) -> Result<Self> {
170 if tls_option.watch {
173 return Err(Error::NotSupported {
174 feat: "Certificates watch and reloading for gRPC is not supported at the moment"
175 .to_string(),
176 });
177 }
178 self.tls_config = if tls_option.should_force_tls() {
179 let cert = std::fs::read_to_string(tls_option.cert_path)
180 .context(InvalidConfigFilePathSnafu)?;
181 let key =
182 std::fs::read_to_string(tls_option.key_path).context(InvalidConfigFilePathSnafu)?;
183 let identity = Identity::from_pem(cert, key);
184 Some(ServerTlsConfig::new().identity(identity))
185 } else {
186 None
187 };
188 Ok(self)
189 }
190
191 pub fn build(self) -> GrpcServer {
192 GrpcServer {
193 routes: Mutex::new(Some(self.routes_builder.routes())),
194 shutdown_tx: Mutex::new(None),
195 serve_state: Mutex::new(None),
196 tls_config: self.tls_config,
197 otel_arrow_service: Mutex::new(self.otel_arrow_service),
198 bind_addr: None,
199 name: self.name,
200 }
201 }
202}