1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use common_base::Plugins;
20use common_config::Configurable;
21use meta_client::MetaClientOptions;
22use servers::error::Error as ServerError;
23use servers::grpc::builder::GrpcServerBuilder;
24use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
25use servers::grpc::greptime_handler::GreptimeRequestHandler;
26use servers::grpc::{GrpcOptions, GrpcServer};
27use servers::http::event::LogValidatorRef;
28use servers::http::{HttpServer, HttpServerBuilder};
29use servers::interceptor::LogIngestInterceptorRef;
30use servers::metrics_handler::MetricsHandler;
31use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
32use servers::otel_arrow::OtelArrowServiceHandler;
33use servers::postgres::PostgresServer;
34use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
35use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
36use servers::server::{Server, ServerHandlers};
37use servers::tls::{maybe_watch_tls_config, ReloadableTlsServerConfig};
38use snafu::ResultExt;
39
40use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
41use crate::frontend::FrontendOptions;
42use crate::instance::Instance;
43
44pub struct Services<T>
45where
46 T: Into<FrontendOptions> + Configurable + Clone,
47{
48 opts: T,
49 instance: Arc<Instance>,
50 grpc_server_builder: Option<GrpcServerBuilder>,
51 http_server_builder: Option<HttpServerBuilder>,
52 plugins: Plugins,
53}
54
55impl<T> Services<T>
56where
57 T: Into<FrontendOptions> + Configurable + Clone,
58{
59 pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
60 Self {
61 opts,
62 instance,
63 grpc_server_builder: None,
64 http_server_builder: None,
65 plugins,
66 }
67 }
68
69 pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
70 let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
71 .with_tls_config(opts.tls.clone())
72 .context(error::InvalidTlsConfigSnafu)?;
73 Ok(builder)
74 }
75
76 pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder {
77 let mut builder = HttpServerBuilder::new(opts.http.clone())
78 .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
79
80 let validator = self.plugins.get::<LogValidatorRef>();
81 let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
82 builder =
83 builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
84 builder = builder.with_logs_handler(self.instance.clone());
85
86 if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
87 builder = builder.with_user_provider(user_provider);
88 }
89
90 if opts.opentsdb.enable {
91 builder = builder.with_opentsdb_handler(self.instance.clone());
92 }
93
94 if opts.influxdb.enable {
95 builder = builder.with_influxdb_handler(self.instance.clone());
96 }
97
98 if opts.prom_store.enable {
99 builder = builder
100 .with_prom_handler(
101 self.instance.clone(),
102 Some(self.instance.clone()),
103 opts.prom_store.with_metric_engine,
104 opts.http.prom_validation_mode,
105 )
106 .with_prometheus_handler(self.instance.clone());
107 }
108
109 if opts.otlp.enable {
110 builder = builder
111 .with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
112 }
113
114 if opts.jaeger.enable {
115 builder = builder.with_jaeger_handler(self.instance.clone());
116 }
117
118 builder
119 }
120
121 pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
122 Self {
123 grpc_server_builder: Some(builder),
124 ..self
125 }
126 }
127
128 pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
129 Self {
130 http_server_builder: Some(builder),
131 ..self
132 }
133 }
134
135 fn build_grpc_server(
136 &mut self,
137 grpc: &GrpcOptions,
138 meta_client: &Option<MetaClientOptions>,
139 name: Option<String>,
140 external: bool,
141 ) -> Result<GrpcServer> {
142 let builder = if let Some(builder) = self.grpc_server_builder.take() {
143 builder
144 } else {
145 self.grpc_server_builder(grpc)?
146 };
147
148 let user_provider = if external {
149 self.plugins.get::<UserProviderRef>()
150 } else {
151 None
153 };
154
155 let runtime = if meta_client.is_none() {
157 Some(builder.runtime().clone())
158 } else {
159 None
160 };
161
162 let greptime_request_handler = GreptimeRequestHandler::new(
163 ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
164 user_provider.clone(),
165 runtime,
166 grpc.flight_compression,
167 );
168
169 let grpc_server = builder
170 .name(name)
171 .database_handler(greptime_request_handler.clone())
172 .prometheus_handler(self.instance.clone(), user_provider.clone())
173 .otel_arrow_handler(OtelArrowServiceHandler::new(
174 self.instance.clone(),
175 user_provider.clone(),
176 ))
177 .flight_handler(Arc::new(greptime_request_handler));
178
179 let grpc_server = if !external {
180 let frontend_grpc_handler =
181 FrontendGrpcHandler::new(self.instance.process_manager().clone());
182 grpc_server.frontend_grpc_handler(frontend_grpc_handler)
183 } else {
184 grpc_server
185 }
186 .build();
187
188 Ok(grpc_server)
189 }
190
191 fn build_http_server(&mut self, opts: &FrontendOptions, toml: String) -> Result<HttpServer> {
192 let builder = if let Some(builder) = self.http_server_builder.take() {
193 builder
194 } else {
195 self.http_server_builder(opts)
196 };
197
198 let http_server = builder
199 .with_metrics_handler(MetricsHandler)
200 .with_plugins(self.plugins.clone())
201 .with_greptime_config_options(toml)
202 .build();
203 Ok(http_server)
204 }
205
206 pub fn build(mut self) -> Result<ServerHandlers> {
207 let opts = self.opts.clone();
208 let instance = self.instance.clone();
209
210 let toml = opts.to_toml().context(TomlFormatSnafu)?;
211 let opts: FrontendOptions = opts.into();
212
213 let handlers = ServerHandlers::default();
214
215 let user_provider = self.plugins.get::<UserProviderRef>();
216
217 {
218 let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
220 let grpc_server = self.build_grpc_server(&opts.grpc, &opts.meta_client, None, true)?;
221 handlers.insert((Box::new(grpc_server), grpc_addr));
222 }
223
224 if let Some(internal_grpc) = &opts.internal_grpc {
225 let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
227 let grpc_server = self.build_grpc_server(
228 internal_grpc,
229 &opts.meta_client,
230 Some("INTERNAL_GRPC_SERVER".to_string()),
231 false,
232 )?;
233 handlers.insert((Box::new(grpc_server), grpc_addr));
234 }
235
236 {
237 let http_options = &opts.http;
239 let http_addr = parse_addr(&http_options.addr)?;
240 let http_server = self.build_http_server(&opts, toml)?;
241 handlers.insert((Box::new(http_server), http_addr));
242 }
243
244 if opts.mysql.enable {
245 let opts = &opts.mysql;
247 let mysql_addr = parse_addr(&opts.addr)?;
248
249 let tls_server_config = Arc::new(
250 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
251 );
252
253 maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
255
256 let mysql_server = MysqlServer::create_server(
257 common_runtime::global_runtime(),
258 Arc::new(MysqlSpawnRef::new(
259 ServerSqlQueryHandlerAdapter::arc(instance.clone()),
260 user_provider.clone(),
261 )),
262 Arc::new(MysqlSpawnConfig::new(
263 opts.tls.should_force_tls(),
264 tls_server_config,
265 opts.keep_alive.as_secs(),
266 opts.reject_no_database.unwrap_or(false),
267 opts.prepared_stmt_cache_size,
268 )),
269 Some(instance.process_manager().clone()),
270 );
271 handlers.insert((mysql_server, mysql_addr));
272 }
273
274 if opts.postgres.enable {
275 let opts = &opts.postgres;
277 let pg_addr = parse_addr(&opts.addr)?;
278
279 let tls_server_config = Arc::new(
280 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
281 );
282
283 maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
284
285 let pg_server = Box::new(PostgresServer::new(
286 ServerSqlQueryHandlerAdapter::arc(instance.clone()),
287 opts.tls.should_force_tls(),
288 tls_server_config,
289 opts.keep_alive.as_secs(),
290 common_runtime::global_runtime(),
291 user_provider.clone(),
292 Some(self.instance.process_manager().clone()),
293 )) as Box<dyn Server>;
294
295 handlers.insert((pg_server, pg_addr));
296 }
297
298 Ok(handlers)
299 }
300}
301
302fn parse_addr(addr: &str) -> Result<SocketAddr> {
303 addr.parse().context(error::ParseAddrSnafu { addr })
304}