1use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use axum::extract::{Request, State};
20use axum::middleware::Next;
21use axum::response::IntoResponse;
22use common_base::Plugins;
23use common_config::Configurable;
24use common_telemetry::info;
25use meta_client::MetaClientOptions;
26use servers::error::Error as ServerError;
27use servers::grpc::builder::GrpcServerBuilder;
28use servers::grpc::flight::FlightCraftRef;
29use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
30use servers::grpc::greptime_handler::GreptimeRequestHandler;
31use servers::grpc::{GrpcOptions, GrpcServer};
32use servers::http::event::LogValidatorRef;
33use servers::http::result::error_result::ErrorResponse;
34use servers::http::utils::router::RouterConfigurator;
35use servers::http::{HttpServer, HttpServerBuilder};
36use servers::interceptor::LogIngestInterceptorRef;
37use servers::metrics_handler::MetricsHandler;
38use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
39use servers::otel_arrow::OtelArrowServiceHandler;
40use servers::pending_rows_batcher::PendingRowsBatcher;
41use servers::postgres::PostgresServer;
42use servers::request_memory_limiter::ServerMemoryLimiter;
43use servers::server::{Server, ServerHandlers};
44use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config};
45use snafu::ResultExt;
46use tonic::Status;
47
48use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
49use crate::frontend::FrontendOptions;
50use crate::instance::Instance;
51
52pub struct Services<T>
53where
54 T: Into<FrontendOptions> + Configurable + Clone,
55{
56 opts: T,
57 instance: Arc<Instance>,
58 grpc_server_builder: Option<GrpcServerBuilder>,
59 http_server_builder: Option<HttpServerBuilder>,
60 plugins: Plugins,
61 flight_handler: Option<FlightCraftRef>,
62 pub server_memory_limiter: ServerMemoryLimiter,
63}
64
65impl<T> Services<T>
66where
67 T: Into<FrontendOptions> + Configurable + Clone,
68{
69 pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
70 let feopts = opts.clone().into();
71 let server_memory_limiter = ServerMemoryLimiter::new(
73 feopts.max_in_flight_write_bytes.as_bytes(),
74 feopts.write_bytes_exhausted_policy,
75 );
76
77 Self {
78 opts,
79 instance,
80 grpc_server_builder: None,
81 http_server_builder: None,
82 plugins,
83 flight_handler: None,
84 server_memory_limiter,
85 }
86 }
87
88 pub fn grpc_server_builder(
89 &self,
90 opts: &GrpcOptions,
91 request_memory_limiter: ServerMemoryLimiter,
92 ) -> Result<GrpcServerBuilder> {
93 let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
94 .with_memory_limiter(request_memory_limiter)
95 .with_tls_config(opts.tls.clone())
96 .context(error::InvalidTlsConfigSnafu)?;
97 Ok(builder)
98 }
99
100 pub fn http_server_builder(
101 &self,
102 opts: &FrontendOptions,
103 request_memory_limiter: ServerMemoryLimiter,
104 ) -> HttpServerBuilder {
105 let mut builder = HttpServerBuilder::new(opts.http.clone())
106 .with_memory_limiter(request_memory_limiter)
107 .with_sql_handler(self.instance.clone());
108
109 let validator = self.plugins.get::<LogValidatorRef>();
110 let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
111 builder =
112 builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
113 builder = builder.with_logs_handler(self.instance.clone());
114
115 if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
116 builder = builder.with_user_provider(user_provider);
117 }
118
119 if opts.opentsdb.enable {
120 builder = builder.with_opentsdb_handler(self.instance.clone());
121 }
122
123 if opts.influxdb.enable {
124 builder = builder.with_influxdb_handler(self.instance.clone());
125 }
126
127 if opts.prom_store.enable {
128 let pending_rows_batcher = if opts.prom_store.with_metric_engine {
129 PendingRowsBatcher::try_new(
130 self.instance.partition_manager().clone(),
131 self.instance.node_manager().clone(),
132 self.instance.catalog_manager().clone(),
133 opts.prom_store.with_metric_engine,
134 self.instance.clone(),
135 opts.prom_store.pending_rows_flush_interval,
136 opts.prom_store.max_batch_rows,
137 opts.prom_store.max_concurrent_flushes,
138 opts.prom_store.worker_channel_capacity,
139 opts.prom_store.max_inflight_requests,
140 )
141 } else {
142 None
143 };
144 builder = builder
145 .with_prom_handler(
146 self.instance.clone(),
147 Some(self.instance.clone()),
148 opts.prom_store.with_metric_engine,
149 opts.http.prom_validation_mode,
150 pending_rows_batcher,
151 )
152 .with_prometheus_handler(self.instance.clone());
153 }
154
155 if opts.otlp.enable {
156 builder = builder
157 .with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
158 }
159
160 if opts.jaeger.enable {
161 builder = builder.with_jaeger_handler(self.instance.clone());
162 }
163
164 builder = builder.with_dashboard_handler(self.instance.clone());
165
166 if let Some(configurator) = self.plugins.get::<RouterConfigurator>() {
167 info!("Adding extra router from plugins");
168 builder = builder.with_extra_router(configurator.router());
169 }
170
171 builder.add_layer(axum::middleware::from_fn_with_state(
172 self.instance.clone(),
173 async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
174 if state.is_suspended() {
175 return ErrorResponse::from_error(servers::error::SuspendedSnafu.build())
176 .into_response();
177 }
178 next.run(request).await
179 },
180 ))
181 }
182
183 pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
184 Self {
185 grpc_server_builder: Some(builder),
186 ..self
187 }
188 }
189
190 pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
191 Self {
192 http_server_builder: Some(builder),
193 ..self
194 }
195 }
196
197 pub fn with_flight_handler(self, flight_handler: FlightCraftRef) -> Self {
198 Self {
199 flight_handler: Some(flight_handler),
200 ..self
201 }
202 }
203
204 fn build_grpc_server(
205 &mut self,
206 grpc: &GrpcOptions,
207 meta_client: &Option<MetaClientOptions>,
208 name: Option<String>,
209 external: bool,
210 request_memory_limiter: ServerMemoryLimiter,
211 ) -> Result<GrpcServer> {
212 let builder = if let Some(builder) = self.grpc_server_builder.take() {
213 builder
214 } else {
215 self.grpc_server_builder(grpc, request_memory_limiter)?
216 };
217
218 let user_provider = if external {
219 self.plugins.get::<UserProviderRef>()
220 } else {
221 None
223 };
224
225 let runtime = if meta_client.is_none() {
227 Some(builder.runtime().clone())
228 } else {
229 None
230 };
231
232 let greptime_request_handler = GreptimeRequestHandler::new(
233 self.instance.clone(),
234 user_provider.clone(),
235 runtime,
236 grpc.flight_compression,
237 );
238
239 let flight_handler = self
241 .flight_handler
242 .clone()
243 .unwrap_or_else(|| Arc::new(greptime_request_handler.clone()) as FlightCraftRef);
244
245 let grpc_server = builder
246 .name(name)
247 .database_handler(greptime_request_handler.clone())
248 .prometheus_handler(self.instance.clone(), user_provider.clone())
249 .otel_arrow_handler(OtelArrowServiceHandler::new(
250 self.instance.clone(),
251 user_provider.clone(),
252 ))
253 .flight_handler(flight_handler)
254 .add_layer(axum::middleware::from_fn_with_state(
255 self.instance.clone(),
256 async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
257 if state.is_suspended() {
258 let status = Status::from(servers::error::SuspendedSnafu.build());
259 return status.into_http();
260 }
261 next.run(request).await
262 },
263 ));
264
265 let grpc_server = if !external {
266 let frontend_grpc_handler =
267 FrontendGrpcHandler::new(self.instance.process_manager().clone());
268 grpc_server.frontend_grpc_handler(frontend_grpc_handler)
269 } else {
270 grpc_server
271 }
272 .build();
273
274 Ok(grpc_server)
275 }
276
277 fn build_http_server(
278 &mut self,
279 opts: &FrontendOptions,
280 toml: String,
281 request_memory_limiter: ServerMemoryLimiter,
282 ) -> Result<HttpServer> {
283 let builder = if let Some(builder) = self.http_server_builder.take() {
284 builder
285 } else {
286 self.http_server_builder(opts, request_memory_limiter)
287 };
288
289 let http_server = builder
290 .with_metrics_handler(MetricsHandler)
291 .with_plugins(self.plugins.clone())
292 .with_greptime_config_options(toml)
293 .build();
294 Ok(http_server)
295 }
296
297 pub fn build(mut self) -> Result<ServerHandlers> {
298 let opts = self.opts.clone();
299 let instance = self.instance.clone();
300
301 let toml = opts.to_toml().context(TomlFormatSnafu)?;
302 let opts: FrontendOptions = opts.into();
303
304 let handlers = ServerHandlers::default();
305
306 let user_provider = self.plugins.get::<UserProviderRef>();
307
308 {
309 let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
311 let grpc_server = self.build_grpc_server(
312 &opts.grpc,
313 &opts.meta_client,
314 None,
315 true,
316 self.server_memory_limiter.clone(),
317 )?;
318 handlers.insert((Box::new(grpc_server), grpc_addr));
319 }
320
321 if let Some(internal_grpc) = &opts.internal_grpc {
322 let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
324 let grpc_server = self.build_grpc_server(
325 internal_grpc,
326 &opts.meta_client,
327 Some("INTERNAL_GRPC_SERVER".to_string()),
328 false,
329 self.server_memory_limiter.clone(),
330 )?;
331 handlers.insert((Box::new(grpc_server), grpc_addr));
332 }
333
334 {
335 let http_options = &opts.http;
337 let http_addr = parse_addr(&http_options.addr)?;
338 let http_server =
339 self.build_http_server(&opts, toml, self.server_memory_limiter.clone())?;
340 handlers.insert((Box::new(http_server), http_addr));
341 }
342
343 if opts.mysql.enable {
344 let opts = &opts.mysql;
346 let mysql_addr = parse_addr(&opts.addr)?;
347
348 let tls_server_config = Arc::new(
349 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
350 );
351
352 maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
354
355 let mysql_server = MysqlServer::create_server(
356 common_runtime::global_runtime(),
357 Arc::new(MysqlSpawnRef::new(instance.clone(), user_provider.clone())),
358 Arc::new(MysqlSpawnConfig::new(
359 opts.tls.should_force_tls(),
360 tls_server_config,
361 opts.keep_alive.as_secs(),
362 opts.reject_no_database.unwrap_or(false),
363 opts.prepared_stmt_cache_size,
364 )),
365 Some(instance.process_manager().clone()),
366 );
367 handlers.insert((mysql_server, mysql_addr));
368 }
369
370 if opts.postgres.enable {
371 let opts = &opts.postgres;
373 let pg_addr = parse_addr(&opts.addr)?;
374
375 let tls_server_config = Arc::new(
376 ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
377 );
378
379 maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
380
381 let pg_server = Box::new(PostgresServer::new(
382 instance.clone(),
383 opts.tls.should_force_tls(),
384 tls_server_config,
385 opts.keep_alive.as_secs(),
386 common_runtime::global_runtime(),
387 user_provider.clone(),
388 Some(self.instance.process_manager().clone()),
389 )) as Box<dyn Server>;
390
391 handlers.insert((pg_server, pg_addr));
392 }
393
394 Ok(handlers)
395 }
396}
397
398fn parse_addr(addr: &str) -> Result<SocketAddr> {
399 addr.parse().context(error::ParseAddrSnafu { addr })
400}