1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use axum::extract::{Request, State};
20use axum::middleware::Next;
21use axum::response::IntoResponse;
22use common_base::Plugins;
23use common_config::Configurable;
24use common_telemetry::info;
25use meta_client::MetaClientOptions;
26use servers::error::Error as ServerError;
27use servers::grpc::builder::GrpcServerBuilder;
28use servers::grpc::flight::FlightCraftRef;
29use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
30use servers::grpc::greptime_handler::GreptimeRequestHandler;
31use servers::grpc::{GrpcOptions, GrpcServer};
32use servers::http::event::LogValidatorRef;
33use servers::http::result::error_result::ErrorResponse;
34use servers::http::utils::router::RouterConfigurator;
35use servers::http::{HttpServer, HttpServerBuilder};
36use servers::interceptor::LogIngestInterceptorRef;
37use servers::metrics_handler::MetricsHandler;
38use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
39use servers::otel_arrow::OtelArrowServiceHandler;
40use servers::postgres::PostgresServer;
41use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
42use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
43use servers::request_memory_limiter::ServerMemoryLimiter;
44use servers::server::{Server, ServerHandlers};
45use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config};
46use snafu::ResultExt;
47use tonic::Status;
48
49use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
50use crate::frontend::FrontendOptions;
51use crate::instance::Instance;
52
53pub struct Services<T>
54where
55 T: Into<FrontendOptions> + Configurable + Clone,
56{
57 opts: T,
58 instance: Arc<Instance>,
59 grpc_server_builder: Option<GrpcServerBuilder>,
60 http_server_builder: Option<HttpServerBuilder>,
61 plugins: Plugins,
62 flight_handler: Option<FlightCraftRef>,
63 pub server_memory_limiter: ServerMemoryLimiter,
64}
65
66impl<T> Services<T>
67where
68 T: Into<FrontendOptions> + Configurable + Clone,
69{
70 pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
71 let feopts = opts.clone().into();
72 let server_memory_limiter = ServerMemoryLimiter::new(
74 feopts.max_in_flight_write_bytes.as_bytes(),
75 feopts.write_bytes_exhausted_policy,
76 );
77
78 Self {
79 opts,
80 instance,
81 grpc_server_builder: None,
82 http_server_builder: None,
83 plugins,
84 flight_handler: None,
85 server_memory_limiter,
86 }
87 }
88
89 pub fn grpc_server_builder(
90 &self,
91 opts: &GrpcOptions,
92 request_memory_limiter: ServerMemoryLimiter,
93 ) -> Result<GrpcServerBuilder> {
94 let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
95 .with_memory_limiter(request_memory_limiter)
96 .with_tls_config(opts.tls.clone())
97 .context(error::InvalidTlsConfigSnafu)?;
98 Ok(builder)
99 }
100
101 pub fn http_server_builder(
102 &self,
103 opts: &FrontendOptions,
104 request_memory_limiter: ServerMemoryLimiter,
105 ) -> HttpServerBuilder {
106 let mut builder = HttpServerBuilder::new(opts.http.clone())
107 .with_memory_limiter(request_memory_limiter)
108 .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
109
110 let validator = self.plugins.get::<LogValidatorRef>();
111 let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
112 builder =
113 builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
114 builder = builder.with_logs_handler(self.instance.clone());
115
116 if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
117 builder = builder.with_user_provider(user_provider);
118 }
119
120 if opts.opentsdb.enable {
121 builder = builder.with_opentsdb_handler(self.instance.clone());
122 }
123
124 if opts.influxdb.enable {
125 builder = builder.with_influxdb_handler(self.instance.clone());
126 }
127
128 if opts.prom_store.enable {
129 builder = builder
130 .with_prom_handler(
131 self.instance.clone(),
132 Some(self.instance.clone()),
133 opts.prom_store.with_metric_engine,
134 opts.http.prom_validation_mode,
135 )
136 .with_prometheus_handler(self.instance.clone());
137 }
138
139 if opts.otlp.enable {
140 builder = builder
141 .with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
142 }
143
144 if opts.jaeger.enable {
145 builder = builder.with_jaeger_handler(self.instance.clone());
146 }
147
148 if let Some(configurator) = self.plugins.get::<RouterConfigurator>() {
149 info!("Adding extra router from plugins");
150 builder = builder.with_extra_router(configurator.router());
151 }
152
153 builder.add_layer(axum::middleware::from_fn_with_state(
154 self.instance.clone(),
155 async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
156 if state.is_suspended() {
157 return ErrorResponse::from_error(servers::error::SuspendedSnafu.build())
158 .into_response();
159 }
160 next.run(request).await
161 },
162 ))
163 }
164
165 pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
166 Self {
167 grpc_server_builder: Some(builder),
168 ..self
169 }
170 }
171
172 pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
173 Self {
174 http_server_builder: Some(builder),
175 ..self
176 }
177 }
178
179 pub fn with_flight_handler(self, flight_handler: FlightCraftRef) -> Self {
180 Self {
181 flight_handler: Some(flight_handler),
182 ..self
183 }
184 }
185
186 fn build_grpc_server(
187 &mut self,
188 grpc: &GrpcOptions,
189 meta_client: &Option<MetaClientOptions>,
190 name: Option<String>,
191 external: bool,
192 request_memory_limiter: ServerMemoryLimiter,
193 ) -> Result<GrpcServer> {
194 let builder = if let Some(builder) = self.grpc_server_builder.take() {
195 builder
196 } else {
197 self.grpc_server_builder(grpc, request_memory_limiter)?
198 };
199
200 let user_provider = if external {
201 self.plugins.get::<UserProviderRef>()
202 } else {
203 None
205 };
206
207 let runtime = if meta_client.is_none() {
209 Some(builder.runtime().clone())
210 } else {
211 None
212 };
213
214 let greptime_request_handler = GreptimeRequestHandler::new(
215 ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
216 user_provider.clone(),
217 runtime,
218 grpc.flight_compression,
219 );
220
221 let flight_handler = self
223 .flight_handler
224 .clone()
225 .unwrap_or_else(|| Arc::new(greptime_request_handler.clone()) as FlightCraftRef);
226
227 let grpc_server = builder
228 .name(name)
229 .database_handler(greptime_request_handler.clone())
230 .prometheus_handler(self.instance.clone(), user_provider.clone())
231 .otel_arrow_handler(OtelArrowServiceHandler::new(
232 self.instance.clone(),
233 user_provider.clone(),
234 ))
235 .flight_handler(flight_handler)
236 .add_layer(axum::middleware::from_fn_with_state(
237 self.instance.clone(),
238 async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
239 if state.is_suspended() {
240 let status = Status::from(servers::error::SuspendedSnafu.build());
241 return status.into_http();
242 }
243 next.run(request).await
244 },
245 ));
246
247 let grpc_server = if !external {
248 let frontend_grpc_handler =
249 FrontendGrpcHandler::new(self.instance.process_manager().clone());
250 grpc_server.frontend_grpc_handler(frontend_grpc_handler)
251 } else {
252 grpc_server
253 }
254 .build();
255
256 Ok(grpc_server)
257 }
258
259 fn build_http_server(
260 &mut self,
261 opts: &FrontendOptions,
262 toml: String,
263 request_memory_limiter: ServerMemoryLimiter,
264 ) -> Result<HttpServer> {
265 let builder = if let Some(builder) = self.http_server_builder.take() {
266 builder
267 } else {
268 self.http_server_builder(opts, request_memory_limiter)
269 };
270
271 let http_server = builder
272 .with_metrics_handler(MetricsHandler)
273 .with_plugins(self.plugins.clone())
274 .with_greptime_config_options(toml)
275 .build();
276 Ok(http_server)
277 }
278
279 pub fn build(mut self) -> Result<ServerHandlers> {
280 let opts = self.opts.clone();
281 let instance = self.instance.clone();
282
283 let toml = opts.to_toml().context(TomlFormatSnafu)?;
284 let opts: FrontendOptions = opts.into();
285
286 let handlers = ServerHandlers::default();
287
288 let user_provider = self.plugins.get::<UserProviderRef>();
289
290 {
291 let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
293 let grpc_server = self.build_grpc_server(
294 &opts.grpc,
295 &opts.meta_client,
296 None,
297 true,
298 self.server_memory_limiter.clone(),
299 )?;
300 handlers.insert((Box::new(grpc_server), grpc_addr));
301 }
302
303 if let Some(internal_grpc) = &opts.internal_grpc {
304 let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
306 let grpc_server = self.build_grpc_server(
307 internal_grpc,
308 &opts.meta_client,
309 Some("INTERNAL_GRPC_SERVER".to_string()),
310 false,
311 self.server_memory_limiter.clone(),
312 )?;
313 handlers.insert((Box::new(grpc_server), grpc_addr));
314 }
315
316 {
317 let http_options = &opts.http;
319 let http_addr = parse_addr(&http_options.addr)?;
320 let http_server =
321 self.build_http_server(&opts, toml, self.server_memory_limiter.clone())?;
322 handlers.insert((Box::new(http_server), http_addr));
323 }
324
325 if opts.mysql.enable {
326 let opts = &opts.mysql;
328 let mysql_addr = parse_addr(&opts.addr)?;
329
330 let tls_server_config = Arc::new(
331 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
332 );
333
334 maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
336
337 let mysql_server = MysqlServer::create_server(
338 common_runtime::global_runtime(),
339 Arc::new(MysqlSpawnRef::new(
340 ServerSqlQueryHandlerAdapter::arc(instance.clone()),
341 user_provider.clone(),
342 )),
343 Arc::new(MysqlSpawnConfig::new(
344 opts.tls.should_force_tls(),
345 tls_server_config,
346 opts.keep_alive.as_secs(),
347 opts.reject_no_database.unwrap_or(false),
348 opts.prepared_stmt_cache_size,
349 )),
350 Some(instance.process_manager().clone()),
351 );
352 handlers.insert((mysql_server, mysql_addr));
353 }
354
355 if opts.postgres.enable {
356 let opts = &opts.postgres;
358 let pg_addr = parse_addr(&opts.addr)?;
359
360 let tls_server_config = Arc::new(
361 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
362 );
363
364 maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
365
366 let pg_server = Box::new(PostgresServer::new(
367 ServerSqlQueryHandlerAdapter::arc(instance.clone()),
368 opts.tls.should_force_tls(),
369 tls_server_config,
370 opts.keep_alive.as_secs(),
371 common_runtime::global_runtime(),
372 user_provider.clone(),
373 Some(self.instance.process_manager().clone()),
374 )) as Box<dyn Server>;
375
376 handlers.insert((pg_server, pg_addr));
377 }
378
379 Ok(handlers)
380 }
381}
382
383fn parse_addr(addr: &str) -> Result<SocketAddr> {
384 addr.parse().context(error::ParseAddrSnafu { addr })
385}