frontend/
server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use common_base::Plugins;
20use common_config::Configurable;
21use common_telemetry::info;
22use meta_client::MetaClientOptions;
23use servers::error::Error as ServerError;
24use servers::grpc::builder::GrpcServerBuilder;
25use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
26use servers::grpc::greptime_handler::GreptimeRequestHandler;
27use servers::grpc::{GrpcOptions, GrpcServer};
28use servers::http::event::LogValidatorRef;
29use servers::http::utils::router::RouterConfigurator;
30use servers::http::{HttpServer, HttpServerBuilder};
31use servers::interceptor::LogIngestInterceptorRef;
32use servers::metrics_handler::MetricsHandler;
33use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
34use servers::otel_arrow::OtelArrowServiceHandler;
35use servers::postgres::PostgresServer;
36use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
37use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
38use servers::server::{Server, ServerHandlers};
39use servers::tls::{ReloadableTlsServerConfig, maybe_watch_tls_config};
40use snafu::ResultExt;
41
42use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
43use crate::frontend::FrontendOptions;
44use crate::instance::Instance;
45
46pub struct Services<T>
47where
48    T: Into<FrontendOptions> + Configurable + Clone,
49{
50    opts: T,
51    instance: Arc<Instance>,
52    grpc_server_builder: Option<GrpcServerBuilder>,
53    http_server_builder: Option<HttpServerBuilder>,
54    plugins: Plugins,
55}
56
57impl<T> Services<T>
58where
59    T: Into<FrontendOptions> + Configurable + Clone,
60{
61    pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
62        Self {
63            opts,
64            instance,
65            grpc_server_builder: None,
66            http_server_builder: None,
67            plugins,
68        }
69    }
70
71    fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
72        let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
73            .with_tls_config(opts.tls.clone())
74            .context(error::InvalidTlsConfigSnafu)?;
75        Ok(builder)
76    }
77
78    pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder {
79        let mut builder = HttpServerBuilder::new(opts.http.clone())
80            .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
81
82        let validator = self.plugins.get::<LogValidatorRef>();
83        let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
84        builder =
85            builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
86        builder = builder.with_logs_handler(self.instance.clone());
87
88        if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
89            builder = builder.with_user_provider(user_provider);
90        }
91
92        if opts.opentsdb.enable {
93            builder = builder.with_opentsdb_handler(self.instance.clone());
94        }
95
96        if opts.influxdb.enable {
97            builder = builder.with_influxdb_handler(self.instance.clone());
98        }
99
100        if opts.prom_store.enable {
101            builder = builder
102                .with_prom_handler(
103                    self.instance.clone(),
104                    Some(self.instance.clone()),
105                    opts.prom_store.with_metric_engine,
106                    opts.http.prom_validation_mode,
107                )
108                .with_prometheus_handler(self.instance.clone());
109        }
110
111        if opts.otlp.enable {
112            builder = builder
113                .with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
114        }
115
116        if opts.jaeger.enable {
117            builder = builder.with_jaeger_handler(self.instance.clone());
118        }
119
120        if let Some(configurator) = self.plugins.get::<RouterConfigurator>() {
121            info!("Adding extra router from plugins");
122            builder = builder.with_extra_router(configurator.router());
123        }
124
125        builder
126    }
127
128    pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
129        Self {
130            grpc_server_builder: Some(builder),
131            ..self
132        }
133    }
134
135    pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
136        Self {
137            http_server_builder: Some(builder),
138            ..self
139        }
140    }
141
142    fn build_grpc_server(
143        &mut self,
144        grpc: &GrpcOptions,
145        meta_client: &Option<MetaClientOptions>,
146        name: Option<String>,
147        external: bool,
148    ) -> Result<GrpcServer> {
149        let builder = if let Some(builder) = self.grpc_server_builder.take() {
150            builder
151        } else {
152            self.grpc_server_builder(grpc)?
153        };
154
155        let user_provider = if external {
156            self.plugins.get::<UserProviderRef>()
157        } else {
158            // skip authentication for internal grpc port
159            None
160        };
161
162        // Determine whether it is Standalone or Distributed mode based on whether the meta client is configured.
163        let runtime = if meta_client.is_none() {
164            Some(builder.runtime().clone())
165        } else {
166            None
167        };
168
169        let greptime_request_handler = GreptimeRequestHandler::new(
170            ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
171            user_provider.clone(),
172            runtime,
173            grpc.flight_compression,
174        );
175
176        let grpc_server = builder
177            .name(name)
178            .database_handler(greptime_request_handler.clone())
179            .prometheus_handler(self.instance.clone(), user_provider.clone())
180            .otel_arrow_handler(OtelArrowServiceHandler::new(
181                self.instance.clone(),
182                user_provider.clone(),
183            ))
184            .flight_handler(Arc::new(greptime_request_handler));
185
186        let grpc_server = if !external {
187            let frontend_grpc_handler =
188                FrontendGrpcHandler::new(self.instance.process_manager().clone());
189            grpc_server.frontend_grpc_handler(frontend_grpc_handler)
190        } else {
191            grpc_server
192        }
193        .build();
194
195        Ok(grpc_server)
196    }
197
198    fn build_http_server(&mut self, opts: &FrontendOptions, toml: String) -> Result<HttpServer> {
199        let builder = if let Some(builder) = self.http_server_builder.take() {
200            builder
201        } else {
202            self.http_server_builder(opts)
203        };
204
205        let http_server = builder
206            .with_metrics_handler(MetricsHandler)
207            .with_plugins(self.plugins.clone())
208            .with_greptime_config_options(toml)
209            .build();
210        Ok(http_server)
211    }
212
213    pub fn build(mut self) -> Result<ServerHandlers> {
214        let opts = self.opts.clone();
215        let instance = self.instance.clone();
216
217        let toml = opts.to_toml().context(TomlFormatSnafu)?;
218        let opts: FrontendOptions = opts.into();
219
220        let handlers = ServerHandlers::default();
221
222        let user_provider = self.plugins.get::<UserProviderRef>();
223
224        {
225            // Always init GRPC server
226            let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
227            let grpc_server = self.build_grpc_server(&opts.grpc, &opts.meta_client, None, true)?;
228            handlers.insert((Box::new(grpc_server), grpc_addr));
229        }
230
231        if let Some(internal_grpc) = &opts.internal_grpc {
232            // Always init Internal GRPC server
233            let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
234            let grpc_server = self.build_grpc_server(
235                internal_grpc,
236                &opts.meta_client,
237                Some("INTERNAL_GRPC_SERVER".to_string()),
238                false,
239            )?;
240            handlers.insert((Box::new(grpc_server), grpc_addr));
241        }
242
243        {
244            // Always init HTTP server
245            let http_options = &opts.http;
246            let http_addr = parse_addr(&http_options.addr)?;
247            let http_server = self.build_http_server(&opts, toml)?;
248            handlers.insert((Box::new(http_server), http_addr));
249        }
250
251        if opts.mysql.enable {
252            // Init MySQL server
253            let opts = &opts.mysql;
254            let mysql_addr = parse_addr(&opts.addr)?;
255
256            let tls_server_config = Arc::new(
257                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
258            );
259
260            // will not watch if watch is disabled in tls option
261            maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
262
263            let mysql_server = MysqlServer::create_server(
264                common_runtime::global_runtime(),
265                Arc::new(MysqlSpawnRef::new(
266                    ServerSqlQueryHandlerAdapter::arc(instance.clone()),
267                    user_provider.clone(),
268                )),
269                Arc::new(MysqlSpawnConfig::new(
270                    opts.tls.should_force_tls(),
271                    tls_server_config,
272                    opts.keep_alive.as_secs(),
273                    opts.reject_no_database.unwrap_or(false),
274                    opts.prepared_stmt_cache_size,
275                )),
276                Some(instance.process_manager().clone()),
277            );
278            handlers.insert((mysql_server, mysql_addr));
279        }
280
281        if opts.postgres.enable {
282            // Init PosgresSQL Server
283            let opts = &opts.postgres;
284            let pg_addr = parse_addr(&opts.addr)?;
285
286            let tls_server_config = Arc::new(
287                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
288            );
289
290            maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
291
292            let pg_server = Box::new(PostgresServer::new(
293                ServerSqlQueryHandlerAdapter::arc(instance.clone()),
294                opts.tls.should_force_tls(),
295                tls_server_config,
296                opts.keep_alive.as_secs(),
297                common_runtime::global_runtime(),
298                user_provider.clone(),
299                Some(self.instance.process_manager().clone()),
300            )) as Box<dyn Server>;
301
302            handlers.insert((pg_server, pg_addr));
303        }
304
305        Ok(handlers)
306    }
307}
308
309fn parse_addr(addr: &str) -> Result<SocketAddr> {
310    addr.parse().context(error::ParseAddrSnafu { addr })
311}