servers/grpc/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::convert::Infallible;
16
17use api::v1::frontend::frontend_server::FrontendServer;
18use api::v1::greptime_database_server::GreptimeDatabaseServer;
19use api::v1::prometheus_gateway_server::PrometheusGatewayServer;
20use api::v1::region::region_server::RegionServer;
21use arrow_flight::flight_service_server::FlightServiceServer;
22use auth::UserProviderRef;
23use axum::extract::Request;
24use axum::response::IntoResponse;
25use axum::routing::Route;
26use common_grpc::error::{InvalidConfigFilePathSnafu, Result};
27use common_runtime::Runtime;
28use common_telemetry::warn;
29use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
30use snafu::ResultExt;
31use tokio::sync::Mutex;
32use tonic::codec::CompressionEncoding;
33use tonic::codegen::Service;
34use tonic::service::RoutesBuilder;
35use tonic::service::interceptor::InterceptedService;
36use tonic::transport::{Identity, ServerTlsConfig};
37use tower::Layer;
38
39use crate::grpc::database::DatabaseService;
40use crate::grpc::flight::{FlightCraftRef, FlightCraftWrapper};
41use crate::grpc::frontend_grpc_handler::FrontendGrpcHandler;
42use crate::grpc::greptime_handler::GreptimeRequestHandler;
43use crate::grpc::prom_query_gateway::PrometheusGatewayService;
44use crate::grpc::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
45use crate::grpc::{GrpcServer, GrpcServerConfig};
46use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
47use crate::prometheus_handler::PrometheusHandlerRef;
48use crate::query_handler::OpenTelemetryProtocolHandlerRef;
49use crate::request_limiter::RequestMemoryLimiter;
50use crate::tls::TlsOption;
51
/// Add a gRPC service (`service`) to a `builder` that exposes `config()`,
/// `memory_limiter()` and `routes_builder_mut()` (see [RoutesBuilder]).
///
/// The macro configures the service before registering it:
/// - the decoding/encoding message size limits are taken from
///   `builder.config().max_recv_message_size` / `max_send_message_size`;
/// - Gzip and Zstd are enabled for both accepted and sent messages;
/// - the service is wrapped in a `MemoryLimiterExtensionLayer` built from a
///   clone of `builder.memory_limiter()`.
///
/// All paths are spelled out in full instead of being brought in with `use`:
/// items emitted by a `macro_rules!` expansion are not hygienic, so a `use`
/// here would leak into the caller's scope and make a second invocation in the
/// same block fail with a duplicate-name error.
#[macro_export]
macro_rules! add_service {
    ($builder: ident, $service: expr) => {
        let max_recv_message_size = $builder.config().max_recv_message_size;
        let max_send_message_size = $builder.config().max_send_message_size;

        let service_builder = $service
            .max_decoding_message_size(max_recv_message_size)
            .max_encoding_message_size(max_send_message_size)
            .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
            .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
            .send_compressed(tonic::codec::CompressionEncoding::Gzip)
            .send_compressed(tonic::codec::CompressionEncoding::Zstd);

        // Apply memory limiter layer
        let service_with_limiter = $crate::tower::ServiceBuilder::new()
            .layer(
                $crate::grpc::memory_limit::MemoryLimiterExtensionLayer::new(
                    $builder.memory_limiter().clone(),
                ),
            )
            .service(service_builder);

        $builder
            .routes_builder_mut()
            .add_service(service_with_limiter);
    };
}
82
83pub struct GrpcServerBuilder {
84    name: Option<String>,
85    config: GrpcServerConfig,
86    runtime: Runtime,
87    routes_builder: RoutesBuilder,
88    tls_config: Option<ServerTlsConfig>,
89    otel_arrow_service: Option<
90        InterceptedService<
91            ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
92            HeaderInterceptor,
93        >,
94    >,
95    memory_limiter: RequestMemoryLimiter,
96}
97
98impl GrpcServerBuilder {
99    pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
100        let memory_limiter = RequestMemoryLimiter::new(config.max_total_message_memory);
101        Self {
102            name: None,
103            config,
104            runtime,
105            routes_builder: RoutesBuilder::default(),
106            tls_config: None,
107            otel_arrow_service: None,
108            memory_limiter,
109        }
110    }
111
112    pub fn config(&self) -> &GrpcServerConfig {
113        &self.config
114    }
115
116    pub fn runtime(&self) -> &Runtime {
117        &self.runtime
118    }
119
120    pub fn memory_limiter(&self) -> &RequestMemoryLimiter {
121        &self.memory_limiter
122    }
123
124    pub fn name(self, name: Option<String>) -> Self {
125        Self { name, ..self }
126    }
127
128    /// Add handler for [DatabaseService] service.
129    pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
130        add_service!(
131            self,
132            GreptimeDatabaseServer::new(DatabaseService::new(database_handler))
133        );
134        self
135    }
136
137    /// Add handler for Prometheus-compatible PromQL queries ([PrometheusGateway]).
138    pub fn prometheus_handler(
139        mut self,
140        prometheus_handler: PrometheusHandlerRef,
141        user_provider: Option<UserProviderRef>,
142    ) -> Self {
143        add_service!(
144            self,
145            PrometheusGatewayServer::new(PrometheusGatewayService::new(
146                prometheus_handler,
147                user_provider,
148            ))
149        );
150        self
151    }
152
153    /// Add handler for [FlightService](arrow_flight::flight_service_server::FlightService).
154    pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self {
155        add_service!(
156            self,
157            FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
158        );
159        self
160    }
161
162    /// Add handler for Frontend gRPC service.
163    pub fn frontend_grpc_handler(mut self, handler: FrontendGrpcHandler) -> Self {
164        add_service!(self, FrontendServer::new(handler));
165        self
166    }
167
168    /// Add handler for [OtelArrowService].
169    pub fn otel_arrow_handler(
170        mut self,
171        handler: OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>,
172    ) -> Self {
173        let mut server = ArrowMetricsServiceServer::new(handler);
174        server = server
175            .max_decoding_message_size(self.config.max_recv_message_size)
176            .max_encoding_message_size(self.config.max_send_message_size)
177            .accept_compressed(CompressionEncoding::Zstd)
178            .send_compressed(CompressionEncoding::Zstd);
179        let svc = InterceptedService::new(server, HeaderInterceptor {});
180        self.otel_arrow_service = Some(svc);
181        self
182    }
183
184    /// Add handler for [RegionServer].
185    pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self {
186        let handler = RegionServerRequestHandler::new(region_server_handler, self.runtime.clone());
187        add_service!(self, RegionServer::new(handler));
188        self
189    }
190
191    pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
192        &mut self.routes_builder
193    }
194
195    pub fn with_tls_config(mut self, tls_option: TlsOption) -> Result<Self> {
196        // tonic does not support watching for tls config changes
197        // so we don't support it either for now
198        if tls_option.watch {
199            warn!("Certificates watch and reloading for gRPC is NOT supported at the moment");
200        }
201        self.tls_config = if tls_option.should_force_tls() {
202            let cert = std::fs::read_to_string(tls_option.cert_path)
203                .context(InvalidConfigFilePathSnafu)?;
204            let key =
205                std::fs::read_to_string(tls_option.key_path).context(InvalidConfigFilePathSnafu)?;
206            let identity = Identity::from_pem(cert, key);
207            Some(ServerTlsConfig::new().identity(identity))
208        } else {
209            None
210        };
211        Ok(self)
212    }
213
214    pub fn add_layer<L>(self, layer: L) -> Self
215    where
216        L: Layer<Route> + Clone + Send + Sync + 'static,
217        L::Service: Service<Request> + Clone + Send + Sync + 'static,
218        <L::Service as Service<Request>>::Response: IntoResponse + 'static,
219        <L::Service as Service<Request>>::Error: Into<Infallible> + 'static,
220        <L::Service as Service<Request>>::Future: Send + 'static,
221    {
222        let routes = self.routes_builder.routes();
223        let router = routes.into_axum_router();
224        let router = router.layer(layer);
225        Self {
226            routes_builder: RoutesBuilder::from(router),
227            ..self
228        }
229    }
230
231    pub fn build(self) -> GrpcServer {
232        GrpcServer {
233            routes: Mutex::new(Some(self.routes_builder.routes())),
234            shutdown_tx: Mutex::new(None),
235            serve_state: Mutex::new(None),
236            tls_config: self.tls_config,
237            otel_arrow_service: Mutex::new(self.otel_arrow_service),
238            bind_addr: None,
239            name: self.name,
240            config: self.config,
241            memory_limiter: self.memory_limiter,
242        }
243    }
244}