servers/grpc/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::frontend::frontend_server::FrontendServer;
16use api::v1::greptime_database_server::GreptimeDatabaseServer;
17use api::v1::prometheus_gateway_server::PrometheusGatewayServer;
18use api::v1::region::region_server::RegionServer;
19use arrow_flight::flight_service_server::FlightServiceServer;
20use auth::UserProviderRef;
21use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result};
22use common_runtime::Runtime;
23use otel_arrow_rust::opentelemetry::ArrowMetricsServiceServer;
24use snafu::ResultExt;
25use tokio::sync::Mutex;
26use tonic::codec::CompressionEncoding;
27use tonic::service::interceptor::InterceptedService;
28use tonic::service::RoutesBuilder;
29use tonic::transport::{Identity, ServerTlsConfig};
30
31use crate::grpc::database::DatabaseService;
32use crate::grpc::flight::{FlightCraftRef, FlightCraftWrapper};
33use crate::grpc::frontend_grpc_handler::FrontendGrpcHandler;
34use crate::grpc::greptime_handler::GreptimeRequestHandler;
35use crate::grpc::prom_query_gateway::PrometheusGatewayService;
36use crate::grpc::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
37use crate::grpc::{GrpcServer, GrpcServerConfig};
38use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
39use crate::prometheus_handler::PrometheusHandlerRef;
40use crate::query_handler::OpenTelemetryProtocolHandlerRef;
41use crate::tls::TlsOption;
42
/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]).
///
/// Before the service is registered through `builder.routes_builder_mut()`, it is
/// configured with:
/// - the `max_recv_message_size` / `max_send_message_size` limits read from
///   `builder.config()`;
/// - Gzip and Zstd compression, both for accepted requests and for sent responses.
///
/// The expansion is wrapped in its own block. `macro_rules!` hygiene does not apply to
/// items, so without the block the `use` below would leak into the caller's scope and a
/// second invocation inside the same function body would fail to compile because the
/// name would be imported twice.
#[macro_export]
macro_rules! add_service {
    ($builder: ident, $service: expr) => {{
        use tonic::codec::CompressionEncoding;

        let max_recv_message_size = $builder.config().max_recv_message_size;
        let max_send_message_size = $builder.config().max_send_message_size;

        let service_builder = $service
            .max_decoding_message_size(max_recv_message_size)
            .max_encoding_message_size(max_send_message_size)
            .accept_compressed(CompressionEncoding::Gzip)
            .accept_compressed(CompressionEncoding::Zstd)
            .send_compressed(CompressionEncoding::Gzip)
            .send_compressed(CompressionEncoding::Zstd);

        $builder.routes_builder_mut().add_service(service_builder);
    }};
}
63
64pub struct GrpcServerBuilder {
65    config: GrpcServerConfig,
66    runtime: Runtime,
67    routes_builder: RoutesBuilder,
68    tls_config: Option<ServerTlsConfig>,
69    otel_arrow_service: Option<
70        InterceptedService<
71            ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
72            HeaderInterceptor,
73        >,
74    >,
75}
76
77impl GrpcServerBuilder {
78    pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
79        Self {
80            config,
81            runtime,
82            routes_builder: RoutesBuilder::default(),
83            tls_config: None,
84            otel_arrow_service: None,
85        }
86    }
87
88    pub fn config(&self) -> &GrpcServerConfig {
89        &self.config
90    }
91
92    pub fn runtime(&self) -> &Runtime {
93        &self.runtime
94    }
95
96    /// Add handler for [DatabaseService] service.
97    pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
98        add_service!(
99            self,
100            GreptimeDatabaseServer::new(DatabaseService::new(database_handler))
101        );
102        self
103    }
104
105    /// Add handler for Prometheus-compatible PromQL queries ([PrometheusGateway]).
106    pub fn prometheus_handler(
107        mut self,
108        prometheus_handler: PrometheusHandlerRef,
109        user_provider: Option<UserProviderRef>,
110    ) -> Self {
111        add_service!(
112            self,
113            PrometheusGatewayServer::new(PrometheusGatewayService::new(
114                prometheus_handler,
115                user_provider,
116            ))
117        );
118        self
119    }
120
121    /// Add handler for [FlightService](arrow_flight::flight_service_server::FlightService).
122    pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self {
123        add_service!(
124            self,
125            FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
126        );
127        self
128    }
129
130    /// Add handler for Frontend gRPC service.
131    pub fn frontend_grpc_handler(mut self, handler: FrontendGrpcHandler) -> Self {
132        add_service!(self, FrontendServer::new(handler));
133        self
134    }
135
136    /// Add handler for [OtelArrowService].
137    pub fn otel_arrow_handler(
138        mut self,
139        handler: OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>,
140    ) -> Self {
141        let mut server = ArrowMetricsServiceServer::new(handler);
142        server = server
143            .max_decoding_message_size(self.config.max_recv_message_size)
144            .max_encoding_message_size(self.config.max_send_message_size)
145            .accept_compressed(CompressionEncoding::Zstd)
146            .send_compressed(CompressionEncoding::Zstd);
147        let svc = InterceptedService::new(server, HeaderInterceptor {});
148        self.otel_arrow_service = Some(svc);
149        self
150    }
151
152    /// Add handler for [RegionServer].
153    pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self {
154        let handler = RegionServerRequestHandler::new(region_server_handler, self.runtime.clone());
155        add_service!(self, RegionServer::new(handler));
156        self
157    }
158
159    pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
160        &mut self.routes_builder
161    }
162
163    pub fn with_tls_config(mut self, tls_option: TlsOption) -> Result<Self> {
164        // tonic does not support watching for tls config changes
165        // so we don't support it either for now
166        if tls_option.watch {
167            return Err(Error::NotSupported {
168                feat: "Certificates watch and reloading for gRPC is not supported at the moment"
169                    .to_string(),
170            });
171        }
172        self.tls_config = if tls_option.should_force_tls() {
173            let cert = std::fs::read_to_string(tls_option.cert_path)
174                .context(InvalidConfigFilePathSnafu)?;
175            let key =
176                std::fs::read_to_string(tls_option.key_path).context(InvalidConfigFilePathSnafu)?;
177            let identity = Identity::from_pem(cert, key);
178            Some(ServerTlsConfig::new().identity(identity))
179        } else {
180            None
181        };
182        Ok(self)
183    }
184
185    pub fn build(self) -> GrpcServer {
186        GrpcServer {
187            routes: Mutex::new(Some(self.routes_builder.routes())),
188            shutdown_tx: Mutex::new(None),
189            serve_state: Mutex::new(None),
190            tls_config: self.tls_config,
191            otel_arrow_service: Mutex::new(self.otel_arrow_service),
192            bind_addr: None,
193        }
194    }
195}