Skip to main content

frontend/
server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use axum::extract::{Request, State};
20use axum::middleware::Next;
21use axum::response::IntoResponse;
22use common_base::Plugins;
23use common_config::Configurable;
24use common_telemetry::info;
25use meta_client::MetaClientOptions;
26use servers::error::Error as ServerError;
27use servers::grpc::builder::GrpcServerBuilder;
28use servers::grpc::flight::FlightCraftRef;
29use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
30use servers::grpc::greptime_handler::GreptimeRequestHandler;
31use servers::grpc::{GrpcOptions, GrpcServer};
32use servers::http::event::LogValidatorRef;
33use servers::http::result::error_result::ErrorResponse;
34use servers::http::utils::router::RouterConfigurator;
35use servers::http::{HttpServer, HttpServerBuilder};
36use servers::interceptor::LogIngestInterceptorRef;
37use servers::metrics_handler::MetricsHandler;
38use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
39use servers::otel_arrow::OtelArrowServiceHandler;
40use servers::pending_rows_batcher::PendingRowsBatcher;
41use servers::postgres::PostgresServer;
42use servers::request_memory_limiter::ServerMemoryLimiter;
43use servers::server::{Server, ServerHandlers};
44use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config};
45use snafu::ResultExt;
46use tonic::Status;
47
48use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
49use crate::frontend::FrontendOptions;
50use crate::instance::Instance;
51
52pub struct Services<T>
53where
54    T: Into<FrontendOptions> + Configurable + Clone,
55{
56    opts: T,
57    instance: Arc<Instance>,
58    grpc_server_builder: Option<GrpcServerBuilder>,
59    http_server_builder: Option<HttpServerBuilder>,
60    plugins: Plugins,
61    flight_handler: Option<FlightCraftRef>,
62    pub server_memory_limiter: ServerMemoryLimiter,
63}
64
65impl<T> Services<T>
66where
67    T: Into<FrontendOptions> + Configurable + Clone,
68{
69    pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
70        let feopts = opts.clone().into();
71        // Create server request memory limiter for all server protocols
72        let server_memory_limiter = ServerMemoryLimiter::new(
73            feopts.max_in_flight_write_bytes.as_bytes(),
74            feopts.write_bytes_exhausted_policy,
75        );
76
77        Self {
78            opts,
79            instance,
80            grpc_server_builder: None,
81            http_server_builder: None,
82            plugins,
83            flight_handler: None,
84            server_memory_limiter,
85        }
86    }
87
88    pub fn grpc_server_builder(
89        &self,
90        opts: &GrpcOptions,
91        request_memory_limiter: ServerMemoryLimiter,
92    ) -> Result<GrpcServerBuilder> {
93        let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
94            .with_memory_limiter(request_memory_limiter)
95            .with_tls_config(opts.tls.clone())
96            .context(error::InvalidTlsConfigSnafu)?;
97        Ok(builder)
98    }
99
100    pub fn http_server_builder(
101        &self,
102        opts: &FrontendOptions,
103        request_memory_limiter: ServerMemoryLimiter,
104    ) -> HttpServerBuilder {
105        let mut builder = HttpServerBuilder::new(opts.http.clone())
106            .with_memory_limiter(request_memory_limiter)
107            .with_sql_handler(self.instance.clone());
108
109        let validator = self.plugins.get::<LogValidatorRef>();
110        let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
111        builder =
112            builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
113        builder = builder.with_logs_handler(self.instance.clone());
114
115        if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
116            builder = builder.with_user_provider(user_provider);
117        }
118
119        if opts.opentsdb.enable {
120            builder = builder.with_opentsdb_handler(self.instance.clone());
121        }
122
123        if opts.influxdb.enable {
124            builder = builder.with_influxdb_handler(self.instance.clone());
125        }
126
127        if opts.prom_store.enable {
128            let pending_rows_batcher = if opts.prom_store.with_metric_engine {
129                PendingRowsBatcher::try_new(
130                    self.instance.partition_manager().clone(),
131                    self.instance.node_manager().clone(),
132                    self.instance.catalog_manager().clone(),
133                    opts.prom_store.with_metric_engine,
134                    self.instance.clone(),
135                    opts.prom_store.pending_rows_flush_interval,
136                    opts.prom_store.max_batch_rows,
137                    opts.prom_store.max_concurrent_flushes,
138                    opts.prom_store.worker_channel_capacity,
139                    opts.prom_store.max_inflight_requests,
140                )
141            } else {
142                None
143            };
144            builder = builder
145                .with_prom_handler(
146                    self.instance.clone(),
147                    Some(self.instance.clone()),
148                    opts.prom_store.with_metric_engine,
149                    opts.http.prom_validation_mode,
150                    pending_rows_batcher,
151                )
152                .with_prometheus_handler(self.instance.clone());
153        }
154
155        if opts.otlp.enable {
156            builder = builder
157                .with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
158        }
159
160        if opts.jaeger.enable {
161            builder = builder.with_jaeger_handler(self.instance.clone());
162        }
163
164        builder = builder.with_dashboard_handler(self.instance.clone());
165
166        if let Some(configurator) = self.plugins.get::<RouterConfigurator>() {
167            info!("Adding extra router from plugins");
168            builder = builder.with_extra_router(configurator.router());
169        }
170
171        builder.add_layer(axum::middleware::from_fn_with_state(
172            self.instance.clone(),
173            async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
174                if state.is_suspended() {
175                    return ErrorResponse::from_error(servers::error::SuspendedSnafu.build())
176                        .into_response();
177                }
178                next.run(request).await
179            },
180        ))
181    }
182
183    pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
184        Self {
185            grpc_server_builder: Some(builder),
186            ..self
187        }
188    }
189
190    pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
191        Self {
192            http_server_builder: Some(builder),
193            ..self
194        }
195    }
196
197    pub fn with_flight_handler(self, flight_handler: FlightCraftRef) -> Self {
198        Self {
199            flight_handler: Some(flight_handler),
200            ..self
201        }
202    }
203
204    fn build_grpc_server(
205        &mut self,
206        grpc: &GrpcOptions,
207        meta_client: &Option<MetaClientOptions>,
208        name: Option<String>,
209        external: bool,
210        request_memory_limiter: ServerMemoryLimiter,
211    ) -> Result<GrpcServer> {
212        let builder = if let Some(builder) = self.grpc_server_builder.take() {
213            builder
214        } else {
215            self.grpc_server_builder(grpc, request_memory_limiter)?
216        };
217
218        let user_provider = if external {
219            self.plugins.get::<UserProviderRef>()
220        } else {
221            // skip authentication for internal grpc port
222            None
223        };
224
225        // Determine whether it is Standalone or Distributed mode based on whether the meta client is configured.
226        let runtime = if meta_client.is_none() {
227            Some(builder.runtime().clone())
228        } else {
229            None
230        };
231
232        let greptime_request_handler = GreptimeRequestHandler::new(
233            self.instance.clone(),
234            user_provider.clone(),
235            runtime,
236            grpc.flight_compression,
237        );
238
239        // Use custom flight handler if provided, otherwise use the default GreptimeRequestHandler
240        let flight_handler = self
241            .flight_handler
242            .clone()
243            .unwrap_or_else(|| Arc::new(greptime_request_handler.clone()) as FlightCraftRef);
244
245        let grpc_server = builder
246            .name(name)
247            .database_handler(greptime_request_handler.clone())
248            .prometheus_handler(self.instance.clone(), user_provider.clone())
249            .otel_arrow_handler(OtelArrowServiceHandler::new(
250                self.instance.clone(),
251                user_provider.clone(),
252            ))
253            .flight_handler(flight_handler)
254            .add_layer(axum::middleware::from_fn_with_state(
255                self.instance.clone(),
256                async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
257                    if state.is_suspended() {
258                        let status = Status::from(servers::error::SuspendedSnafu.build());
259                        return status.into_http();
260                    }
261                    next.run(request).await
262                },
263            ));
264
265        let grpc_server = if !external {
266            let frontend_grpc_handler =
267                FrontendGrpcHandler::new(self.instance.process_manager().clone());
268            grpc_server.frontend_grpc_handler(frontend_grpc_handler)
269        } else {
270            grpc_server
271        }
272        .build();
273
274        Ok(grpc_server)
275    }
276
277    fn build_http_server(
278        &mut self,
279        opts: &FrontendOptions,
280        toml: String,
281        request_memory_limiter: ServerMemoryLimiter,
282    ) -> Result<HttpServer> {
283        let builder = if let Some(builder) = self.http_server_builder.take() {
284            builder
285        } else {
286            self.http_server_builder(opts, request_memory_limiter)
287        };
288
289        let http_server = builder
290            .with_metrics_handler(MetricsHandler)
291            .with_plugins(self.plugins.clone())
292            .with_greptime_config_options(toml)
293            .build();
294        Ok(http_server)
295    }
296
297    pub fn build(mut self) -> Result<ServerHandlers> {
298        let opts = self.opts.clone();
299        let instance = self.instance.clone();
300
301        let toml = opts.to_toml().context(TomlFormatSnafu)?;
302        let opts: FrontendOptions = opts.into();
303
304        let handlers = ServerHandlers::default();
305
306        let user_provider = self.plugins.get::<UserProviderRef>();
307
308        {
309            // Always init GRPC server
310            let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
311            let grpc_server = self.build_grpc_server(
312                &opts.grpc,
313                &opts.meta_client,
314                None,
315                true,
316                self.server_memory_limiter.clone(),
317            )?;
318            handlers.insert((Box::new(grpc_server), grpc_addr));
319        }
320
321        if let Some(internal_grpc) = &opts.internal_grpc {
322            // Always init Internal GRPC server
323            let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
324            let grpc_server = self.build_grpc_server(
325                internal_grpc,
326                &opts.meta_client,
327                Some("INTERNAL_GRPC_SERVER".to_string()),
328                false,
329                self.server_memory_limiter.clone(),
330            )?;
331            handlers.insert((Box::new(grpc_server), grpc_addr));
332        }
333
334        {
335            // Always init HTTP server
336            let http_options = &opts.http;
337            let http_addr = parse_addr(&http_options.addr)?;
338            let http_server =
339                self.build_http_server(&opts, toml, self.server_memory_limiter.clone())?;
340            handlers.insert((Box::new(http_server), http_addr));
341        }
342
343        if opts.mysql.enable {
344            // Init MySQL server
345            let opts = &opts.mysql;
346            let mysql_addr = parse_addr(&opts.addr)?;
347
348            let tls_server_config = Arc::new(
349                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
350            );
351
352            // will not watch if watch is disabled in tls option
353            maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
354
355            let mysql_server = MysqlServer::create_server(
356                common_runtime::global_runtime(),
357                Arc::new(MysqlSpawnRef::new(instance.clone(), user_provider.clone())),
358                Arc::new(MysqlSpawnConfig::new(
359                    opts.tls.should_force_tls(),
360                    tls_server_config,
361                    opts.keep_alive.as_secs(),
362                    opts.reject_no_database.unwrap_or(false),
363                    opts.prepared_stmt_cache_size,
364                )),
365                Some(instance.process_manager().clone()),
366            );
367            handlers.insert((mysql_server, mysql_addr));
368        }
369
370        if opts.postgres.enable {
371            // Init PosgresSQL Server
372            let opts = &opts.postgres;
373            let pg_addr = parse_addr(&opts.addr)?;
374
375            let tls_server_config = Arc::new(
376                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
377            );
378
379            maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
380
381            let pg_server = Box::new(PostgresServer::new(
382                instance.clone(),
383                opts.tls.should_force_tls(),
384                tls_server_config,
385                opts.keep_alive.as_secs(),
386                common_runtime::global_runtime(),
387                user_provider.clone(),
388                Some(self.instance.process_manager().clone()),
389            )) as Box<dyn Server>;
390
391            handlers.insert((pg_server, pg_addr));
392        }
393
394        Ok(handlers)
395    }
396}
397
398fn parse_addr(addr: &str) -> Result<SocketAddr> {
399    addr.parse().context(error::ParseAddrSnafu { addr })
400}