1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use common_base::Plugins;
20use common_config::Configurable;
21use servers::error::Error as ServerError;
22use servers::grpc::builder::GrpcServerBuilder;
23use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
24use servers::grpc::greptime_handler::GreptimeRequestHandler;
25use servers::grpc::{GrpcOptions, GrpcServer};
26use servers::http::event::LogValidatorRef;
27use servers::http::{HttpServer, HttpServerBuilder};
28use servers::interceptor::LogIngestInterceptorRef;
29use servers::metrics_handler::MetricsHandler;
30use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
31use servers::otel_arrow::OtelArrowServiceHandler;
32use servers::postgres::PostgresServer;
33use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
34use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
35use servers::server::{Server, ServerHandlers};
36use servers::tls::{maybe_watch_tls_config, ReloadableTlsServerConfig};
37use snafu::ResultExt;
38
39use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
40use crate::frontend::FrontendOptions;
41use crate::instance::Instance;
42
43pub struct Services<T>
44where
45 T: Into<FrontendOptions> + Configurable + Clone,
46{
47 opts: T,
48 instance: Arc<Instance>,
49 grpc_server_builder: Option<GrpcServerBuilder>,
50 http_server_builder: Option<HttpServerBuilder>,
51 plugins: Plugins,
52}
53
54impl<T> Services<T>
55where
56 T: Into<FrontendOptions> + Configurable + Clone,
57{
58 pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
59 Self {
60 opts,
61 instance,
62 grpc_server_builder: None,
63 http_server_builder: None,
64 plugins,
65 }
66 }
67
68 pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
69 let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
70 .with_tls_config(opts.tls.clone())
71 .context(error::InvalidTlsConfigSnafu)?;
72 Ok(builder)
73 }
74
75 pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder {
76 let mut builder = HttpServerBuilder::new(opts.http.clone())
77 .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
78
79 let validator = self.plugins.get::<LogValidatorRef>();
80 let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
81 builder =
82 builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
83 builder = builder.with_logs_handler(self.instance.clone());
84
85 if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
86 builder = builder.with_user_provider(user_provider);
87 }
88
89 if opts.opentsdb.enable {
90 builder = builder.with_opentsdb_handler(self.instance.clone());
91 }
92
93 if opts.influxdb.enable {
94 builder = builder.with_influxdb_handler(self.instance.clone());
95 }
96
97 if opts.prom_store.enable {
98 builder = builder
99 .with_prom_handler(
100 self.instance.clone(),
101 Some(self.instance.clone()),
102 opts.prom_store.with_metric_engine,
103 opts.http.prom_validation_mode,
104 )
105 .with_prometheus_handler(self.instance.clone());
106 }
107
108 if opts.otlp.enable {
109 builder = builder.with_otlp_handler(self.instance.clone());
110 }
111
112 if opts.jaeger.enable {
113 builder = builder.with_jaeger_handler(self.instance.clone());
114 }
115
116 builder
117 }
118
119 pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
120 Self {
121 grpc_server_builder: Some(builder),
122 ..self
123 }
124 }
125
126 pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
127 Self {
128 http_server_builder: Some(builder),
129 ..self
130 }
131 }
132
133 fn build_grpc_server(&mut self, opts: &FrontendOptions) -> Result<GrpcServer> {
134 let builder = if let Some(builder) = self.grpc_server_builder.take() {
135 builder
136 } else {
137 self.grpc_server_builder(&opts.grpc)?
138 };
139
140 let user_provider = self.plugins.get::<UserProviderRef>();
141
142 let runtime = if opts.meta_client.is_none() {
144 Some(builder.runtime().clone())
145 } else {
146 None
147 };
148
149 let greptime_request_handler = GreptimeRequestHandler::new(
150 ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
151 user_provider.clone(),
152 runtime,
153 opts.grpc.flight_compression,
154 );
155
156 let frontend_grpc_handler =
157 FrontendGrpcHandler::new(self.instance.process_manager().clone());
158 let grpc_server = builder
159 .database_handler(greptime_request_handler.clone())
160 .prometheus_handler(self.instance.clone(), user_provider.clone())
161 .otel_arrow_handler(OtelArrowServiceHandler(self.instance.clone()))
162 .flight_handler(Arc::new(greptime_request_handler))
163 .frontend_grpc_handler(frontend_grpc_handler)
164 .build();
165 Ok(grpc_server)
166 }
167
168 fn build_http_server(&mut self, opts: &FrontendOptions, toml: String) -> Result<HttpServer> {
169 let builder = if let Some(builder) = self.http_server_builder.take() {
170 builder
171 } else {
172 self.http_server_builder(opts)
173 };
174
175 let http_server = builder
176 .with_metrics_handler(MetricsHandler)
177 .with_plugins(self.plugins.clone())
178 .with_greptime_config_options(toml)
179 .build();
180 Ok(http_server)
181 }
182
183 pub fn build(mut self) -> Result<ServerHandlers> {
184 let opts = self.opts.clone();
185 let instance = self.instance.clone();
186
187 let toml = opts.to_toml().context(TomlFormatSnafu)?;
188 let opts: FrontendOptions = opts.into();
189
190 let handlers = ServerHandlers::default();
191
192 let user_provider = self.plugins.get::<UserProviderRef>();
193
194 {
195 let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
197 let grpc_server = self.build_grpc_server(&opts)?;
198 handlers.insert((Box::new(grpc_server), grpc_addr));
199 }
200
201 {
202 let http_options = &opts.http;
204 let http_addr = parse_addr(&http_options.addr)?;
205 let http_server = self.build_http_server(&opts, toml)?;
206 handlers.insert((Box::new(http_server), http_addr));
207 }
208
209 if opts.mysql.enable {
210 let opts = &opts.mysql;
212 let mysql_addr = parse_addr(&opts.addr)?;
213
214 let tls_server_config = Arc::new(
215 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
216 );
217
218 maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
220
221 let mysql_server = MysqlServer::create_server(
222 common_runtime::global_runtime(),
223 Arc::new(MysqlSpawnRef::new(
224 ServerSqlQueryHandlerAdapter::arc(instance.clone()),
225 user_provider.clone(),
226 )),
227 Arc::new(MysqlSpawnConfig::new(
228 opts.tls.should_force_tls(),
229 tls_server_config,
230 opts.keep_alive.as_secs(),
231 opts.reject_no_database.unwrap_or(false),
232 )),
233 Some(instance.process_manager().clone()),
234 );
235 handlers.insert((mysql_server, mysql_addr));
236 }
237
238 if opts.postgres.enable {
239 let opts = &opts.postgres;
241 let pg_addr = parse_addr(&opts.addr)?;
242
243 let tls_server_config = Arc::new(
244 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
245 );
246
247 maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
248
249 let pg_server = Box::new(PostgresServer::new(
250 ServerSqlQueryHandlerAdapter::arc(instance.clone()),
251 opts.tls.should_force_tls(),
252 tls_server_config,
253 opts.keep_alive.as_secs(),
254 common_runtime::global_runtime(),
255 user_provider.clone(),
256 Some(self.instance.process_manager().clone()),
257 )) as Box<dyn Server>;
258
259 handlers.insert((pg_server, pg_addr));
260 }
261
262 Ok(handlers)
263 }
264}
265
266fn parse_addr(addr: &str) -> Result<SocketAddr> {
267 addr.parse().context(error::ParseAddrSnafu { addr })
268}