frontend/
server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use common_base::Plugins;
20use common_config::{Configurable, Mode};
21use servers::error::Error as ServerError;
22use servers::grpc::builder::GrpcServerBuilder;
23use servers::grpc::greptime_handler::GreptimeRequestHandler;
24use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
25use servers::http::event::LogValidatorRef;
26use servers::http::{HttpServer, HttpServerBuilder};
27use servers::interceptor::LogIngestInterceptorRef;
28use servers::metrics_handler::MetricsHandler;
29use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
30use servers::otel_arrow::OtelArrowServiceHandler;
31use servers::postgres::PostgresServer;
32use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
33use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
34use servers::server::{Server, ServerHandlers};
35use servers::tls::{maybe_watch_tls_config, ReloadableTlsServerConfig};
36use snafu::ResultExt;
37
38use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
39use crate::frontend::FrontendOptions;
40use crate::instance::Instance;
41
42pub struct Services<T>
43where
44    T: Into<FrontendOptions> + Configurable + Clone,
45{
46    opts: T,
47    instance: Arc<Instance>,
48    grpc_server_builder: Option<GrpcServerBuilder>,
49    http_server_builder: Option<HttpServerBuilder>,
50    plugins: Plugins,
51}
52
53impl<T> Services<T>
54where
55    T: Into<FrontendOptions> + Configurable + Clone,
56{
57    pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
58        Self {
59            opts,
60            instance,
61            grpc_server_builder: None,
62            http_server_builder: None,
63            plugins,
64        }
65    }
66
67    pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
68        let grpc_config = GrpcServerConfig {
69            max_recv_message_size: opts.max_recv_message_size.as_bytes() as usize,
70            max_send_message_size: opts.max_send_message_size.as_bytes() as usize,
71            tls: opts.tls.clone(),
72        };
73        let builder = GrpcServerBuilder::new(grpc_config, common_runtime::global_runtime())
74            .with_tls_config(opts.tls.clone())
75            .context(error::InvalidTlsConfigSnafu)?;
76        Ok(builder)
77    }
78
79    pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder {
80        let mut builder = HttpServerBuilder::new(opts.http.clone())
81            .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
82
83        let validator = self.plugins.get::<LogValidatorRef>();
84        let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
85        builder =
86            builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
87        builder = builder.with_logs_handler(self.instance.clone());
88
89        if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
90            builder = builder.with_user_provider(user_provider);
91        }
92
93        if opts.opentsdb.enable {
94            builder = builder.with_opentsdb_handler(self.instance.clone());
95        }
96
97        if opts.influxdb.enable {
98            builder = builder.with_influxdb_handler(self.instance.clone());
99        }
100
101        if opts.prom_store.enable {
102            builder = builder
103                .with_prom_handler(
104                    self.instance.clone(),
105                    opts.prom_store.with_metric_engine,
106                    opts.http.is_strict_mode,
107                )
108                .with_prometheus_handler(self.instance.clone());
109        }
110
111        if opts.otlp.enable {
112            builder = builder.with_otlp_handler(self.instance.clone());
113        }
114
115        if opts.jaeger.enable {
116            builder = builder.with_jaeger_handler(self.instance.clone());
117        }
118
119        builder
120    }
121
122    pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
123        Self {
124            grpc_server_builder: Some(builder),
125            ..self
126        }
127    }
128
129    pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
130        Self {
131            http_server_builder: Some(builder),
132            ..self
133        }
134    }
135
136    fn build_grpc_server(&mut self, opts: &FrontendOptions) -> Result<GrpcServer> {
137        let builder = if let Some(builder) = self.grpc_server_builder.take() {
138            builder
139        } else {
140            self.grpc_server_builder(&opts.grpc)?
141        };
142
143        let user_provider = self.plugins.get::<UserProviderRef>();
144
145        // Determine whether it is Standalone or Distributed mode based on whether the meta client is configured.
146        let mode = if opts.meta_client.is_none() {
147            Mode::Standalone
148        } else {
149            Mode::Distributed
150        };
151
152        let runtime = match mode {
153            Mode::Standalone => Some(builder.runtime().clone()),
154            _ => None,
155        };
156
157        let greptime_request_handler = GreptimeRequestHandler::new(
158            ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
159            user_provider.clone(),
160            runtime,
161        );
162
163        let grpc_server = builder
164            .database_handler(greptime_request_handler.clone())
165            .prometheus_handler(self.instance.clone(), user_provider.clone())
166            .otel_arrow_handler(OtelArrowServiceHandler(self.instance.clone()))
167            .flight_handler(Arc::new(greptime_request_handler))
168            .build();
169        Ok(grpc_server)
170    }
171
172    fn build_http_server(&mut self, opts: &FrontendOptions, toml: String) -> Result<HttpServer> {
173        let builder = if let Some(builder) = self.http_server_builder.take() {
174            builder
175        } else {
176            self.http_server_builder(opts)
177        };
178
179        let http_server = builder
180            .with_metrics_handler(MetricsHandler)
181            .with_plugins(self.plugins.clone())
182            .with_greptime_config_options(toml)
183            .build();
184        Ok(http_server)
185    }
186
187    pub async fn build(mut self) -> Result<ServerHandlers> {
188        let opts = self.opts.clone();
189        let instance = self.instance.clone();
190
191        let toml = opts.to_toml().context(TomlFormatSnafu)?;
192        let opts: FrontendOptions = opts.into();
193
194        let handlers = ServerHandlers::default();
195
196        let user_provider = self.plugins.get::<UserProviderRef>();
197
198        {
199            // Always init GRPC server
200            let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
201            let grpc_server = self.build_grpc_server(&opts)?;
202            handlers.insert((Box::new(grpc_server), grpc_addr)).await;
203        }
204
205        {
206            // Always init HTTP server
207            let http_options = &opts.http;
208            let http_addr = parse_addr(&http_options.addr)?;
209            let http_server = self.build_http_server(&opts, toml)?;
210            handlers.insert((Box::new(http_server), http_addr)).await;
211        }
212
213        if opts.mysql.enable {
214            // Init MySQL server
215            let opts = &opts.mysql;
216            let mysql_addr = parse_addr(&opts.addr)?;
217
218            let tls_server_config = Arc::new(
219                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
220            );
221
222            // will not watch if watch is disabled in tls option
223            maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
224
225            let mysql_server = MysqlServer::create_server(
226                common_runtime::global_runtime(),
227                Arc::new(MysqlSpawnRef::new(
228                    ServerSqlQueryHandlerAdapter::arc(instance.clone()),
229                    user_provider.clone(),
230                )),
231                Arc::new(MysqlSpawnConfig::new(
232                    opts.tls.should_force_tls(),
233                    tls_server_config,
234                    opts.keep_alive.as_secs(),
235                    opts.reject_no_database.unwrap_or(false),
236                )),
237            );
238            handlers.insert((mysql_server, mysql_addr)).await;
239        }
240
241        if opts.postgres.enable {
242            // Init PosgresSQL Server
243            let opts = &opts.postgres;
244            let pg_addr = parse_addr(&opts.addr)?;
245
246            let tls_server_config = Arc::new(
247                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
248            );
249
250            maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
251
252            let pg_server = Box::new(PostgresServer::new(
253                ServerSqlQueryHandlerAdapter::arc(instance.clone()),
254                opts.tls.should_force_tls(),
255                tls_server_config,
256                opts.keep_alive.as_secs(),
257                common_runtime::global_runtime(),
258                user_provider.clone(),
259            )) as Box<dyn Server>;
260
261            handlers.insert((pg_server, pg_addr)).await;
262        }
263
264        Ok(handlers)
265    }
266}
267
268fn parse_addr(addr: &str) -> Result<SocketAddr> {
269    addr.parse().context(error::ParseAddrSnafu { addr })
270}