1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use axum::extract::{Request, State};
20use axum::middleware::Next;
21use axum::response::IntoResponse;
22use common_base::Plugins;
23use common_config::Configurable;
24use common_telemetry::info;
25use meta_client::MetaClientOptions;
26use servers::error::Error as ServerError;
27use servers::grpc::builder::GrpcServerBuilder;
28use servers::grpc::flight::FlightCraftRef;
29use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
30use servers::grpc::greptime_handler::GreptimeRequestHandler;
31use servers::grpc::{GrpcOptions, GrpcServer};
32use servers::http::event::LogValidatorRef;
33use servers::http::result::error_result::ErrorResponse;
34use servers::http::utils::router::RouterConfigurator;
35use servers::http::{HttpServer, HttpServerBuilder};
36use servers::interceptor::LogIngestInterceptorRef;
37use servers::metrics_handler::MetricsHandler;
38use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
39use servers::otel_arrow::OtelArrowServiceHandler;
40use servers::postgres::PostgresServer;
41use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
42use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
43use servers::server::{Server, ServerHandlers};
44use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config};
45use snafu::ResultExt;
46use tonic::Status;
47
48use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
49use crate::frontend::FrontendOptions;
50use crate::instance::Instance;
51
52pub struct Services<T>
53where
54 T: Into<FrontendOptions> + Configurable + Clone,
55{
56 opts: T,
57 instance: Arc<Instance>,
58 grpc_server_builder: Option<GrpcServerBuilder>,
59 http_server_builder: Option<HttpServerBuilder>,
60 plugins: Plugins,
61 flight_handler: Option<FlightCraftRef>,
62}
63
64impl<T> Services<T>
65where
66 T: Into<FrontendOptions> + Configurable + Clone,
67{
68 pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
69 Self {
70 opts,
71 instance,
72 grpc_server_builder: None,
73 http_server_builder: None,
74 plugins,
75 flight_handler: None,
76 }
77 }
78
79 pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
80 let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
81 .with_tls_config(opts.tls.clone())
82 .context(error::InvalidTlsConfigSnafu)?;
83 Ok(builder)
84 }
85
86 pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder {
87 let mut builder = HttpServerBuilder::new(opts.http.clone())
88 .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
89
90 let validator = self.plugins.get::<LogValidatorRef>();
91 let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
92 builder =
93 builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
94 builder = builder.with_logs_handler(self.instance.clone());
95
96 if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
97 builder = builder.with_user_provider(user_provider);
98 }
99
100 if opts.opentsdb.enable {
101 builder = builder.with_opentsdb_handler(self.instance.clone());
102 }
103
104 if opts.influxdb.enable {
105 builder = builder.with_influxdb_handler(self.instance.clone());
106 }
107
108 if opts.prom_store.enable {
109 builder = builder
110 .with_prom_handler(
111 self.instance.clone(),
112 Some(self.instance.clone()),
113 opts.prom_store.with_metric_engine,
114 opts.http.prom_validation_mode,
115 )
116 .with_prometheus_handler(self.instance.clone());
117 }
118
119 if opts.otlp.enable {
120 builder = builder
121 .with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
122 }
123
124 if opts.jaeger.enable {
125 builder = builder.with_jaeger_handler(self.instance.clone());
126 }
127
128 if let Some(configurator) = self.plugins.get::<RouterConfigurator>() {
129 info!("Adding extra router from plugins");
130 builder = builder.with_extra_router(configurator.router());
131 }
132
133 builder.add_layer(axum::middleware::from_fn_with_state(
134 self.instance.clone(),
135 async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
136 if state.is_suspended() {
137 return ErrorResponse::from_error(servers::error::SuspendedSnafu.build())
138 .into_response();
139 }
140 next.run(request).await
141 },
142 ))
143 }
144
145 pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
146 Self {
147 grpc_server_builder: Some(builder),
148 ..self
149 }
150 }
151
152 pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
153 Self {
154 http_server_builder: Some(builder),
155 ..self
156 }
157 }
158
159 pub fn with_flight_handler(self, flight_handler: FlightCraftRef) -> Self {
160 Self {
161 flight_handler: Some(flight_handler),
162 ..self
163 }
164 }
165
166 fn build_grpc_server(
167 &mut self,
168 grpc: &GrpcOptions,
169 meta_client: &Option<MetaClientOptions>,
170 name: Option<String>,
171 external: bool,
172 ) -> Result<GrpcServer> {
173 let builder = if let Some(builder) = self.grpc_server_builder.take() {
174 builder
175 } else {
176 self.grpc_server_builder(grpc)?
177 };
178
179 let user_provider = if external {
180 self.plugins.get::<UserProviderRef>()
181 } else {
182 None
184 };
185
186 let runtime = if meta_client.is_none() {
188 Some(builder.runtime().clone())
189 } else {
190 None
191 };
192
193 let greptime_request_handler = GreptimeRequestHandler::new(
194 ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
195 user_provider.clone(),
196 runtime,
197 grpc.flight_compression,
198 );
199
200 let flight_handler = self
202 .flight_handler
203 .clone()
204 .unwrap_or_else(|| Arc::new(greptime_request_handler.clone()) as FlightCraftRef);
205
206 let grpc_server = builder
207 .name(name)
208 .database_handler(greptime_request_handler.clone())
209 .prometheus_handler(self.instance.clone(), user_provider.clone())
210 .otel_arrow_handler(OtelArrowServiceHandler::new(
211 self.instance.clone(),
212 user_provider.clone(),
213 ))
214 .flight_handler(flight_handler)
215 .add_layer(axum::middleware::from_fn_with_state(
216 self.instance.clone(),
217 async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
218 if state.is_suspended() {
219 let status = Status::from(servers::error::SuspendedSnafu.build());
220 return status.into_http();
221 }
222 next.run(request).await
223 },
224 ));
225
226 let grpc_server = if !external {
227 let frontend_grpc_handler =
228 FrontendGrpcHandler::new(self.instance.process_manager().clone());
229 grpc_server.frontend_grpc_handler(frontend_grpc_handler)
230 } else {
231 grpc_server
232 }
233 .build();
234
235 Ok(grpc_server)
236 }
237
238 fn build_http_server(&mut self, opts: &FrontendOptions, toml: String) -> Result<HttpServer> {
239 let builder = if let Some(builder) = self.http_server_builder.take() {
240 builder
241 } else {
242 self.http_server_builder(opts)
243 };
244
245 let http_server = builder
246 .with_metrics_handler(MetricsHandler)
247 .with_plugins(self.plugins.clone())
248 .with_greptime_config_options(toml)
249 .build();
250 Ok(http_server)
251 }
252
253 pub fn build(mut self) -> Result<ServerHandlers> {
254 let opts = self.opts.clone();
255 let instance = self.instance.clone();
256
257 let toml = opts.to_toml().context(TomlFormatSnafu)?;
258 let opts: FrontendOptions = opts.into();
259
260 let handlers = ServerHandlers::default();
261
262 let user_provider = self.plugins.get::<UserProviderRef>();
263
264 {
265 let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
267 let grpc_server = self.build_grpc_server(&opts.grpc, &opts.meta_client, None, true)?;
268 handlers.insert((Box::new(grpc_server), grpc_addr));
269 }
270
271 if let Some(internal_grpc) = &opts.internal_grpc {
272 let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
274 let grpc_server = self.build_grpc_server(
275 internal_grpc,
276 &opts.meta_client,
277 Some("INTERNAL_GRPC_SERVER".to_string()),
278 false,
279 )?;
280 handlers.insert((Box::new(grpc_server), grpc_addr));
281 }
282
283 {
284 let http_options = &opts.http;
286 let http_addr = parse_addr(&http_options.addr)?;
287 let http_server = self.build_http_server(&opts, toml)?;
288 handlers.insert((Box::new(http_server), http_addr));
289 }
290
291 if opts.mysql.enable {
292 let opts = &opts.mysql;
294 let mysql_addr = parse_addr(&opts.addr)?;
295
296 let tls_server_config = Arc::new(
297 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
298 );
299
300 maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
302
303 let mysql_server = MysqlServer::create_server(
304 common_runtime::global_runtime(),
305 Arc::new(MysqlSpawnRef::new(
306 ServerSqlQueryHandlerAdapter::arc(instance.clone()),
307 user_provider.clone(),
308 )),
309 Arc::new(MysqlSpawnConfig::new(
310 opts.tls.should_force_tls(),
311 tls_server_config,
312 opts.keep_alive.as_secs(),
313 opts.reject_no_database.unwrap_or(false),
314 opts.prepared_stmt_cache_size,
315 )),
316 Some(instance.process_manager().clone()),
317 );
318 handlers.insert((mysql_server, mysql_addr));
319 }
320
321 if opts.postgres.enable {
322 let opts = &opts.postgres;
324 let pg_addr = parse_addr(&opts.addr)?;
325
326 let tls_server_config = Arc::new(
327 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
328 );
329
330 maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
331
332 let pg_server = Box::new(PostgresServer::new(
333 ServerSqlQueryHandlerAdapter::arc(instance.clone()),
334 opts.tls.should_force_tls(),
335 tls_server_config,
336 opts.keep_alive.as_secs(),
337 common_runtime::global_runtime(),
338 user_provider.clone(),
339 Some(self.instance.process_manager().clone()),
340 )) as Box<dyn Server>;
341
342 handlers.insert((pg_server, pg_addr));
343 }
344
345 Ok(handlers)
346 }
347}
348
349fn parse_addr(addr: &str) -> Result<SocketAddr> {
350 addr.parse().context(error::ParseAddrSnafu { addr })
351}