servers/grpc/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::convert::Infallible;
16
17use api::v1::frontend::frontend_server::FrontendServer;
18use api::v1::greptime_database_server::GreptimeDatabaseServer;
19use api::v1::prometheus_gateway_server::PrometheusGatewayServer;
20use api::v1::region::region_server::RegionServer;
21use arrow_flight::flight_service_server::FlightServiceServer;
22use auth::UserProviderRef;
23use axum::extract::Request;
24use axum::response::IntoResponse;
25use axum::routing::Route;
26use common_grpc::error::{InvalidConfigFilePathSnafu, Result};
27use common_runtime::Runtime;
28use common_telemetry::warn;
29use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
30use snafu::ResultExt;
31use tokio::sync::Mutex;
32use tonic::codec::CompressionEncoding;
33use tonic::codegen::Service;
34use tonic::service::RoutesBuilder;
35use tonic::service::interceptor::InterceptedService;
36use tonic::transport::{Identity, ServerTlsConfig};
37use tower::Layer;
38
39use crate::grpc::database::DatabaseService;
40use crate::grpc::flight::{FlightCraftRef, FlightCraftWrapper};
41use crate::grpc::frontend_grpc_handler::FrontendGrpcHandler;
42use crate::grpc::greptime_handler::GreptimeRequestHandler;
43use crate::grpc::prom_query_gateway::PrometheusGatewayService;
44use crate::grpc::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
45use crate::grpc::{GrpcServer, GrpcServerConfig};
46use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
47use crate::prometheus_handler::PrometheusHandlerRef;
48use crate::query_handler::OpenTelemetryProtocolHandlerRef;
49use crate::request_memory_limiter::ServerMemoryLimiter;
50use crate::tls::TlsOption;
51
/// Add a gRPC service (`service`) to a `builder`.
///
/// `builder` must be an identifier bound to a value that exposes `config()`,
/// `memory_limiter()` and `routes_builder_mut()` (such as `GrpcServerBuilder`).
///
/// This macro will automatically add some gRPC properties to the service:
/// - the max decoding/encoding message sizes are taken from the builder's config
///   (`max_recv_message_size` / `max_send_message_size`);
/// - Gzip and Zstd are enabled for both accepted and sent compression;
/// - the service is wrapped in a `MemoryLimiterExtensionLayer` built from a clone of
///   the builder's current memory limiter.
///
/// The expansion lives in its own block, so the `use` items it needs do not leak into
/// the caller's scope and the macro can be invoked several times in the same block.
#[macro_export]
macro_rules! add_service {
    ($builder: ident, $service: expr) => {{
        // `use` items are not hygienic; the surrounding block keeps them local to
        // this expansion.
        use tonic::codec::CompressionEncoding;
        use $crate::grpc::memory_limit::MemoryLimiterExtensionLayer;

        let max_recv_message_size = $builder.config().max_recv_message_size;
        let max_send_message_size = $builder.config().max_send_message_size;

        let service_builder = $service
            .max_decoding_message_size(max_recv_message_size)
            .max_encoding_message_size(max_send_message_size)
            .accept_compressed(CompressionEncoding::Gzip)
            .accept_compressed(CompressionEncoding::Zstd)
            .send_compressed(CompressionEncoding::Gzip)
            .send_compressed(CompressionEncoding::Zstd);

        // Apply memory limiter layer
        let service_with_limiter = $crate::tower::ServiceBuilder::new()
            .layer(MemoryLimiterExtensionLayer::new(
                $builder.memory_limiter().clone(),
            ))
            .service(service_builder);

        $builder
            .routes_builder_mut()
            .add_service(service_with_limiter);
    }};
}
82
83pub struct GrpcServerBuilder {
84    name: Option<String>,
85    config: GrpcServerConfig,
86    runtime: Runtime,
87    routes_builder: RoutesBuilder,
88    tls_config: Option<ServerTlsConfig>,
89    otel_arrow_service: Option<
90        InterceptedService<
91            ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
92            HeaderInterceptor,
93        >,
94    >,
95    memory_limiter: ServerMemoryLimiter,
96}
97
98impl GrpcServerBuilder {
99    pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
100        // Create a default unlimited limiter (can be overridden with with_memory_limiter)
101        let memory_limiter = ServerMemoryLimiter::default();
102
103        Self {
104            name: None,
105            config,
106            runtime,
107            routes_builder: RoutesBuilder::default(),
108            tls_config: None,
109            otel_arrow_service: None,
110            memory_limiter,
111        }
112    }
113
114    /// Set a global memory limiter for all server protocols.
115    pub fn with_memory_limiter(mut self, limiter: ServerMemoryLimiter) -> Self {
116        self.memory_limiter = limiter;
117        self
118    }
119
120    pub fn config(&self) -> &GrpcServerConfig {
121        &self.config
122    }
123
124    pub fn runtime(&self) -> &Runtime {
125        &self.runtime
126    }
127
128    pub fn memory_limiter(&self) -> &ServerMemoryLimiter {
129        &self.memory_limiter
130    }
131
132    pub fn name(self, name: Option<String>) -> Self {
133        Self { name, ..self }
134    }
135
136    /// Add handler for [DatabaseService] service.
137    pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
138        add_service!(
139            self,
140            GreptimeDatabaseServer::new(DatabaseService::new(database_handler))
141        );
142        self
143    }
144
145    /// Add handler for Prometheus-compatible PromQL queries ([PrometheusGateway]).
146    pub fn prometheus_handler(
147        mut self,
148        prometheus_handler: PrometheusHandlerRef,
149        user_provider: Option<UserProviderRef>,
150    ) -> Self {
151        add_service!(
152            self,
153            PrometheusGatewayServer::new(PrometheusGatewayService::new(
154                prometheus_handler,
155                user_provider,
156            ))
157        );
158        self
159    }
160
161    /// Add handler for [FlightService](arrow_flight::flight_service_server::FlightService).
162    pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self {
163        add_service!(
164            self,
165            FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
166        );
167        self
168    }
169
170    /// Add handler for Frontend gRPC service.
171    pub fn frontend_grpc_handler(mut self, handler: FrontendGrpcHandler) -> Self {
172        add_service!(self, FrontendServer::new(handler));
173        self
174    }
175
176    /// Add handler for [OtelArrowService].
177    pub fn otel_arrow_handler(
178        mut self,
179        handler: OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>,
180    ) -> Self {
181        let mut server = ArrowMetricsServiceServer::new(handler);
182        server = server
183            .max_decoding_message_size(self.config.max_recv_message_size)
184            .max_encoding_message_size(self.config.max_send_message_size)
185            .accept_compressed(CompressionEncoding::Zstd)
186            .send_compressed(CompressionEncoding::Zstd);
187        let svc = InterceptedService::new(server, HeaderInterceptor {});
188        self.otel_arrow_service = Some(svc);
189        self
190    }
191
192    /// Add handler for [RegionServer].
193    pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self {
194        let handler = RegionServerRequestHandler::new(region_server_handler, self.runtime.clone());
195        add_service!(self, RegionServer::new(handler));
196        self
197    }
198
199    pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
200        &mut self.routes_builder
201    }
202
203    pub fn with_tls_config(mut self, tls_option: TlsOption) -> Result<Self> {
204        // tonic does not support watching for tls config changes
205        // so we don't support it either for now
206        if tls_option.watch {
207            warn!("Certificates watch and reloading for gRPC is NOT supported at the moment");
208        }
209        self.tls_config = if tls_option.should_force_tls() {
210            let cert = std::fs::read_to_string(tls_option.cert_path)
211                .context(InvalidConfigFilePathSnafu)?;
212            let key =
213                std::fs::read_to_string(tls_option.key_path).context(InvalidConfigFilePathSnafu)?;
214            let identity = Identity::from_pem(cert, key);
215            Some(ServerTlsConfig::new().identity(identity))
216        } else {
217            None
218        };
219        Ok(self)
220    }
221
222    pub fn add_layer<L>(self, layer: L) -> Self
223    where
224        L: Layer<Route> + Clone + Send + Sync + 'static,
225        L::Service: Service<Request> + Clone + Send + Sync + 'static,
226        <L::Service as Service<Request>>::Response: IntoResponse + 'static,
227        <L::Service as Service<Request>>::Error: Into<Infallible> + 'static,
228        <L::Service as Service<Request>>::Future: Send + 'static,
229    {
230        let routes = self.routes_builder.routes();
231        let router = routes.into_axum_router();
232        let router = router.layer(layer);
233        Self {
234            routes_builder: RoutesBuilder::from(router),
235            ..self
236        }
237    }
238
239    pub fn build(self) -> GrpcServer {
240        GrpcServer {
241            routes: Mutex::new(Some(self.routes_builder.routes())),
242            shutdown_tx: Mutex::new(None),
243            serve_state: Mutex::new(None),
244            tls_config: self.tls_config,
245            otel_arrow_service: Mutex::new(self.otel_arrow_service),
246            bind_addr: None,
247            name: self.name,
248            config: self.config,
249        }
250    }
251}