1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use common_base::Plugins;
20use common_config::{Configurable, Mode};
21use servers::error::Error as ServerError;
22use servers::grpc::builder::GrpcServerBuilder;
23use servers::grpc::greptime_handler::GreptimeRequestHandler;
24use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
25use servers::http::event::LogValidatorRef;
26use servers::http::{HttpServer, HttpServerBuilder};
27use servers::interceptor::LogIngestInterceptorRef;
28use servers::metrics_handler::MetricsHandler;
29use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
30use servers::otel_arrow::OtelArrowServiceHandler;
31use servers::postgres::PostgresServer;
32use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
33use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
34use servers::server::{Server, ServerHandlers};
35use servers::tls::{maybe_watch_tls_config, ReloadableTlsServerConfig};
36use snafu::ResultExt;
37
38use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
39use crate::frontend::FrontendOptions;
40use crate::instance::Instance;
41
42pub struct Services<T>
43where
44 T: Into<FrontendOptions> + Configurable + Clone,
45{
46 opts: T,
47 instance: Arc<Instance>,
48 grpc_server_builder: Option<GrpcServerBuilder>,
49 http_server_builder: Option<HttpServerBuilder>,
50 plugins: Plugins,
51}
52
53impl<T> Services<T>
54where
55 T: Into<FrontendOptions> + Configurable + Clone,
56{
57 pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
58 Self {
59 opts,
60 instance,
61 grpc_server_builder: None,
62 http_server_builder: None,
63 plugins,
64 }
65 }
66
67 pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
68 let grpc_config = GrpcServerConfig {
69 max_recv_message_size: opts.max_recv_message_size.as_bytes() as usize,
70 max_send_message_size: opts.max_send_message_size.as_bytes() as usize,
71 tls: opts.tls.clone(),
72 };
73 let builder = GrpcServerBuilder::new(grpc_config, common_runtime::global_runtime())
74 .with_tls_config(opts.tls.clone())
75 .context(error::InvalidTlsConfigSnafu)?;
76 Ok(builder)
77 }
78
79 pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder {
80 let mut builder = HttpServerBuilder::new(opts.http.clone())
81 .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
82
83 let validator = self.plugins.get::<LogValidatorRef>();
84 let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
85 builder =
86 builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
87 builder = builder.with_logs_handler(self.instance.clone());
88
89 if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
90 builder = builder.with_user_provider(user_provider);
91 }
92
93 if opts.opentsdb.enable {
94 builder = builder.with_opentsdb_handler(self.instance.clone());
95 }
96
97 if opts.influxdb.enable {
98 builder = builder.with_influxdb_handler(self.instance.clone());
99 }
100
101 if opts.prom_store.enable {
102 builder = builder
103 .with_prom_handler(
104 self.instance.clone(),
105 opts.prom_store.with_metric_engine,
106 opts.http.is_strict_mode,
107 )
108 .with_prometheus_handler(self.instance.clone());
109 }
110
111 if opts.otlp.enable {
112 builder = builder.with_otlp_handler(self.instance.clone());
113 }
114
115 if opts.jaeger.enable {
116 builder = builder.with_jaeger_handler(self.instance.clone());
117 }
118
119 builder
120 }
121
122 pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
123 Self {
124 grpc_server_builder: Some(builder),
125 ..self
126 }
127 }
128
129 pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
130 Self {
131 http_server_builder: Some(builder),
132 ..self
133 }
134 }
135
136 fn build_grpc_server(&mut self, opts: &FrontendOptions) -> Result<GrpcServer> {
137 let builder = if let Some(builder) = self.grpc_server_builder.take() {
138 builder
139 } else {
140 self.grpc_server_builder(&opts.grpc)?
141 };
142
143 let user_provider = self.plugins.get::<UserProviderRef>();
144
145 let mode = if opts.meta_client.is_none() {
147 Mode::Standalone
148 } else {
149 Mode::Distributed
150 };
151
152 let runtime = match mode {
153 Mode::Standalone => Some(builder.runtime().clone()),
154 _ => None,
155 };
156
157 let greptime_request_handler = GreptimeRequestHandler::new(
158 ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
159 user_provider.clone(),
160 runtime,
161 );
162
163 let grpc_server = builder
164 .database_handler(greptime_request_handler.clone())
165 .prometheus_handler(self.instance.clone(), user_provider.clone())
166 .otel_arrow_handler(OtelArrowServiceHandler(self.instance.clone()))
167 .flight_handler(Arc::new(greptime_request_handler))
168 .build();
169 Ok(grpc_server)
170 }
171
172 fn build_http_server(&mut self, opts: &FrontendOptions, toml: String) -> Result<HttpServer> {
173 let builder = if let Some(builder) = self.http_server_builder.take() {
174 builder
175 } else {
176 self.http_server_builder(opts)
177 };
178
179 let http_server = builder
180 .with_metrics_handler(MetricsHandler)
181 .with_plugins(self.plugins.clone())
182 .with_greptime_config_options(toml)
183 .build();
184 Ok(http_server)
185 }
186
187 pub async fn build(mut self) -> Result<ServerHandlers> {
188 let opts = self.opts.clone();
189 let instance = self.instance.clone();
190
191 let toml = opts.to_toml().context(TomlFormatSnafu)?;
192 let opts: FrontendOptions = opts.into();
193
194 let handlers = ServerHandlers::default();
195
196 let user_provider = self.plugins.get::<UserProviderRef>();
197
198 {
199 let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
201 let grpc_server = self.build_grpc_server(&opts)?;
202 handlers.insert((Box::new(grpc_server), grpc_addr)).await;
203 }
204
205 {
206 let http_options = &opts.http;
208 let http_addr = parse_addr(&http_options.addr)?;
209 let http_server = self.build_http_server(&opts, toml)?;
210 handlers.insert((Box::new(http_server), http_addr)).await;
211 }
212
213 if opts.mysql.enable {
214 let opts = &opts.mysql;
216 let mysql_addr = parse_addr(&opts.addr)?;
217
218 let tls_server_config = Arc::new(
219 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
220 );
221
222 maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
224
225 let mysql_server = MysqlServer::create_server(
226 common_runtime::global_runtime(),
227 Arc::new(MysqlSpawnRef::new(
228 ServerSqlQueryHandlerAdapter::arc(instance.clone()),
229 user_provider.clone(),
230 )),
231 Arc::new(MysqlSpawnConfig::new(
232 opts.tls.should_force_tls(),
233 tls_server_config,
234 opts.keep_alive.as_secs(),
235 opts.reject_no_database.unwrap_or(false),
236 )),
237 );
238 handlers.insert((mysql_server, mysql_addr)).await;
239 }
240
241 if opts.postgres.enable {
242 let opts = &opts.postgres;
244 let pg_addr = parse_addr(&opts.addr)?;
245
246 let tls_server_config = Arc::new(
247 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
248 );
249
250 maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
251
252 let pg_server = Box::new(PostgresServer::new(
253 ServerSqlQueryHandlerAdapter::arc(instance.clone()),
254 opts.tls.should_force_tls(),
255 tls_server_config,
256 opts.keep_alive.as_secs(),
257 common_runtime::global_runtime(),
258 user_provider.clone(),
259 )) as Box<dyn Server>;
260
261 handlers.insert((pg_server, pg_addr)).await;
262 }
263
264 Ok(handlers)
265 }
266}
267
268fn parse_addr(addr: &str) -> Result<SocketAddr> {
269 addr.parse().context(error::ParseAddrSnafu { addr })
270}