frontend/
server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use axum::extract::{Request, State};
20use axum::middleware::Next;
21use axum::response::IntoResponse;
22use common_base::Plugins;
23use common_config::Configurable;
24use common_telemetry::info;
25use meta_client::MetaClientOptions;
26use servers::error::Error as ServerError;
27use servers::grpc::builder::GrpcServerBuilder;
28use servers::grpc::flight::FlightCraftRef;
29use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
30use servers::grpc::greptime_handler::GreptimeRequestHandler;
31use servers::grpc::{GrpcOptions, GrpcServer};
32use servers::http::event::LogValidatorRef;
33use servers::http::result::error_result::ErrorResponse;
34use servers::http::utils::router::RouterConfigurator;
35use servers::http::{HttpServer, HttpServerBuilder};
36use servers::interceptor::LogIngestInterceptorRef;
37use servers::metrics_handler::MetricsHandler;
38use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
39use servers::otel_arrow::OtelArrowServiceHandler;
40use servers::postgres::PostgresServer;
41use servers::request_memory_limiter::ServerMemoryLimiter;
42use servers::server::{Server, ServerHandlers};
43use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config};
44use snafu::ResultExt;
45use tonic::Status;
46
47use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
48use crate::frontend::FrontendOptions;
49use crate::instance::Instance;
50
51pub struct Services<T>
52where
53    T: Into<FrontendOptions> + Configurable + Clone,
54{
55    opts: T,
56    instance: Arc<Instance>,
57    grpc_server_builder: Option<GrpcServerBuilder>,
58    http_server_builder: Option<HttpServerBuilder>,
59    plugins: Plugins,
60    flight_handler: Option<FlightCraftRef>,
61    pub server_memory_limiter: ServerMemoryLimiter,
62}
63
64impl<T> Services<T>
65where
66    T: Into<FrontendOptions> + Configurable + Clone,
67{
68    pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
69        let feopts = opts.clone().into();
70        // Create server request memory limiter for all server protocols
71        let server_memory_limiter = ServerMemoryLimiter::new(
72            feopts.max_in_flight_write_bytes.as_bytes(),
73            feopts.write_bytes_exhausted_policy,
74        );
75
76        Self {
77            opts,
78            instance,
79            grpc_server_builder: None,
80            http_server_builder: None,
81            plugins,
82            flight_handler: None,
83            server_memory_limiter,
84        }
85    }
86
87    pub fn grpc_server_builder(
88        &self,
89        opts: &GrpcOptions,
90        request_memory_limiter: ServerMemoryLimiter,
91    ) -> Result<GrpcServerBuilder> {
92        let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
93            .with_memory_limiter(request_memory_limiter)
94            .with_tls_config(opts.tls.clone())
95            .context(error::InvalidTlsConfigSnafu)?;
96        Ok(builder)
97    }
98
99    pub fn http_server_builder(
100        &self,
101        opts: &FrontendOptions,
102        request_memory_limiter: ServerMemoryLimiter,
103    ) -> HttpServerBuilder {
104        let mut builder = HttpServerBuilder::new(opts.http.clone())
105            .with_memory_limiter(request_memory_limiter)
106            .with_sql_handler(self.instance.clone());
107
108        let validator = self.plugins.get::<LogValidatorRef>();
109        let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
110        builder =
111            builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
112        builder = builder.with_logs_handler(self.instance.clone());
113
114        if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
115            builder = builder.with_user_provider(user_provider);
116        }
117
118        if opts.opentsdb.enable {
119            builder = builder.with_opentsdb_handler(self.instance.clone());
120        }
121
122        if opts.influxdb.enable {
123            builder = builder.with_influxdb_handler(self.instance.clone());
124        }
125
126        if opts.prom_store.enable {
127            builder = builder
128                .with_prom_handler(
129                    self.instance.clone(),
130                    Some(self.instance.clone()),
131                    opts.prom_store.with_metric_engine,
132                    opts.http.prom_validation_mode,
133                )
134                .with_prometheus_handler(self.instance.clone());
135        }
136
137        if opts.otlp.enable {
138            builder = builder
139                .with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
140        }
141
142        if opts.jaeger.enable {
143            builder = builder.with_jaeger_handler(self.instance.clone());
144        }
145
146        if let Some(configurator) = self.plugins.get::<RouterConfigurator>() {
147            info!("Adding extra router from plugins");
148            builder = builder.with_extra_router(configurator.router());
149        }
150
151        builder.add_layer(axum::middleware::from_fn_with_state(
152            self.instance.clone(),
153            async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
154                if state.is_suspended() {
155                    return ErrorResponse::from_error(servers::error::SuspendedSnafu.build())
156                        .into_response();
157                }
158                next.run(request).await
159            },
160        ))
161    }
162
163    pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
164        Self {
165            grpc_server_builder: Some(builder),
166            ..self
167        }
168    }
169
170    pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
171        Self {
172            http_server_builder: Some(builder),
173            ..self
174        }
175    }
176
177    pub fn with_flight_handler(self, flight_handler: FlightCraftRef) -> Self {
178        Self {
179            flight_handler: Some(flight_handler),
180            ..self
181        }
182    }
183
184    fn build_grpc_server(
185        &mut self,
186        grpc: &GrpcOptions,
187        meta_client: &Option<MetaClientOptions>,
188        name: Option<String>,
189        external: bool,
190        request_memory_limiter: ServerMemoryLimiter,
191    ) -> Result<GrpcServer> {
192        let builder = if let Some(builder) = self.grpc_server_builder.take() {
193            builder
194        } else {
195            self.grpc_server_builder(grpc, request_memory_limiter)?
196        };
197
198        let user_provider = if external {
199            self.plugins.get::<UserProviderRef>()
200        } else {
201            // skip authentication for internal grpc port
202            None
203        };
204
205        // Determine whether it is Standalone or Distributed mode based on whether the meta client is configured.
206        let runtime = if meta_client.is_none() {
207            Some(builder.runtime().clone())
208        } else {
209            None
210        };
211
212        let greptime_request_handler = GreptimeRequestHandler::new(
213            self.instance.clone(),
214            user_provider.clone(),
215            runtime,
216            grpc.flight_compression,
217        );
218
219        // Use custom flight handler if provided, otherwise use the default GreptimeRequestHandler
220        let flight_handler = self
221            .flight_handler
222            .clone()
223            .unwrap_or_else(|| Arc::new(greptime_request_handler.clone()) as FlightCraftRef);
224
225        let grpc_server = builder
226            .name(name)
227            .database_handler(greptime_request_handler.clone())
228            .prometheus_handler(self.instance.clone(), user_provider.clone())
229            .otel_arrow_handler(OtelArrowServiceHandler::new(
230                self.instance.clone(),
231                user_provider.clone(),
232            ))
233            .flight_handler(flight_handler)
234            .add_layer(axum::middleware::from_fn_with_state(
235                self.instance.clone(),
236                async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
237                    if state.is_suspended() {
238                        let status = Status::from(servers::error::SuspendedSnafu.build());
239                        return status.into_http();
240                    }
241                    next.run(request).await
242                },
243            ));
244
245        let grpc_server = if !external {
246            let frontend_grpc_handler =
247                FrontendGrpcHandler::new(self.instance.process_manager().clone());
248            grpc_server.frontend_grpc_handler(frontend_grpc_handler)
249        } else {
250            grpc_server
251        }
252        .build();
253
254        Ok(grpc_server)
255    }
256
257    fn build_http_server(
258        &mut self,
259        opts: &FrontendOptions,
260        toml: String,
261        request_memory_limiter: ServerMemoryLimiter,
262    ) -> Result<HttpServer> {
263        let builder = if let Some(builder) = self.http_server_builder.take() {
264            builder
265        } else {
266            self.http_server_builder(opts, request_memory_limiter)
267        };
268
269        let http_server = builder
270            .with_metrics_handler(MetricsHandler)
271            .with_plugins(self.plugins.clone())
272            .with_greptime_config_options(toml)
273            .build();
274        Ok(http_server)
275    }
276
277    pub fn build(mut self) -> Result<ServerHandlers> {
278        let opts = self.opts.clone();
279        let instance = self.instance.clone();
280
281        let toml = opts.to_toml().context(TomlFormatSnafu)?;
282        let opts: FrontendOptions = opts.into();
283
284        let handlers = ServerHandlers::default();
285
286        let user_provider = self.plugins.get::<UserProviderRef>();
287
288        {
289            // Always init GRPC server
290            let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
291            let grpc_server = self.build_grpc_server(
292                &opts.grpc,
293                &opts.meta_client,
294                None,
295                true,
296                self.server_memory_limiter.clone(),
297            )?;
298            handlers.insert((Box::new(grpc_server), grpc_addr));
299        }
300
301        if let Some(internal_grpc) = &opts.internal_grpc {
302            // Always init Internal GRPC server
303            let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
304            let grpc_server = self.build_grpc_server(
305                internal_grpc,
306                &opts.meta_client,
307                Some("INTERNAL_GRPC_SERVER".to_string()),
308                false,
309                self.server_memory_limiter.clone(),
310            )?;
311            handlers.insert((Box::new(grpc_server), grpc_addr));
312        }
313
314        {
315            // Always init HTTP server
316            let http_options = &opts.http;
317            let http_addr = parse_addr(&http_options.addr)?;
318            let http_server =
319                self.build_http_server(&opts, toml, self.server_memory_limiter.clone())?;
320            handlers.insert((Box::new(http_server), http_addr));
321        }
322
323        if opts.mysql.enable {
324            // Init MySQL server
325            let opts = &opts.mysql;
326            let mysql_addr = parse_addr(&opts.addr)?;
327
328            let tls_server_config = Arc::new(
329                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
330            );
331
332            // will not watch if watch is disabled in tls option
333            maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
334
335            let mysql_server = MysqlServer::create_server(
336                common_runtime::global_runtime(),
337                Arc::new(MysqlSpawnRef::new(instance.clone(), user_provider.clone())),
338                Arc::new(MysqlSpawnConfig::new(
339                    opts.tls.should_force_tls(),
340                    tls_server_config,
341                    opts.keep_alive.as_secs(),
342                    opts.reject_no_database.unwrap_or(false),
343                    opts.prepared_stmt_cache_size,
344                )),
345                Some(instance.process_manager().clone()),
346            );
347            handlers.insert((mysql_server, mysql_addr));
348        }
349
350        if opts.postgres.enable {
351            // Init PosgresSQL Server
352            let opts = &opts.postgres;
353            let pg_addr = parse_addr(&opts.addr)?;
354
355            let tls_server_config = Arc::new(
356                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
357            );
358
359            maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
360
361            let pg_server = Box::new(PostgresServer::new(
362                instance.clone(),
363                opts.tls.should_force_tls(),
364                tls_server_config,
365                opts.keep_alive.as_secs(),
366                common_runtime::global_runtime(),
367                user_provider.clone(),
368                Some(self.instance.process_manager().clone()),
369            )) as Box<dyn Server>;
370
371            handlers.insert((pg_server, pg_addr));
372        }
373
374        Ok(handlers)
375    }
376}
377
378fn parse_addr(addr: &str) -> Result<SocketAddr> {
379    addr.parse().context(error::ParseAddrSnafu { addr })
380}