frontend/
server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use common_base::Plugins;
20use common_config::Configurable;
21use meta_client::MetaClientOptions;
22use servers::error::Error as ServerError;
23use servers::grpc::builder::GrpcServerBuilder;
24use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
25use servers::grpc::greptime_handler::GreptimeRequestHandler;
26use servers::grpc::{GrpcOptions, GrpcServer};
27use servers::http::event::LogValidatorRef;
28use servers::http::{HttpServer, HttpServerBuilder};
29use servers::interceptor::LogIngestInterceptorRef;
30use servers::metrics_handler::MetricsHandler;
31use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
32use servers::otel_arrow::OtelArrowServiceHandler;
33use servers::postgres::PostgresServer;
34use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
35use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
36use servers::server::{Server, ServerHandlers};
37use servers::tls::{maybe_watch_tls_config, ReloadableTlsServerConfig};
38use snafu::ResultExt;
39
40use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
41use crate::frontend::FrontendOptions;
42use crate::instance::Instance;
43
44pub struct Services<T>
45where
46    T: Into<FrontendOptions> + Configurable + Clone,
47{
48    opts: T,
49    instance: Arc<Instance>,
50    grpc_server_builder: Option<GrpcServerBuilder>,
51    http_server_builder: Option<HttpServerBuilder>,
52    plugins: Plugins,
53}
54
55impl<T> Services<T>
56where
57    T: Into<FrontendOptions> + Configurable + Clone,
58{
59    pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
60        Self {
61            opts,
62            instance,
63            grpc_server_builder: None,
64            http_server_builder: None,
65            plugins,
66        }
67    }
68
69    pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
70        let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
71            .with_tls_config(opts.tls.clone())
72            .context(error::InvalidTlsConfigSnafu)?;
73        Ok(builder)
74    }
75
76    pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder {
77        let mut builder = HttpServerBuilder::new(opts.http.clone())
78            .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
79
80        let validator = self.plugins.get::<LogValidatorRef>();
81        let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
82        builder =
83            builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
84        builder = builder.with_logs_handler(self.instance.clone());
85
86        if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
87            builder = builder.with_user_provider(user_provider);
88        }
89
90        if opts.opentsdb.enable {
91            builder = builder.with_opentsdb_handler(self.instance.clone());
92        }
93
94        if opts.influxdb.enable {
95            builder = builder.with_influxdb_handler(self.instance.clone());
96        }
97
98        if opts.prom_store.enable {
99            builder = builder
100                .with_prom_handler(
101                    self.instance.clone(),
102                    Some(self.instance.clone()),
103                    opts.prom_store.with_metric_engine,
104                    opts.http.prom_validation_mode,
105                )
106                .with_prometheus_handler(self.instance.clone());
107        }
108
109        if opts.otlp.enable {
110            builder = builder
111                .with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
112        }
113
114        if opts.jaeger.enable {
115            builder = builder.with_jaeger_handler(self.instance.clone());
116        }
117
118        builder
119    }
120
121    pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
122        Self {
123            grpc_server_builder: Some(builder),
124            ..self
125        }
126    }
127
128    pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
129        Self {
130            http_server_builder: Some(builder),
131            ..self
132        }
133    }
134
135    fn build_grpc_server(
136        &mut self,
137        grpc: &GrpcOptions,
138        meta_client: &Option<MetaClientOptions>,
139        name: Option<String>,
140        external: bool,
141    ) -> Result<GrpcServer> {
142        let builder = if let Some(builder) = self.grpc_server_builder.take() {
143            builder
144        } else {
145            self.grpc_server_builder(grpc)?
146        };
147
148        let user_provider = if external {
149            self.plugins.get::<UserProviderRef>()
150        } else {
151            // skip authentication for internal grpc port
152            None
153        };
154
155        // Determine whether it is Standalone or Distributed mode based on whether the meta client is configured.
156        let runtime = if meta_client.is_none() {
157            Some(builder.runtime().clone())
158        } else {
159            None
160        };
161
162        let greptime_request_handler = GreptimeRequestHandler::new(
163            ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
164            user_provider.clone(),
165            runtime,
166            grpc.flight_compression,
167        );
168
169        let grpc_server = builder
170            .name(name)
171            .database_handler(greptime_request_handler.clone())
172            .prometheus_handler(self.instance.clone(), user_provider.clone())
173            .otel_arrow_handler(OtelArrowServiceHandler::new(
174                self.instance.clone(),
175                user_provider.clone(),
176            ))
177            .flight_handler(Arc::new(greptime_request_handler));
178
179        let grpc_server = if !external {
180            let frontend_grpc_handler =
181                FrontendGrpcHandler::new(self.instance.process_manager().clone());
182            grpc_server.frontend_grpc_handler(frontend_grpc_handler)
183        } else {
184            grpc_server
185        }
186        .build();
187
188        Ok(grpc_server)
189    }
190
191    fn build_http_server(&mut self, opts: &FrontendOptions, toml: String) -> Result<HttpServer> {
192        let builder = if let Some(builder) = self.http_server_builder.take() {
193            builder
194        } else {
195            self.http_server_builder(opts)
196        };
197
198        let http_server = builder
199            .with_metrics_handler(MetricsHandler)
200            .with_plugins(self.plugins.clone())
201            .with_greptime_config_options(toml)
202            .build();
203        Ok(http_server)
204    }
205
206    pub fn build(mut self) -> Result<ServerHandlers> {
207        let opts = self.opts.clone();
208        let instance = self.instance.clone();
209
210        let toml = opts.to_toml().context(TomlFormatSnafu)?;
211        let opts: FrontendOptions = opts.into();
212
213        let handlers = ServerHandlers::default();
214
215        let user_provider = self.plugins.get::<UserProviderRef>();
216
217        {
218            // Always init GRPC server
219            let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
220            let grpc_server = self.build_grpc_server(&opts.grpc, &opts.meta_client, None, true)?;
221            handlers.insert((Box::new(grpc_server), grpc_addr));
222        }
223
224        if let Some(internal_grpc) = &opts.internal_grpc {
225            // Always init Internal GRPC server
226            let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
227            let grpc_server = self.build_grpc_server(
228                internal_grpc,
229                &opts.meta_client,
230                Some("INTERNAL_GRPC_SERVER".to_string()),
231                false,
232            )?;
233            handlers.insert((Box::new(grpc_server), grpc_addr));
234        }
235
236        {
237            // Always init HTTP server
238            let http_options = &opts.http;
239            let http_addr = parse_addr(&http_options.addr)?;
240            let http_server = self.build_http_server(&opts, toml)?;
241            handlers.insert((Box::new(http_server), http_addr));
242        }
243
244        if opts.mysql.enable {
245            // Init MySQL server
246            let opts = &opts.mysql;
247            let mysql_addr = parse_addr(&opts.addr)?;
248
249            let tls_server_config = Arc::new(
250                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
251            );
252
253            // will not watch if watch is disabled in tls option
254            maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
255
256            let mysql_server = MysqlServer::create_server(
257                common_runtime::global_runtime(),
258                Arc::new(MysqlSpawnRef::new(
259                    ServerSqlQueryHandlerAdapter::arc(instance.clone()),
260                    user_provider.clone(),
261                )),
262                Arc::new(MysqlSpawnConfig::new(
263                    opts.tls.should_force_tls(),
264                    tls_server_config,
265                    opts.keep_alive.as_secs(),
266                    opts.reject_no_database.unwrap_or(false),
267                    opts.prepared_stmt_cache_size,
268                )),
269                Some(instance.process_manager().clone()),
270            );
271            handlers.insert((mysql_server, mysql_addr));
272        }
273
274        if opts.postgres.enable {
275            // Init PosgresSQL Server
276            let opts = &opts.postgres;
277            let pg_addr = parse_addr(&opts.addr)?;
278
279            let tls_server_config = Arc::new(
280                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
281            );
282
283            maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
284
285            let pg_server = Box::new(PostgresServer::new(
286                ServerSqlQueryHandlerAdapter::arc(instance.clone()),
287                opts.tls.should_force_tls(),
288                tls_server_config,
289                opts.keep_alive.as_secs(),
290                common_runtime::global_runtime(),
291                user_provider.clone(),
292                Some(self.instance.process_manager().clone()),
293            )) as Box<dyn Server>;
294
295            handlers.insert((pg_server, pg_addr));
296        }
297
298        Ok(handlers)
299    }
300}
301
302fn parse_addr(addr: &str) -> Result<SocketAddr> {
303    addr.parse().context(error::ParseAddrSnafu { addr })
304}