1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use axum::extract::{Request, State};
20use axum::middleware::Next;
21use axum::response::IntoResponse;
22use common_base::Plugins;
23use common_config::Configurable;
24use common_telemetry::info;
25use meta_client::MetaClientOptions;
26use servers::error::Error as ServerError;
27use servers::grpc::builder::GrpcServerBuilder;
28use servers::grpc::flight::FlightCraftRef;
29use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
30use servers::grpc::greptime_handler::GreptimeRequestHandler;
31use servers::grpc::{GrpcOptions, GrpcServer};
32use servers::http::event::LogValidatorRef;
33use servers::http::result::error_result::ErrorResponse;
34use servers::http::utils::router::RouterConfigurator;
35use servers::http::{HttpServer, HttpServerBuilder};
36use servers::interceptor::LogIngestInterceptorRef;
37use servers::metrics_handler::MetricsHandler;
38use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
39use servers::otel_arrow::OtelArrowServiceHandler;
40use servers::postgres::PostgresServer;
41use servers::request_memory_limiter::ServerMemoryLimiter;
42use servers::server::{Server, ServerHandlers};
43use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config};
44use snafu::ResultExt;
45use tonic::Status;
46
47use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
48use crate::frontend::FrontendOptions;
49use crate::instance::Instance;
50
/// Assembles the frontend's network servers (gRPC, optional internal gRPC,
/// HTTP, MySQL and PostgreSQL) from the given options.
pub struct Services<T>
where
    T: Into<FrontendOptions> + Configurable + Clone,
{
    /// Options the servers are built from; convertible into [`FrontendOptions`].
    opts: T,
    /// Frontend instance handed to the protocol servers as their handler.
    instance: Arc<Instance>,
    /// Caller-supplied gRPC builder. It is taken by the first gRPC server that
    /// gets built; any later gRPC server falls back to the default builder.
    grpc_server_builder: Option<GrpcServerBuilder>,
    /// Caller-supplied HTTP builder, used instead of the default one when set.
    http_server_builder: Option<HttpServerBuilder>,
    /// Plugin registry queried for optional components such as the user provider.
    plugins: Plugins,
    /// Optional flight handler override; when unset, the gRPC request handler
    /// is used as the flight handler.
    flight_handler: Option<FlightCraftRef>,
    /// Memory limiter created from the frontend options' `max_in_flight_write_bytes`
    /// and `write_bytes_exhausted_policy`; a clone is passed to each gRPC/HTTP server.
    pub server_memory_limiter: ServerMemoryLimiter,
}
63
64impl<T> Services<T>
65where
66 T: Into<FrontendOptions> + Configurable + Clone,
67{
68 pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
69 let feopts = opts.clone().into();
70 let server_memory_limiter = ServerMemoryLimiter::new(
72 feopts.max_in_flight_write_bytes.as_bytes(),
73 feopts.write_bytes_exhausted_policy,
74 );
75
76 Self {
77 opts,
78 instance,
79 grpc_server_builder: None,
80 http_server_builder: None,
81 plugins,
82 flight_handler: None,
83 server_memory_limiter,
84 }
85 }
86
87 pub fn grpc_server_builder(
88 &self,
89 opts: &GrpcOptions,
90 request_memory_limiter: ServerMemoryLimiter,
91 ) -> Result<GrpcServerBuilder> {
92 let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
93 .with_memory_limiter(request_memory_limiter)
94 .with_tls_config(opts.tls.clone())
95 .context(error::InvalidTlsConfigSnafu)?;
96 Ok(builder)
97 }
98
99 pub fn http_server_builder(
100 &self,
101 opts: &FrontendOptions,
102 request_memory_limiter: ServerMemoryLimiter,
103 ) -> HttpServerBuilder {
104 let mut builder = HttpServerBuilder::new(opts.http.clone())
105 .with_memory_limiter(request_memory_limiter)
106 .with_sql_handler(self.instance.clone());
107
108 let validator = self.plugins.get::<LogValidatorRef>();
109 let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
110 builder =
111 builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
112 builder = builder.with_logs_handler(self.instance.clone());
113
114 if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
115 builder = builder.with_user_provider(user_provider);
116 }
117
118 if opts.opentsdb.enable {
119 builder = builder.with_opentsdb_handler(self.instance.clone());
120 }
121
122 if opts.influxdb.enable {
123 builder = builder.with_influxdb_handler(self.instance.clone());
124 }
125
126 if opts.prom_store.enable {
127 builder = builder
128 .with_prom_handler(
129 self.instance.clone(),
130 Some(self.instance.clone()),
131 opts.prom_store.with_metric_engine,
132 opts.http.prom_validation_mode,
133 )
134 .with_prometheus_handler(self.instance.clone());
135 }
136
137 if opts.otlp.enable {
138 builder = builder
139 .with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
140 }
141
142 if opts.jaeger.enable {
143 builder = builder.with_jaeger_handler(self.instance.clone());
144 }
145
146 if let Some(configurator) = self.plugins.get::<RouterConfigurator>() {
147 info!("Adding extra router from plugins");
148 builder = builder.with_extra_router(configurator.router());
149 }
150
151 builder.add_layer(axum::middleware::from_fn_with_state(
152 self.instance.clone(),
153 async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
154 if state.is_suspended() {
155 return ErrorResponse::from_error(servers::error::SuspendedSnafu.build())
156 .into_response();
157 }
158 next.run(request).await
159 },
160 ))
161 }
162
163 pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
164 Self {
165 grpc_server_builder: Some(builder),
166 ..self
167 }
168 }
169
170 pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
171 Self {
172 http_server_builder: Some(builder),
173 ..self
174 }
175 }
176
177 pub fn with_flight_handler(self, flight_handler: FlightCraftRef) -> Self {
178 Self {
179 flight_handler: Some(flight_handler),
180 ..self
181 }
182 }
183
184 fn build_grpc_server(
185 &mut self,
186 grpc: &GrpcOptions,
187 meta_client: &Option<MetaClientOptions>,
188 name: Option<String>,
189 external: bool,
190 request_memory_limiter: ServerMemoryLimiter,
191 ) -> Result<GrpcServer> {
192 let builder = if let Some(builder) = self.grpc_server_builder.take() {
193 builder
194 } else {
195 self.grpc_server_builder(grpc, request_memory_limiter)?
196 };
197
198 let user_provider = if external {
199 self.plugins.get::<UserProviderRef>()
200 } else {
201 None
203 };
204
205 let runtime = if meta_client.is_none() {
207 Some(builder.runtime().clone())
208 } else {
209 None
210 };
211
212 let greptime_request_handler = GreptimeRequestHandler::new(
213 self.instance.clone(),
214 user_provider.clone(),
215 runtime,
216 grpc.flight_compression,
217 );
218
219 let flight_handler = self
221 .flight_handler
222 .clone()
223 .unwrap_or_else(|| Arc::new(greptime_request_handler.clone()) as FlightCraftRef);
224
225 let grpc_server = builder
226 .name(name)
227 .database_handler(greptime_request_handler.clone())
228 .prometheus_handler(self.instance.clone(), user_provider.clone())
229 .otel_arrow_handler(OtelArrowServiceHandler::new(
230 self.instance.clone(),
231 user_provider.clone(),
232 ))
233 .flight_handler(flight_handler)
234 .add_layer(axum::middleware::from_fn_with_state(
235 self.instance.clone(),
236 async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
237 if state.is_suspended() {
238 let status = Status::from(servers::error::SuspendedSnafu.build());
239 return status.into_http();
240 }
241 next.run(request).await
242 },
243 ));
244
245 let grpc_server = if !external {
246 let frontend_grpc_handler =
247 FrontendGrpcHandler::new(self.instance.process_manager().clone());
248 grpc_server.frontend_grpc_handler(frontend_grpc_handler)
249 } else {
250 grpc_server
251 }
252 .build();
253
254 Ok(grpc_server)
255 }
256
257 fn build_http_server(
258 &mut self,
259 opts: &FrontendOptions,
260 toml: String,
261 request_memory_limiter: ServerMemoryLimiter,
262 ) -> Result<HttpServer> {
263 let builder = if let Some(builder) = self.http_server_builder.take() {
264 builder
265 } else {
266 self.http_server_builder(opts, request_memory_limiter)
267 };
268
269 let http_server = builder
270 .with_metrics_handler(MetricsHandler)
271 .with_plugins(self.plugins.clone())
272 .with_greptime_config_options(toml)
273 .build();
274 Ok(http_server)
275 }
276
277 pub fn build(mut self) -> Result<ServerHandlers> {
278 let opts = self.opts.clone();
279 let instance = self.instance.clone();
280
281 let toml = opts.to_toml().context(TomlFormatSnafu)?;
282 let opts: FrontendOptions = opts.into();
283
284 let handlers = ServerHandlers::default();
285
286 let user_provider = self.plugins.get::<UserProviderRef>();
287
288 {
289 let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
291 let grpc_server = self.build_grpc_server(
292 &opts.grpc,
293 &opts.meta_client,
294 None,
295 true,
296 self.server_memory_limiter.clone(),
297 )?;
298 handlers.insert((Box::new(grpc_server), grpc_addr));
299 }
300
301 if let Some(internal_grpc) = &opts.internal_grpc {
302 let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
304 let grpc_server = self.build_grpc_server(
305 internal_grpc,
306 &opts.meta_client,
307 Some("INTERNAL_GRPC_SERVER".to_string()),
308 false,
309 self.server_memory_limiter.clone(),
310 )?;
311 handlers.insert((Box::new(grpc_server), grpc_addr));
312 }
313
314 {
315 let http_options = &opts.http;
317 let http_addr = parse_addr(&http_options.addr)?;
318 let http_server =
319 self.build_http_server(&opts, toml, self.server_memory_limiter.clone())?;
320 handlers.insert((Box::new(http_server), http_addr));
321 }
322
323 if opts.mysql.enable {
324 let opts = &opts.mysql;
326 let mysql_addr = parse_addr(&opts.addr)?;
327
328 let tls_server_config = Arc::new(
329 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
330 );
331
332 maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
334
335 let mysql_server = MysqlServer::create_server(
336 common_runtime::global_runtime(),
337 Arc::new(MysqlSpawnRef::new(instance.clone(), user_provider.clone())),
338 Arc::new(MysqlSpawnConfig::new(
339 opts.tls.should_force_tls(),
340 tls_server_config,
341 opts.keep_alive.as_secs(),
342 opts.reject_no_database.unwrap_or(false),
343 opts.prepared_stmt_cache_size,
344 )),
345 Some(instance.process_manager().clone()),
346 );
347 handlers.insert((mysql_server, mysql_addr));
348 }
349
350 if opts.postgres.enable {
351 let opts = &opts.postgres;
353 let pg_addr = parse_addr(&opts.addr)?;
354
355 let tls_server_config = Arc::new(
356 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
357 );
358
359 maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
360
361 let pg_server = Box::new(PostgresServer::new(
362 instance.clone(),
363 opts.tls.should_force_tls(),
364 tls_server_config,
365 opts.keep_alive.as_secs(),
366 common_runtime::global_runtime(),
367 user_provider.clone(),
368 Some(self.instance.process_manager().clone()),
369 )) as Box<dyn Server>;
370
371 handlers.insert((pg_server, pg_addr));
372 }
373
374 Ok(handlers)
375 }
376}
377
378fn parse_addr(addr: &str) -> Result<SocketAddr> {
379 addr.parse().context(error::ParseAddrSnafu { addr })
380}