datanode/
service.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use common_config::Configurable;
19use servers::grpc::builder::GrpcServerBuilder;
20use servers::grpc::{GrpcServer, GrpcServerConfig};
21use servers::http::HttpServerBuilder;
22use servers::metrics_handler::MetricsHandler;
23use servers::server::{ServerHandler, ServerHandlers};
24use snafu::ResultExt;
25
26use crate::config::DatanodeOptions;
27use crate::error::{ParseAddrSnafu, Result, TomlFormatSnafu};
28use crate::region_server::RegionServer;
29
30pub struct DatanodeServiceBuilder<'a> {
31    opts: &'a DatanodeOptions,
32    grpc_server: Option<GrpcServer>,
33    enable_http_service: bool,
34}
35
36impl<'a> DatanodeServiceBuilder<'a> {
37    pub fn new(opts: &'a DatanodeOptions) -> Self {
38        Self {
39            opts,
40            grpc_server: None,
41            enable_http_service: false,
42        }
43    }
44
45    pub fn with_grpc_server(self, grpc_server: GrpcServer) -> Self {
46        Self {
47            grpc_server: Some(grpc_server),
48            ..self
49        }
50    }
51
52    pub fn with_default_grpc_server(mut self, region_server: &RegionServer) -> Self {
53        let grpc_server = Self::grpc_server_builder(self.opts, region_server).build();
54        self.grpc_server = Some(grpc_server);
55        self
56    }
57
58    pub fn enable_http_service(self) -> Self {
59        Self {
60            enable_http_service: true,
61            ..self
62        }
63    }
64
65    pub fn build(mut self) -> Result<ServerHandlers> {
66        let handlers = ServerHandlers::default();
67
68        if let Some(grpc_server) = self.grpc_server.take() {
69            let addr: SocketAddr = self.opts.grpc.bind_addr.parse().context(ParseAddrSnafu {
70                addr: &self.opts.grpc.bind_addr,
71            })?;
72            let handler: ServerHandler = (Box::new(grpc_server), addr);
73            handlers.insert(handler);
74        }
75
76        if self.enable_http_service {
77            let http_server = HttpServerBuilder::new(self.opts.http.clone())
78                .with_metrics_handler(MetricsHandler)
79                .with_greptime_config_options(self.opts.to_toml().context(TomlFormatSnafu)?)
80                .build();
81            let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu {
82                addr: &self.opts.http.addr,
83            })?;
84            let handler: ServerHandler = (Box::new(http_server), addr);
85            handlers.insert(handler);
86        }
87
88        Ok(handlers)
89    }
90
91    pub fn grpc_server_builder(
92        opts: &DatanodeOptions,
93        region_server: &RegionServer,
94    ) -> GrpcServerBuilder {
95        let config = GrpcServerConfig {
96            max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
97            max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
98            tls: opts.grpc.tls.clone(),
99        };
100
101        GrpcServerBuilder::new(config, region_server.runtime())
102            .flight_handler(Arc::new(region_server.clone()))
103            .region_server_handler(Arc::new(region_server.clone()))
104    }
105}