frontend/
server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use common_base::Plugins;
20use common_config::Configurable;
21use servers::error::Error as ServerError;
22use servers::grpc::builder::GrpcServerBuilder;
23use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
24use servers::grpc::greptime_handler::GreptimeRequestHandler;
25use servers::grpc::{GrpcOptions, GrpcServer};
26use servers::http::event::LogValidatorRef;
27use servers::http::{HttpServer, HttpServerBuilder};
28use servers::interceptor::LogIngestInterceptorRef;
29use servers::metrics_handler::MetricsHandler;
30use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
31use servers::otel_arrow::OtelArrowServiceHandler;
32use servers::postgres::PostgresServer;
33use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
34use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
35use servers::server::{Server, ServerHandlers};
36use servers::tls::{maybe_watch_tls_config, ReloadableTlsServerConfig};
37use snafu::ResultExt;
38
39use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
40use crate::frontend::FrontendOptions;
41use crate::instance::Instance;
42
43pub struct Services<T>
44where
45    T: Into<FrontendOptions> + Configurable + Clone,
46{
47    opts: T,
48    instance: Arc<Instance>,
49    grpc_server_builder: Option<GrpcServerBuilder>,
50    http_server_builder: Option<HttpServerBuilder>,
51    plugins: Plugins,
52}
53
54impl<T> Services<T>
55where
56    T: Into<FrontendOptions> + Configurable + Clone,
57{
58    pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
59        Self {
60            opts,
61            instance,
62            grpc_server_builder: None,
63            http_server_builder: None,
64            plugins,
65        }
66    }
67
68    pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
69        let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
70            .with_tls_config(opts.tls.clone())
71            .context(error::InvalidTlsConfigSnafu)?;
72        Ok(builder)
73    }
74
75    pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder {
76        let mut builder = HttpServerBuilder::new(opts.http.clone())
77            .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
78
79        let validator = self.plugins.get::<LogValidatorRef>();
80        let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
81        builder =
82            builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
83        builder = builder.with_logs_handler(self.instance.clone());
84
85        if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
86            builder = builder.with_user_provider(user_provider);
87        }
88
89        if opts.opentsdb.enable {
90            builder = builder.with_opentsdb_handler(self.instance.clone());
91        }
92
93        if opts.influxdb.enable {
94            builder = builder.with_influxdb_handler(self.instance.clone());
95        }
96
97        if opts.prom_store.enable {
98            builder = builder
99                .with_prom_handler(
100                    self.instance.clone(),
101                    Some(self.instance.clone()),
102                    opts.prom_store.with_metric_engine,
103                    opts.http.prom_validation_mode,
104                )
105                .with_prometheus_handler(self.instance.clone());
106        }
107
108        if opts.otlp.enable {
109            builder = builder.with_otlp_handler(self.instance.clone());
110        }
111
112        if opts.jaeger.enable {
113            builder = builder.with_jaeger_handler(self.instance.clone());
114        }
115
116        builder
117    }
118
119    pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
120        Self {
121            grpc_server_builder: Some(builder),
122            ..self
123        }
124    }
125
126    pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
127        Self {
128            http_server_builder: Some(builder),
129            ..self
130        }
131    }
132
133    fn build_grpc_server(&mut self, opts: &FrontendOptions) -> Result<GrpcServer> {
134        let builder = if let Some(builder) = self.grpc_server_builder.take() {
135            builder
136        } else {
137            self.grpc_server_builder(&opts.grpc)?
138        };
139
140        let user_provider = self.plugins.get::<UserProviderRef>();
141
142        // Determine whether it is Standalone or Distributed mode based on whether the meta client is configured.
143        let runtime = if opts.meta_client.is_none() {
144            Some(builder.runtime().clone())
145        } else {
146            None
147        };
148
149        let greptime_request_handler = GreptimeRequestHandler::new(
150            ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
151            user_provider.clone(),
152            runtime,
153            opts.grpc.flight_compression,
154        );
155
156        let frontend_grpc_handler =
157            FrontendGrpcHandler::new(self.instance.process_manager().clone());
158        let grpc_server = builder
159            .database_handler(greptime_request_handler.clone())
160            .prometheus_handler(self.instance.clone(), user_provider.clone())
161            .otel_arrow_handler(OtelArrowServiceHandler(self.instance.clone()))
162            .flight_handler(Arc::new(greptime_request_handler))
163            .frontend_grpc_handler(frontend_grpc_handler)
164            .build();
165        Ok(grpc_server)
166    }
167
168    fn build_http_server(&mut self, opts: &FrontendOptions, toml: String) -> Result<HttpServer> {
169        let builder = if let Some(builder) = self.http_server_builder.take() {
170            builder
171        } else {
172            self.http_server_builder(opts)
173        };
174
175        let http_server = builder
176            .with_metrics_handler(MetricsHandler)
177            .with_plugins(self.plugins.clone())
178            .with_greptime_config_options(toml)
179            .build();
180        Ok(http_server)
181    }
182
183    pub fn build(mut self) -> Result<ServerHandlers> {
184        let opts = self.opts.clone();
185        let instance = self.instance.clone();
186
187        let toml = opts.to_toml().context(TomlFormatSnafu)?;
188        let opts: FrontendOptions = opts.into();
189
190        let handlers = ServerHandlers::default();
191
192        let user_provider = self.plugins.get::<UserProviderRef>();
193
194        {
195            // Always init GRPC server
196            let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
197            let grpc_server = self.build_grpc_server(&opts)?;
198            handlers.insert((Box::new(grpc_server), grpc_addr));
199        }
200
201        {
202            // Always init HTTP server
203            let http_options = &opts.http;
204            let http_addr = parse_addr(&http_options.addr)?;
205            let http_server = self.build_http_server(&opts, toml)?;
206            handlers.insert((Box::new(http_server), http_addr));
207        }
208
209        if opts.mysql.enable {
210            // Init MySQL server
211            let opts = &opts.mysql;
212            let mysql_addr = parse_addr(&opts.addr)?;
213
214            let tls_server_config = Arc::new(
215                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
216            );
217
218            // will not watch if watch is disabled in tls option
219            maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
220
221            let mysql_server = MysqlServer::create_server(
222                common_runtime::global_runtime(),
223                Arc::new(MysqlSpawnRef::new(
224                    ServerSqlQueryHandlerAdapter::arc(instance.clone()),
225                    user_provider.clone(),
226                )),
227                Arc::new(MysqlSpawnConfig::new(
228                    opts.tls.should_force_tls(),
229                    tls_server_config,
230                    opts.keep_alive.as_secs(),
231                    opts.reject_no_database.unwrap_or(false),
232                )),
233                Some(instance.process_manager().clone()),
234            );
235            handlers.insert((mysql_server, mysql_addr));
236        }
237
238        if opts.postgres.enable {
239            // Init PosgresSQL Server
240            let opts = &opts.postgres;
241            let pg_addr = parse_addr(&opts.addr)?;
242
243            let tls_server_config = Arc::new(
244                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
245            );
246
247            maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
248
249            let pg_server = Box::new(PostgresServer::new(
250                ServerSqlQueryHandlerAdapter::arc(instance.clone()),
251                opts.tls.should_force_tls(),
252                tls_server_config,
253                opts.keep_alive.as_secs(),
254                common_runtime::global_runtime(),
255                user_provider.clone(),
256                Some(self.instance.process_manager().clone()),
257            )) as Box<dyn Server>;
258
259            handlers.insert((pg_server, pg_addr));
260        }
261
262        Ok(handlers)
263    }
264}
265
266fn parse_addr(addr: &str) -> Result<SocketAddr> {
267    addr.parse().context(error::ParseAddrSnafu { addr })
268}