servers/grpc/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::frontend::frontend_server::FrontendServer;
16use api::v1::greptime_database_server::GreptimeDatabaseServer;
17use api::v1::prometheus_gateway_server::PrometheusGatewayServer;
18use api::v1::region::region_server::RegionServer;
19use arrow_flight::flight_service_server::FlightServiceServer;
20use auth::UserProviderRef;
21use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result};
22use common_runtime::Runtime;
23use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
24use snafu::ResultExt;
25use tokio::sync::Mutex;
26use tonic::codec::CompressionEncoding;
27use tonic::service::RoutesBuilder;
28use tonic::service::interceptor::InterceptedService;
29use tonic::transport::{Identity, ServerTlsConfig};
30
31use crate::grpc::database::DatabaseService;
32use crate::grpc::flight::{FlightCraftRef, FlightCraftWrapper};
33use crate::grpc::frontend_grpc_handler::FrontendGrpcHandler;
34use crate::grpc::greptime_handler::GreptimeRequestHandler;
35use crate::grpc::prom_query_gateway::PrometheusGatewayService;
36use crate::grpc::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
37use crate::grpc::{GrpcServer, GrpcServerConfig};
38use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
39use crate::prometheus_handler::PrometheusHandlerRef;
40use crate::query_handler::OpenTelemetryProtocolHandlerRef;
41use crate::request_limiter::RequestMemoryLimiter;
42use crate::tls::TlsOption;
43
44/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]).
45/// This macro will automatically add some gRPC properties to the service.
46#[macro_export]
47macro_rules! add_service {
48    ($builder: ident, $service: expr) => {
49        let max_recv_message_size = $builder.config().max_recv_message_size;
50        let max_send_message_size = $builder.config().max_send_message_size;
51
52        use tonic::codec::CompressionEncoding;
53        let service_builder = $service
54            .max_decoding_message_size(max_recv_message_size)
55            .max_encoding_message_size(max_send_message_size)
56            .accept_compressed(CompressionEncoding::Gzip)
57            .accept_compressed(CompressionEncoding::Zstd)
58            .send_compressed(CompressionEncoding::Gzip)
59            .send_compressed(CompressionEncoding::Zstd);
60
61        // Apply memory limiter layer
62        use $crate::grpc::memory_limit::MemoryLimiterExtensionLayer;
63        let service_with_limiter = $crate::tower::ServiceBuilder::new()
64            .layer(MemoryLimiterExtensionLayer::new(
65                $builder.memory_limiter().clone(),
66            ))
67            .service(service_builder);
68
69        $builder
70            .routes_builder_mut()
71            .add_service(service_with_limiter);
72    };
73}
74
75pub struct GrpcServerBuilder {
76    name: Option<String>,
77    config: GrpcServerConfig,
78    runtime: Runtime,
79    routes_builder: RoutesBuilder,
80    tls_config: Option<ServerTlsConfig>,
81    otel_arrow_service: Option<
82        InterceptedService<
83            ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
84            HeaderInterceptor,
85        >,
86    >,
87    memory_limiter: RequestMemoryLimiter,
88}
89
90impl GrpcServerBuilder {
91    pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
92        let memory_limiter = RequestMemoryLimiter::new(config.max_total_message_memory);
93        Self {
94            name: None,
95            config,
96            runtime,
97            routes_builder: RoutesBuilder::default(),
98            tls_config: None,
99            otel_arrow_service: None,
100            memory_limiter,
101        }
102    }
103
104    pub fn config(&self) -> &GrpcServerConfig {
105        &self.config
106    }
107
108    pub fn runtime(&self) -> &Runtime {
109        &self.runtime
110    }
111
112    pub fn memory_limiter(&self) -> &RequestMemoryLimiter {
113        &self.memory_limiter
114    }
115
116    pub fn name(self, name: Option<String>) -> Self {
117        Self { name, ..self }
118    }
119
120    /// Add handler for [DatabaseService] service.
121    pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
122        add_service!(
123            self,
124            GreptimeDatabaseServer::new(DatabaseService::new(database_handler))
125        );
126        self
127    }
128
129    /// Add handler for Prometheus-compatible PromQL queries ([PrometheusGateway]).
130    pub fn prometheus_handler(
131        mut self,
132        prometheus_handler: PrometheusHandlerRef,
133        user_provider: Option<UserProviderRef>,
134    ) -> Self {
135        add_service!(
136            self,
137            PrometheusGatewayServer::new(PrometheusGatewayService::new(
138                prometheus_handler,
139                user_provider,
140            ))
141        );
142        self
143    }
144
145    /// Add handler for [FlightService](arrow_flight::flight_service_server::FlightService).
146    pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self {
147        add_service!(
148            self,
149            FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
150        );
151        self
152    }
153
154    /// Add handler for Frontend gRPC service.
155    pub fn frontend_grpc_handler(mut self, handler: FrontendGrpcHandler) -> Self {
156        add_service!(self, FrontendServer::new(handler));
157        self
158    }
159
160    /// Add handler for [OtelArrowService].
161    pub fn otel_arrow_handler(
162        mut self,
163        handler: OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>,
164    ) -> Self {
165        let mut server = ArrowMetricsServiceServer::new(handler);
166        server = server
167            .max_decoding_message_size(self.config.max_recv_message_size)
168            .max_encoding_message_size(self.config.max_send_message_size)
169            .accept_compressed(CompressionEncoding::Zstd)
170            .send_compressed(CompressionEncoding::Zstd);
171        let svc = InterceptedService::new(server, HeaderInterceptor {});
172        self.otel_arrow_service = Some(svc);
173        self
174    }
175
176    /// Add handler for [RegionServer].
177    pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self {
178        let handler = RegionServerRequestHandler::new(region_server_handler, self.runtime.clone());
179        add_service!(self, RegionServer::new(handler));
180        self
181    }
182
183    pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
184        &mut self.routes_builder
185    }
186
187    pub fn with_tls_config(mut self, tls_option: TlsOption) -> Result<Self> {
188        // tonic does not support watching for tls config changes
189        // so we don't support it either for now
190        if tls_option.watch {
191            return Err(Error::NotSupported {
192                feat: "Certificates watch and reloading for gRPC is not supported at the moment"
193                    .to_string(),
194            });
195        }
196        self.tls_config = if tls_option.should_force_tls() {
197            let cert = std::fs::read_to_string(tls_option.cert_path)
198                .context(InvalidConfigFilePathSnafu)?;
199            let key =
200                std::fs::read_to_string(tls_option.key_path).context(InvalidConfigFilePathSnafu)?;
201            let identity = Identity::from_pem(cert, key);
202            Some(ServerTlsConfig::new().identity(identity))
203        } else {
204            None
205        };
206        Ok(self)
207    }
208
209    pub fn build(self) -> GrpcServer {
210        GrpcServer {
211            routes: Mutex::new(Some(self.routes_builder.routes())),
212            shutdown_tx: Mutex::new(None),
213            serve_state: Mutex::new(None),
214            tls_config: self.tls_config,
215            otel_arrow_service: Mutex::new(self.otel_arrow_service),
216            bind_addr: None,
217            name: self.name,
218            config: self.config,
219            memory_limiter: self.memory_limiter,
220        }
221    }
222}