1use api::v1::greptime_database_server::GreptimeDatabaseServer;
16use api::v1::prometheus_gateway_server::PrometheusGatewayServer;
17use api::v1::region::region_server::RegionServer;
18use arrow_flight::flight_service_server::FlightServiceServer;
19use auth::UserProviderRef;
20use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result};
21use common_runtime::Runtime;
22use otel_arrow_rust::opentelemetry::ArrowMetricsServiceServer;
23use snafu::ResultExt;
24use tokio::sync::Mutex;
25use tonic::codec::CompressionEncoding;
26use tonic::service::interceptor::InterceptedService;
27use tonic::service::RoutesBuilder;
28use tonic::transport::{Identity, ServerTlsConfig};
29
30use crate::grpc::database::DatabaseService;
31use crate::grpc::flight::{FlightCraftRef, FlightCraftWrapper};
32use crate::grpc::greptime_handler::GreptimeRequestHandler;
33use crate::grpc::prom_query_gateway::PrometheusGatewayService;
34use crate::grpc::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
35use crate::grpc::{GrpcServer, GrpcServerConfig};
36use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
37use crate::prometheus_handler::PrometheusHandlerRef;
38use crate::query_handler::OpenTelemetryProtocolHandlerRef;
39use crate::tls::TlsOption;
40
41#[macro_export]
44macro_rules! add_service {
45 ($builder: ident, $service: expr) => {
46 let max_recv_message_size = $builder.config().max_recv_message_size;
47 let max_send_message_size = $builder.config().max_send_message_size;
48
49 use tonic::codec::CompressionEncoding;
50 let service_builder = $service
51 .max_decoding_message_size(max_recv_message_size)
52 .max_encoding_message_size(max_send_message_size)
53 .accept_compressed(CompressionEncoding::Gzip)
54 .accept_compressed(CompressionEncoding::Zstd)
55 .send_compressed(CompressionEncoding::Gzip)
56 .send_compressed(CompressionEncoding::Zstd);
57
58 $builder.routes_builder_mut().add_service(service_builder);
59 };
60}
61
62pub struct GrpcServerBuilder {
63 config: GrpcServerConfig,
64 runtime: Runtime,
65 routes_builder: RoutesBuilder,
66 tls_config: Option<ServerTlsConfig>,
67 otel_arrow_service: Option<
68 InterceptedService<
69 ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
70 HeaderInterceptor,
71 >,
72 >,
73}
74
75impl GrpcServerBuilder {
76 pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
77 Self {
78 config,
79 runtime,
80 routes_builder: RoutesBuilder::default(),
81 tls_config: None,
82 otel_arrow_service: None,
83 }
84 }
85
86 pub fn config(&self) -> &GrpcServerConfig {
87 &self.config
88 }
89
90 pub fn runtime(&self) -> &Runtime {
91 &self.runtime
92 }
93
94 pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
96 add_service!(
97 self,
98 GreptimeDatabaseServer::new(DatabaseService::new(database_handler))
99 );
100 self
101 }
102
103 pub fn prometheus_handler(
105 mut self,
106 prometheus_handler: PrometheusHandlerRef,
107 user_provider: Option<UserProviderRef>,
108 ) -> Self {
109 add_service!(
110 self,
111 PrometheusGatewayServer::new(PrometheusGatewayService::new(
112 prometheus_handler,
113 user_provider,
114 ))
115 );
116 self
117 }
118
119 pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self {
121 add_service!(
122 self,
123 FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
124 );
125 self
126 }
127
128 pub fn otel_arrow_handler(
130 mut self,
131 handler: OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>,
132 ) -> Self {
133 let mut server = ArrowMetricsServiceServer::new(handler);
134 server = server
135 .max_decoding_message_size(self.config.max_recv_message_size)
136 .max_encoding_message_size(self.config.max_send_message_size)
137 .accept_compressed(CompressionEncoding::Zstd)
138 .send_compressed(CompressionEncoding::Zstd);
139 let svc = InterceptedService::new(server, HeaderInterceptor {});
140 self.otel_arrow_service = Some(svc);
141 self
142 }
143
144 pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self {
146 let handler = RegionServerRequestHandler::new(region_server_handler, self.runtime.clone());
147 add_service!(self, RegionServer::new(handler));
148 self
149 }
150
151 pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
152 &mut self.routes_builder
153 }
154
155 pub fn with_tls_config(mut self, tls_option: TlsOption) -> Result<Self> {
156 if tls_option.watch {
159 return Err(Error::NotSupported {
160 feat: "Certificates watch and reloading for gRPC is not supported at the moment"
161 .to_string(),
162 });
163 }
164 self.tls_config = if tls_option.should_force_tls() {
165 let cert = std::fs::read_to_string(tls_option.cert_path)
166 .context(InvalidConfigFilePathSnafu)?;
167 let key =
168 std::fs::read_to_string(tls_option.key_path).context(InvalidConfigFilePathSnafu)?;
169 let identity = Identity::from_pem(cert, key);
170 Some(ServerTlsConfig::new().identity(identity))
171 } else {
172 None
173 };
174 Ok(self)
175 }
176
177 pub fn build(self) -> GrpcServer {
178 GrpcServer {
179 routes: Mutex::new(Some(self.routes_builder.routes())),
180 shutdown_tx: Mutex::new(None),
181 serve_state: Mutex::new(None),
182 tls_config: self.tls_config,
183 otel_arrow_service: Mutex::new(self.otel_arrow_service),
184 bind_addr: None,
185 }
186 }
187}