servers/grpc/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::frontend::frontend_server::FrontendServer;
16use api::v1::greptime_database_server::GreptimeDatabaseServer;
17use api::v1::prometheus_gateway_server::PrometheusGatewayServer;
18use api::v1::region::region_server::RegionServer;
19use arrow_flight::flight_service_server::FlightServiceServer;
20use auth::UserProviderRef;
21use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result};
22use common_runtime::Runtime;
23use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
24use snafu::ResultExt;
25use tokio::sync::Mutex;
26use tonic::codec::CompressionEncoding;
27use tonic::service::interceptor::InterceptedService;
28use tonic::service::RoutesBuilder;
29use tonic::transport::{Identity, ServerTlsConfig};
30
31use crate::grpc::database::DatabaseService;
32use crate::grpc::flight::{FlightCraftRef, FlightCraftWrapper};
33use crate::grpc::frontend_grpc_handler::FrontendGrpcHandler;
34use crate::grpc::greptime_handler::GreptimeRequestHandler;
35use crate::grpc::prom_query_gateway::PrometheusGatewayService;
36use crate::grpc::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
37use crate::grpc::{GrpcServer, GrpcServerConfig};
38use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
39use crate::prometheus_handler::PrometheusHandlerRef;
40use crate::query_handler::OpenTelemetryProtocolHandlerRef;
41use crate::tls::TlsOption;
42
43/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]).
44/// This macro will automatically add some gRPC properties to the service.
45#[macro_export]
46macro_rules! add_service {
47    ($builder: ident, $service: expr) => {
48        let max_recv_message_size = $builder.config().max_recv_message_size;
49        let max_send_message_size = $builder.config().max_send_message_size;
50
51        use tonic::codec::CompressionEncoding;
52        let service_builder = $service
53            .max_decoding_message_size(max_recv_message_size)
54            .max_encoding_message_size(max_send_message_size)
55            .accept_compressed(CompressionEncoding::Gzip)
56            .accept_compressed(CompressionEncoding::Zstd)
57            .send_compressed(CompressionEncoding::Gzip)
58            .send_compressed(CompressionEncoding::Zstd);
59
60        $builder.routes_builder_mut().add_service(service_builder);
61    };
62}
63
64pub struct GrpcServerBuilder {
65    name: Option<String>,
66    config: GrpcServerConfig,
67    runtime: Runtime,
68    routes_builder: RoutesBuilder,
69    tls_config: Option<ServerTlsConfig>,
70    otel_arrow_service: Option<
71        InterceptedService<
72            ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
73            HeaderInterceptor,
74        >,
75    >,
76}
77
78impl GrpcServerBuilder {
79    pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
80        Self {
81            name: None,
82            config,
83            runtime,
84            routes_builder: RoutesBuilder::default(),
85            tls_config: None,
86            otel_arrow_service: None,
87        }
88    }
89
90    pub fn config(&self) -> &GrpcServerConfig {
91        &self.config
92    }
93
94    pub fn runtime(&self) -> &Runtime {
95        &self.runtime
96    }
97
98    pub fn name(self, name: Option<String>) -> Self {
99        Self { name, ..self }
100    }
101
102    /// Add handler for [DatabaseService] service.
103    pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
104        add_service!(
105            self,
106            GreptimeDatabaseServer::new(DatabaseService::new(database_handler))
107        );
108        self
109    }
110
111    /// Add handler for Prometheus-compatible PromQL queries ([PrometheusGateway]).
112    pub fn prometheus_handler(
113        mut self,
114        prometheus_handler: PrometheusHandlerRef,
115        user_provider: Option<UserProviderRef>,
116    ) -> Self {
117        add_service!(
118            self,
119            PrometheusGatewayServer::new(PrometheusGatewayService::new(
120                prometheus_handler,
121                user_provider,
122            ))
123        );
124        self
125    }
126
127    /// Add handler for [FlightService](arrow_flight::flight_service_server::FlightService).
128    pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self {
129        add_service!(
130            self,
131            FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
132        );
133        self
134    }
135
136    /// Add handler for Frontend gRPC service.
137    pub fn frontend_grpc_handler(mut self, handler: FrontendGrpcHandler) -> Self {
138        add_service!(self, FrontendServer::new(handler));
139        self
140    }
141
142    /// Add handler for [OtelArrowService].
143    pub fn otel_arrow_handler(
144        mut self,
145        handler: OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>,
146    ) -> Self {
147        let mut server = ArrowMetricsServiceServer::new(handler);
148        server = server
149            .max_decoding_message_size(self.config.max_recv_message_size)
150            .max_encoding_message_size(self.config.max_send_message_size)
151            .accept_compressed(CompressionEncoding::Zstd)
152            .send_compressed(CompressionEncoding::Zstd);
153        let svc = InterceptedService::new(server, HeaderInterceptor {});
154        self.otel_arrow_service = Some(svc);
155        self
156    }
157
158    /// Add handler for [RegionServer].
159    pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self {
160        let handler = RegionServerRequestHandler::new(region_server_handler, self.runtime.clone());
161        add_service!(self, RegionServer::new(handler));
162        self
163    }
164
165    pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
166        &mut self.routes_builder
167    }
168
169    pub fn with_tls_config(mut self, tls_option: TlsOption) -> Result<Self> {
170        // tonic does not support watching for tls config changes
171        // so we don't support it either for now
172        if tls_option.watch {
173            return Err(Error::NotSupported {
174                feat: "Certificates watch and reloading for gRPC is not supported at the moment"
175                    .to_string(),
176            });
177        }
178        self.tls_config = if tls_option.should_force_tls() {
179            let cert = std::fs::read_to_string(tls_option.cert_path)
180                .context(InvalidConfigFilePathSnafu)?;
181            let key =
182                std::fs::read_to_string(tls_option.key_path).context(InvalidConfigFilePathSnafu)?;
183            let identity = Identity::from_pem(cert, key);
184            Some(ServerTlsConfig::new().identity(identity))
185        } else {
186            None
187        };
188        Ok(self)
189    }
190
191    pub fn build(self) -> GrpcServer {
192        GrpcServer {
193            routes: Mutex::new(Some(self.routes_builder.routes())),
194            shutdown_tx: Mutex::new(None),
195            serve_state: Mutex::new(None),
196            tls_config: self.tls_config,
197            otel_arrow_service: Mutex::new(self.otel_arrow_service),
198            bind_addr: None,
199            name: self.name,
200        }
201    }
202}