frontend/
server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use axum::extract::{Request, State};
20use axum::middleware::Next;
21use axum::response::IntoResponse;
22use common_base::Plugins;
23use common_config::Configurable;
24use common_telemetry::info;
25use meta_client::MetaClientOptions;
26use servers::error::Error as ServerError;
27use servers::grpc::builder::GrpcServerBuilder;
28use servers::grpc::flight::FlightCraftRef;
29use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
30use servers::grpc::greptime_handler::GreptimeRequestHandler;
31use servers::grpc::{GrpcOptions, GrpcServer};
32use servers::http::event::LogValidatorRef;
33use servers::http::result::error_result::ErrorResponse;
34use servers::http::utils::router::RouterConfigurator;
35use servers::http::{HttpServer, HttpServerBuilder};
36use servers::interceptor::LogIngestInterceptorRef;
37use servers::metrics_handler::MetricsHandler;
38use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
39use servers::otel_arrow::OtelArrowServiceHandler;
40use servers::postgres::PostgresServer;
41use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
42use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
43use servers::server::{Server, ServerHandlers};
44use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config};
45use snafu::ResultExt;
46use tonic::Status;
47
48use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
49use crate::frontend::FrontendOptions;
50use crate::instance::Instance;
51
52pub struct Services<T>
53where
54    T: Into<FrontendOptions> + Configurable + Clone,
55{
56    opts: T,
57    instance: Arc<Instance>,
58    grpc_server_builder: Option<GrpcServerBuilder>,
59    http_server_builder: Option<HttpServerBuilder>,
60    plugins: Plugins,
61    flight_handler: Option<FlightCraftRef>,
62}
63
64impl<T> Services<T>
65where
66    T: Into<FrontendOptions> + Configurable + Clone,
67{
68    pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
69        Self {
70            opts,
71            instance,
72            grpc_server_builder: None,
73            http_server_builder: None,
74            plugins,
75            flight_handler: None,
76        }
77    }
78
79    pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
80        let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
81            .with_tls_config(opts.tls.clone())
82            .context(error::InvalidTlsConfigSnafu)?;
83        Ok(builder)
84    }
85
86    pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder {
87        let mut builder = HttpServerBuilder::new(opts.http.clone())
88            .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
89
90        let validator = self.plugins.get::<LogValidatorRef>();
91        let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
92        builder =
93            builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
94        builder = builder.with_logs_handler(self.instance.clone());
95
96        if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
97            builder = builder.with_user_provider(user_provider);
98        }
99
100        if opts.opentsdb.enable {
101            builder = builder.with_opentsdb_handler(self.instance.clone());
102        }
103
104        if opts.influxdb.enable {
105            builder = builder.with_influxdb_handler(self.instance.clone());
106        }
107
108        if opts.prom_store.enable {
109            builder = builder
110                .with_prom_handler(
111                    self.instance.clone(),
112                    Some(self.instance.clone()),
113                    opts.prom_store.with_metric_engine,
114                    opts.http.prom_validation_mode,
115                )
116                .with_prometheus_handler(self.instance.clone());
117        }
118
119        if opts.otlp.enable {
120            builder = builder
121                .with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
122        }
123
124        if opts.jaeger.enable {
125            builder = builder.with_jaeger_handler(self.instance.clone());
126        }
127
128        if let Some(configurator) = self.plugins.get::<RouterConfigurator>() {
129            info!("Adding extra router from plugins");
130            builder = builder.with_extra_router(configurator.router());
131        }
132
133        builder.add_layer(axum::middleware::from_fn_with_state(
134            self.instance.clone(),
135            async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
136                if state.is_suspended() {
137                    return ErrorResponse::from_error(servers::error::SuspendedSnafu.build())
138                        .into_response();
139                }
140                next.run(request).await
141            },
142        ))
143    }
144
145    pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
146        Self {
147            grpc_server_builder: Some(builder),
148            ..self
149        }
150    }
151
152    pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
153        Self {
154            http_server_builder: Some(builder),
155            ..self
156        }
157    }
158
159    pub fn with_flight_handler(self, flight_handler: FlightCraftRef) -> Self {
160        Self {
161            flight_handler: Some(flight_handler),
162            ..self
163        }
164    }
165
166    fn build_grpc_server(
167        &mut self,
168        grpc: &GrpcOptions,
169        meta_client: &Option<MetaClientOptions>,
170        name: Option<String>,
171        external: bool,
172    ) -> Result<GrpcServer> {
173        let builder = if let Some(builder) = self.grpc_server_builder.take() {
174            builder
175        } else {
176            self.grpc_server_builder(grpc)?
177        };
178
179        let user_provider = if external {
180            self.plugins.get::<UserProviderRef>()
181        } else {
182            // skip authentication for internal grpc port
183            None
184        };
185
186        // Determine whether it is Standalone or Distributed mode based on whether the meta client is configured.
187        let runtime = if meta_client.is_none() {
188            Some(builder.runtime().clone())
189        } else {
190            None
191        };
192
193        let greptime_request_handler = GreptimeRequestHandler::new(
194            ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
195            user_provider.clone(),
196            runtime,
197            grpc.flight_compression,
198        );
199
200        // Use custom flight handler if provided, otherwise use the default GreptimeRequestHandler
201        let flight_handler = self
202            .flight_handler
203            .clone()
204            .unwrap_or_else(|| Arc::new(greptime_request_handler.clone()) as FlightCraftRef);
205
206        let grpc_server = builder
207            .name(name)
208            .database_handler(greptime_request_handler.clone())
209            .prometheus_handler(self.instance.clone(), user_provider.clone())
210            .otel_arrow_handler(OtelArrowServiceHandler::new(
211                self.instance.clone(),
212                user_provider.clone(),
213            ))
214            .flight_handler(flight_handler)
215            .add_layer(axum::middleware::from_fn_with_state(
216                self.instance.clone(),
217                async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
218                    if state.is_suspended() {
219                        let status = Status::from(servers::error::SuspendedSnafu.build());
220                        return status.into_http();
221                    }
222                    next.run(request).await
223                },
224            ));
225
226        let grpc_server = if !external {
227            let frontend_grpc_handler =
228                FrontendGrpcHandler::new(self.instance.process_manager().clone());
229            grpc_server.frontend_grpc_handler(frontend_grpc_handler)
230        } else {
231            grpc_server
232        }
233        .build();
234
235        Ok(grpc_server)
236    }
237
238    fn build_http_server(&mut self, opts: &FrontendOptions, toml: String) -> Result<HttpServer> {
239        let builder = if let Some(builder) = self.http_server_builder.take() {
240            builder
241        } else {
242            self.http_server_builder(opts)
243        };
244
245        let http_server = builder
246            .with_metrics_handler(MetricsHandler)
247            .with_plugins(self.plugins.clone())
248            .with_greptime_config_options(toml)
249            .build();
250        Ok(http_server)
251    }
252
253    pub fn build(mut self) -> Result<ServerHandlers> {
254        let opts = self.opts.clone();
255        let instance = self.instance.clone();
256
257        let toml = opts.to_toml().context(TomlFormatSnafu)?;
258        let opts: FrontendOptions = opts.into();
259
260        let handlers = ServerHandlers::default();
261
262        let user_provider = self.plugins.get::<UserProviderRef>();
263
264        {
265            // Always init GRPC server
266            let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
267            let grpc_server = self.build_grpc_server(&opts.grpc, &opts.meta_client, None, true)?;
268            handlers.insert((Box::new(grpc_server), grpc_addr));
269        }
270
271        if let Some(internal_grpc) = &opts.internal_grpc {
272            // Always init Internal GRPC server
273            let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
274            let grpc_server = self.build_grpc_server(
275                internal_grpc,
276                &opts.meta_client,
277                Some("INTERNAL_GRPC_SERVER".to_string()),
278                false,
279            )?;
280            handlers.insert((Box::new(grpc_server), grpc_addr));
281        }
282
283        {
284            // Always init HTTP server
285            let http_options = &opts.http;
286            let http_addr = parse_addr(&http_options.addr)?;
287            let http_server = self.build_http_server(&opts, toml)?;
288            handlers.insert((Box::new(http_server), http_addr));
289        }
290
291        if opts.mysql.enable {
292            // Init MySQL server
293            let opts = &opts.mysql;
294            let mysql_addr = parse_addr(&opts.addr)?;
295
296            let tls_server_config = Arc::new(
297                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
298            );
299
300            // will not watch if watch is disabled in tls option
301            maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
302
303            let mysql_server = MysqlServer::create_server(
304                common_runtime::global_runtime(),
305                Arc::new(MysqlSpawnRef::new(
306                    ServerSqlQueryHandlerAdapter::arc(instance.clone()),
307                    user_provider.clone(),
308                )),
309                Arc::new(MysqlSpawnConfig::new(
310                    opts.tls.should_force_tls(),
311                    tls_server_config,
312                    opts.keep_alive.as_secs(),
313                    opts.reject_no_database.unwrap_or(false),
314                    opts.prepared_stmt_cache_size,
315                )),
316                Some(instance.process_manager().clone()),
317            );
318            handlers.insert((mysql_server, mysql_addr));
319        }
320
321        if opts.postgres.enable {
322            // Init PosgresSQL Server
323            let opts = &opts.postgres;
324            let pg_addr = parse_addr(&opts.addr)?;
325
326            let tls_server_config = Arc::new(
327                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
328            );
329
330            maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
331
332            let pg_server = Box::new(PostgresServer::new(
333                ServerSqlQueryHandlerAdapter::arc(instance.clone()),
334                opts.tls.should_force_tls(),
335                tls_server_config,
336                opts.keep_alive.as_secs(),
337                common_runtime::global_runtime(),
338                user_provider.clone(),
339                Some(self.instance.process_manager().clone()),
340            )) as Box<dyn Server>;
341
342            handlers.insert((pg_server, pg_addr));
343        }
344
345        Ok(handlers)
346    }
347}
348
349fn parse_addr(addr: &str) -> Result<SocketAddr> {
350    addr.parse().context(error::ParseAddrSnafu { addr })
351}