1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use common_base::Plugins;
20use common_config::Configurable;
21use common_telemetry::info;
22use meta_client::MetaClientOptions;
23use servers::error::Error as ServerError;
24use servers::grpc::builder::GrpcServerBuilder;
25use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
26use servers::grpc::greptime_handler::GreptimeRequestHandler;
27use servers::grpc::{GrpcOptions, GrpcServer};
28use servers::http::event::LogValidatorRef;
29use servers::http::utils::router::RouterConfigurator;
30use servers::http::{HttpServer, HttpServerBuilder};
31use servers::interceptor::LogIngestInterceptorRef;
32use servers::metrics_handler::MetricsHandler;
33use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
34use servers::otel_arrow::OtelArrowServiceHandler;
35use servers::postgres::PostgresServer;
36use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
37use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
38use servers::server::{Server, ServerHandlers};
39use servers::tls::{ReloadableTlsServerConfig, maybe_watch_tls_config};
40use snafu::ResultExt;
41
42use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
43use crate::frontend::FrontendOptions;
44use crate::instance::Instance;
45
46pub struct Services<T>
47where
48 T: Into<FrontendOptions> + Configurable + Clone,
49{
50 opts: T,
51 instance: Arc<Instance>,
52 grpc_server_builder: Option<GrpcServerBuilder>,
53 http_server_builder: Option<HttpServerBuilder>,
54 plugins: Plugins,
55}
56
57impl<T> Services<T>
58where
59 T: Into<FrontendOptions> + Configurable + Clone,
60{
61 pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
62 Self {
63 opts,
64 instance,
65 grpc_server_builder: None,
66 http_server_builder: None,
67 plugins,
68 }
69 }
70
71 fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
72 let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
73 .with_tls_config(opts.tls.clone())
74 .context(error::InvalidTlsConfigSnafu)?;
75 Ok(builder)
76 }
77
78 pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder {
79 let mut builder = HttpServerBuilder::new(opts.http.clone())
80 .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
81
82 let validator = self.plugins.get::<LogValidatorRef>();
83 let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
84 builder =
85 builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
86 builder = builder.with_logs_handler(self.instance.clone());
87
88 if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
89 builder = builder.with_user_provider(user_provider);
90 }
91
92 if opts.opentsdb.enable {
93 builder = builder.with_opentsdb_handler(self.instance.clone());
94 }
95
96 if opts.influxdb.enable {
97 builder = builder.with_influxdb_handler(self.instance.clone());
98 }
99
100 if opts.prom_store.enable {
101 builder = builder
102 .with_prom_handler(
103 self.instance.clone(),
104 Some(self.instance.clone()),
105 opts.prom_store.with_metric_engine,
106 opts.http.prom_validation_mode,
107 )
108 .with_prometheus_handler(self.instance.clone());
109 }
110
111 if opts.otlp.enable {
112 builder = builder
113 .with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
114 }
115
116 if opts.jaeger.enable {
117 builder = builder.with_jaeger_handler(self.instance.clone());
118 }
119
120 if let Some(configurator) = self.plugins.get::<RouterConfigurator>() {
121 info!("Adding extra router from plugins");
122 builder = builder.with_extra_router(configurator.router());
123 }
124
125 builder
126 }
127
128 pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
129 Self {
130 grpc_server_builder: Some(builder),
131 ..self
132 }
133 }
134
135 pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
136 Self {
137 http_server_builder: Some(builder),
138 ..self
139 }
140 }
141
142 fn build_grpc_server(
143 &mut self,
144 grpc: &GrpcOptions,
145 meta_client: &Option<MetaClientOptions>,
146 name: Option<String>,
147 external: bool,
148 ) -> Result<GrpcServer> {
149 let builder = if let Some(builder) = self.grpc_server_builder.take() {
150 builder
151 } else {
152 self.grpc_server_builder(grpc)?
153 };
154
155 let user_provider = if external {
156 self.plugins.get::<UserProviderRef>()
157 } else {
158 None
160 };
161
162 let runtime = if meta_client.is_none() {
164 Some(builder.runtime().clone())
165 } else {
166 None
167 };
168
169 let greptime_request_handler = GreptimeRequestHandler::new(
170 ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
171 user_provider.clone(),
172 runtime,
173 grpc.flight_compression,
174 );
175
176 let grpc_server = builder
177 .name(name)
178 .database_handler(greptime_request_handler.clone())
179 .prometheus_handler(self.instance.clone(), user_provider.clone())
180 .otel_arrow_handler(OtelArrowServiceHandler::new(
181 self.instance.clone(),
182 user_provider.clone(),
183 ))
184 .flight_handler(Arc::new(greptime_request_handler));
185
186 let grpc_server = if !external {
187 let frontend_grpc_handler =
188 FrontendGrpcHandler::new(self.instance.process_manager().clone());
189 grpc_server.frontend_grpc_handler(frontend_grpc_handler)
190 } else {
191 grpc_server
192 }
193 .build();
194
195 Ok(grpc_server)
196 }
197
198 fn build_http_server(&mut self, opts: &FrontendOptions, toml: String) -> Result<HttpServer> {
199 let builder = if let Some(builder) = self.http_server_builder.take() {
200 builder
201 } else {
202 self.http_server_builder(opts)
203 };
204
205 let http_server = builder
206 .with_metrics_handler(MetricsHandler)
207 .with_plugins(self.plugins.clone())
208 .with_greptime_config_options(toml)
209 .build();
210 Ok(http_server)
211 }
212
213 pub fn build(mut self) -> Result<ServerHandlers> {
214 let opts = self.opts.clone();
215 let instance = self.instance.clone();
216
217 let toml = opts.to_toml().context(TomlFormatSnafu)?;
218 let opts: FrontendOptions = opts.into();
219
220 let handlers = ServerHandlers::default();
221
222 let user_provider = self.plugins.get::<UserProviderRef>();
223
224 {
225 let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
227 let grpc_server = self.build_grpc_server(&opts.grpc, &opts.meta_client, None, true)?;
228 handlers.insert((Box::new(grpc_server), grpc_addr));
229 }
230
231 if let Some(internal_grpc) = &opts.internal_grpc {
232 let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
234 let grpc_server = self.build_grpc_server(
235 internal_grpc,
236 &opts.meta_client,
237 Some("INTERNAL_GRPC_SERVER".to_string()),
238 false,
239 )?;
240 handlers.insert((Box::new(grpc_server), grpc_addr));
241 }
242
243 {
244 let http_options = &opts.http;
246 let http_addr = parse_addr(&http_options.addr)?;
247 let http_server = self.build_http_server(&opts, toml)?;
248 handlers.insert((Box::new(http_server), http_addr));
249 }
250
251 if opts.mysql.enable {
252 let opts = &opts.mysql;
254 let mysql_addr = parse_addr(&opts.addr)?;
255
256 let tls_server_config = Arc::new(
257 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
258 );
259
260 maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
262
263 let mysql_server = MysqlServer::create_server(
264 common_runtime::global_runtime(),
265 Arc::new(MysqlSpawnRef::new(
266 ServerSqlQueryHandlerAdapter::arc(instance.clone()),
267 user_provider.clone(),
268 )),
269 Arc::new(MysqlSpawnConfig::new(
270 opts.tls.should_force_tls(),
271 tls_server_config,
272 opts.keep_alive.as_secs(),
273 opts.reject_no_database.unwrap_or(false),
274 opts.prepared_stmt_cache_size,
275 )),
276 Some(instance.process_manager().clone()),
277 );
278 handlers.insert((mysql_server, mysql_addr));
279 }
280
281 if opts.postgres.enable {
282 let opts = &opts.postgres;
284 let pg_addr = parse_addr(&opts.addr)?;
285
286 let tls_server_config = Arc::new(
287 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
288 );
289
290 maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
291
292 let pg_server = Box::new(PostgresServer::new(
293 ServerSqlQueryHandlerAdapter::arc(instance.clone()),
294 opts.tls.should_force_tls(),
295 tls_server_config,
296 opts.keep_alive.as_secs(),
297 common_runtime::global_runtime(),
298 user_provider.clone(),
299 Some(self.instance.process_manager().clone()),
300 )) as Box<dyn Server>;
301
302 handlers.insert((pg_server, pg_addr));
303 }
304
305 Ok(handlers)
306 }
307}
308
309fn parse_addr(addr: &str) -> Result<SocketAddr> {
310 addr.parse().context(error::ParseAddrSnafu { addr })
311}