1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use common_config::Configurable;
19use servers::grpc::builder::GrpcServerBuilder;
20use servers::grpc::{GrpcServer, GrpcServerConfig};
21use servers::http::HttpServerBuilder;
22use servers::metrics_handler::MetricsHandler;
23use servers::server::{ServerHandler, ServerHandlers};
24use snafu::ResultExt;
25
26use crate::config::DatanodeOptions;
27use crate::error::{ParseAddrSnafu, Result, TomlFormatSnafu};
28use crate::region_server::RegionServer;
29
30pub struct DatanodeServiceBuilder<'a> {
31 opts: &'a DatanodeOptions,
32 grpc_server: Option<GrpcServer>,
33 enable_http_service: bool,
34}
35
36impl<'a> DatanodeServiceBuilder<'a> {
37 pub fn new(opts: &'a DatanodeOptions) -> Self {
38 Self {
39 opts,
40 grpc_server: None,
41 enable_http_service: false,
42 }
43 }
44
45 pub fn with_grpc_server(self, grpc_server: GrpcServer) -> Self {
46 Self {
47 grpc_server: Some(grpc_server),
48 ..self
49 }
50 }
51
52 pub fn with_default_grpc_server(mut self, region_server: &RegionServer) -> Self {
53 let grpc_server = Self::grpc_server_builder(self.opts, region_server).build();
54 self.grpc_server = Some(grpc_server);
55 self
56 }
57
58 pub fn enable_http_service(self) -> Self {
59 Self {
60 enable_http_service: true,
61 ..self
62 }
63 }
64
65 pub fn build(mut self) -> Result<ServerHandlers> {
66 let handlers = ServerHandlers::default();
67
68 if let Some(grpc_server) = self.grpc_server.take() {
69 let addr: SocketAddr = self.opts.grpc.bind_addr.parse().context(ParseAddrSnafu {
70 addr: &self.opts.grpc.bind_addr,
71 })?;
72 let handler: ServerHandler = (Box::new(grpc_server), addr);
73 handlers.insert(handler);
74 }
75
76 if self.enable_http_service {
77 let http_server = HttpServerBuilder::new(self.opts.http.clone())
78 .with_metrics_handler(MetricsHandler)
79 .with_greptime_config_options(self.opts.to_toml().context(TomlFormatSnafu)?)
80 .build();
81 let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu {
82 addr: &self.opts.http.addr,
83 })?;
84 let handler: ServerHandler = (Box::new(http_server), addr);
85 handlers.insert(handler);
86 }
87
88 Ok(handlers)
89 }
90
91 pub fn grpc_server_builder(
92 opts: &DatanodeOptions,
93 region_server: &RegionServer,
94 ) -> GrpcServerBuilder {
95 let config = GrpcServerConfig {
96 max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
97 max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
98 tls: opts.grpc.tls.clone(),
99 };
100
101 GrpcServerBuilder::new(config, region_server.runtime())
102 .flight_handler(Arc::new(region_server.clone()))
103 .region_server_handler(Arc::new(region_server.clone()))
104 }
105}