servers/grpc/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::greptime_database_server::GreptimeDatabaseServer;
16use api::v1::prometheus_gateway_server::PrometheusGatewayServer;
17use api::v1::region::region_server::RegionServer;
18use arrow_flight::flight_service_server::FlightServiceServer;
19use auth::UserProviderRef;
20use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result};
21use common_runtime::Runtime;
22use otel_arrow_rust::opentelemetry::ArrowMetricsServiceServer;
23use snafu::ResultExt;
24use tokio::sync::Mutex;
25use tonic::codec::CompressionEncoding;
26use tonic::service::interceptor::InterceptedService;
27use tonic::service::RoutesBuilder;
28use tonic::transport::{Identity, ServerTlsConfig};
29
30use crate::grpc::database::DatabaseService;
31use crate::grpc::flight::{FlightCraftRef, FlightCraftWrapper};
32use crate::grpc::greptime_handler::GreptimeRequestHandler;
33use crate::grpc::prom_query_gateway::PrometheusGatewayService;
34use crate::grpc::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
35use crate::grpc::{GrpcServer, GrpcServerConfig};
36use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
37use crate::prometheus_handler::PrometheusHandlerRef;
38use crate::query_handler::OpenTelemetryProtocolHandlerRef;
39use crate::tls::TlsOption;
40
/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]).
///
/// The `builder` must expose `config()` (providing `max_recv_message_size` and
/// `max_send_message_size`) and `routes_builder_mut()`. Before the service is
/// registered, this macro applies those message size limits to it and enables
/// Gzip and Zstd compression for both received and sent messages.
///
/// `CompressionEncoding` is referenced through its full path on purpose: items
/// emitted by `macro_rules!` are not hygienic, so a `use` declaration here would
/// be injected into the caller's scope and clash with itself (E0252) whenever
/// the macro is invoked more than once in the same block.
#[macro_export]
macro_rules! add_service {
    ($builder: ident, $service: expr) => {
        let max_recv_message_size = $builder.config().max_recv_message_size;
        let max_send_message_size = $builder.config().max_send_message_size;

        let service_builder = $service
            .max_decoding_message_size(max_recv_message_size)
            .max_encoding_message_size(max_send_message_size)
            .accept_compressed(::tonic::codec::CompressionEncoding::Gzip)
            .accept_compressed(::tonic::codec::CompressionEncoding::Zstd)
            .send_compressed(::tonic::codec::CompressionEncoding::Gzip)
            .send_compressed(::tonic::codec::CompressionEncoding::Zstd);

        $builder.routes_builder_mut().add_service(service_builder);
    };
}
61
/// Builder that collects gRPC services and settings and turns them into a
/// [`GrpcServer`] via [`GrpcServerBuilder::build`].
pub struct GrpcServerBuilder {
    /// Server configuration; its `max_recv_message_size` and
    /// `max_send_message_size` are applied to every registered service.
    config: GrpcServerConfig,
    /// Runtime handed (cloned) to the region server request handler.
    runtime: Runtime,
    /// Accumulates the services registered through `add_service!`.
    routes_builder: RoutesBuilder,
    /// TLS settings set by `with_tls_config`; `None` until then.
    tls_config: Option<ServerTlsConfig>,
    /// OTel Arrow metrics service wrapped with a [`HeaderInterceptor`]. It is
    /// stored separately from `routes_builder` and passed to the server as-is.
    otel_arrow_service: Option<
        InterceptedService<
            ArrowMetricsServiceServer<OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>>,
            HeaderInterceptor,
        >,
    >,
}
74
75impl GrpcServerBuilder {
76    pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
77        Self {
78            config,
79            runtime,
80            routes_builder: RoutesBuilder::default(),
81            tls_config: None,
82            otel_arrow_service: None,
83        }
84    }
85
86    pub fn config(&self) -> &GrpcServerConfig {
87        &self.config
88    }
89
90    pub fn runtime(&self) -> &Runtime {
91        &self.runtime
92    }
93
94    /// Add handler for [DatabaseService] service.
95    pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
96        add_service!(
97            self,
98            GreptimeDatabaseServer::new(DatabaseService::new(database_handler))
99        );
100        self
101    }
102
103    /// Add handler for Prometheus-compatible PromQL queries ([PrometheusGateway]).
104    pub fn prometheus_handler(
105        mut self,
106        prometheus_handler: PrometheusHandlerRef,
107        user_provider: Option<UserProviderRef>,
108    ) -> Self {
109        add_service!(
110            self,
111            PrometheusGatewayServer::new(PrometheusGatewayService::new(
112                prometheus_handler,
113                user_provider,
114            ))
115        );
116        self
117    }
118
119    /// Add handler for [FlightService](arrow_flight::flight_service_server::FlightService).
120    pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self {
121        add_service!(
122            self,
123            FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
124        );
125        self
126    }
127
128    /// Add handler for [OtelArrowService].
129    pub fn otel_arrow_handler(
130        mut self,
131        handler: OtelArrowServiceHandler<OpenTelemetryProtocolHandlerRef>,
132    ) -> Self {
133        let mut server = ArrowMetricsServiceServer::new(handler);
134        server = server
135            .max_decoding_message_size(self.config.max_recv_message_size)
136            .max_encoding_message_size(self.config.max_send_message_size)
137            .accept_compressed(CompressionEncoding::Zstd)
138            .send_compressed(CompressionEncoding::Zstd);
139        let svc = InterceptedService::new(server, HeaderInterceptor {});
140        self.otel_arrow_service = Some(svc);
141        self
142    }
143
144    /// Add handler for [RegionServer].
145    pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self {
146        let handler = RegionServerRequestHandler::new(region_server_handler, self.runtime.clone());
147        add_service!(self, RegionServer::new(handler));
148        self
149    }
150
151    pub fn routes_builder_mut(&mut self) -> &mut RoutesBuilder {
152        &mut self.routes_builder
153    }
154
155    pub fn with_tls_config(mut self, tls_option: TlsOption) -> Result<Self> {
156        // tonic does not support watching for tls config changes
157        // so we don't support it either for now
158        if tls_option.watch {
159            return Err(Error::NotSupported {
160                feat: "Certificates watch and reloading for gRPC is not supported at the moment"
161                    .to_string(),
162            });
163        }
164        self.tls_config = if tls_option.should_force_tls() {
165            let cert = std::fs::read_to_string(tls_option.cert_path)
166                .context(InvalidConfigFilePathSnafu)?;
167            let key =
168                std::fs::read_to_string(tls_option.key_path).context(InvalidConfigFilePathSnafu)?;
169            let identity = Identity::from_pem(cert, key);
170            Some(ServerTlsConfig::new().identity(identity))
171        } else {
172            None
173        };
174        Ok(self)
175    }
176
177    pub fn build(self) -> GrpcServer {
178        GrpcServer {
179            routes: Mutex::new(Some(self.routes_builder.routes())),
180            shutdown_tx: Mutex::new(None),
181            serve_state: Mutex::new(None),
182            tls_config: self.tls_config,
183            otel_arrow_service: Mutex::new(self.otel_arrow_service),
184            bind_addr: None,
185        }
186    }
187}