frontend/
server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17
18use auth::UserProviderRef;
19use axum::extract::{Request, State};
20use axum::middleware::Next;
21use axum::response::IntoResponse;
22use common_base::Plugins;
23use common_config::Configurable;
24use common_telemetry::info;
25use meta_client::MetaClientOptions;
26use servers::error::Error as ServerError;
27use servers::grpc::builder::GrpcServerBuilder;
28use servers::grpc::flight::FlightCraftRef;
29use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
30use servers::grpc::greptime_handler::GreptimeRequestHandler;
31use servers::grpc::{GrpcOptions, GrpcServer};
32use servers::http::event::LogValidatorRef;
33use servers::http::result::error_result::ErrorResponse;
34use servers::http::utils::router::RouterConfigurator;
35use servers::http::{HttpServer, HttpServerBuilder};
36use servers::interceptor::LogIngestInterceptorRef;
37use servers::metrics_handler::MetricsHandler;
38use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
39use servers::otel_arrow::OtelArrowServiceHandler;
40use servers::postgres::PostgresServer;
41use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
42use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
43use servers::request_memory_limiter::ServerMemoryLimiter;
44use servers::server::{Server, ServerHandlers};
45use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config};
46use snafu::ResultExt;
47use tonic::Status;
48
49use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
50use crate::frontend::FrontendOptions;
51use crate::instance::Instance;
52
53pub struct Services<T>
54where
55    T: Into<FrontendOptions> + Configurable + Clone,
56{
57    opts: T,
58    instance: Arc<Instance>,
59    grpc_server_builder: Option<GrpcServerBuilder>,
60    http_server_builder: Option<HttpServerBuilder>,
61    plugins: Plugins,
62    flight_handler: Option<FlightCraftRef>,
63    pub server_memory_limiter: ServerMemoryLimiter,
64}
65
66impl<T> Services<T>
67where
68    T: Into<FrontendOptions> + Configurable + Clone,
69{
70    pub fn new(opts: T, instance: Arc<Instance>, plugins: Plugins) -> Self {
71        let feopts = opts.clone().into();
72        // Create server request memory limiter for all server protocols
73        let server_memory_limiter = ServerMemoryLimiter::new(
74            feopts.max_in_flight_write_bytes.as_bytes(),
75            feopts.write_bytes_exhausted_policy,
76        );
77
78        Self {
79            opts,
80            instance,
81            grpc_server_builder: None,
82            http_server_builder: None,
83            plugins,
84            flight_handler: None,
85            server_memory_limiter,
86        }
87    }
88
89    pub fn grpc_server_builder(
90        &self,
91        opts: &GrpcOptions,
92        request_memory_limiter: ServerMemoryLimiter,
93    ) -> Result<GrpcServerBuilder> {
94        let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
95            .with_memory_limiter(request_memory_limiter)
96            .with_tls_config(opts.tls.clone())
97            .context(error::InvalidTlsConfigSnafu)?;
98        Ok(builder)
99    }
100
101    pub fn http_server_builder(
102        &self,
103        opts: &FrontendOptions,
104        request_memory_limiter: ServerMemoryLimiter,
105    ) -> HttpServerBuilder {
106        let mut builder = HttpServerBuilder::new(opts.http.clone())
107            .with_memory_limiter(request_memory_limiter)
108            .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
109
110        let validator = self.plugins.get::<LogValidatorRef>();
111        let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
112        builder =
113            builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor);
114        builder = builder.with_logs_handler(self.instance.clone());
115
116        if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
117            builder = builder.with_user_provider(user_provider);
118        }
119
120        if opts.opentsdb.enable {
121            builder = builder.with_opentsdb_handler(self.instance.clone());
122        }
123
124        if opts.influxdb.enable {
125            builder = builder.with_influxdb_handler(self.instance.clone());
126        }
127
128        if opts.prom_store.enable {
129            builder = builder
130                .with_prom_handler(
131                    self.instance.clone(),
132                    Some(self.instance.clone()),
133                    opts.prom_store.with_metric_engine,
134                    opts.http.prom_validation_mode,
135                )
136                .with_prometheus_handler(self.instance.clone());
137        }
138
139        if opts.otlp.enable {
140            builder = builder
141                .with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
142        }
143
144        if opts.jaeger.enable {
145            builder = builder.with_jaeger_handler(self.instance.clone());
146        }
147
148        if let Some(configurator) = self.plugins.get::<RouterConfigurator>() {
149            info!("Adding extra router from plugins");
150            builder = builder.with_extra_router(configurator.router());
151        }
152
153        builder.add_layer(axum::middleware::from_fn_with_state(
154            self.instance.clone(),
155            async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
156                if state.is_suspended() {
157                    return ErrorResponse::from_error(servers::error::SuspendedSnafu.build())
158                        .into_response();
159                }
160                next.run(request).await
161            },
162        ))
163    }
164
165    pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
166        Self {
167            grpc_server_builder: Some(builder),
168            ..self
169        }
170    }
171
172    pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self {
173        Self {
174            http_server_builder: Some(builder),
175            ..self
176        }
177    }
178
179    pub fn with_flight_handler(self, flight_handler: FlightCraftRef) -> Self {
180        Self {
181            flight_handler: Some(flight_handler),
182            ..self
183        }
184    }
185
186    fn build_grpc_server(
187        &mut self,
188        grpc: &GrpcOptions,
189        meta_client: &Option<MetaClientOptions>,
190        name: Option<String>,
191        external: bool,
192        request_memory_limiter: ServerMemoryLimiter,
193    ) -> Result<GrpcServer> {
194        let builder = if let Some(builder) = self.grpc_server_builder.take() {
195            builder
196        } else {
197            self.grpc_server_builder(grpc, request_memory_limiter)?
198        };
199
200        let user_provider = if external {
201            self.plugins.get::<UserProviderRef>()
202        } else {
203            // skip authentication for internal grpc port
204            None
205        };
206
207        // Determine whether it is Standalone or Distributed mode based on whether the meta client is configured.
208        let runtime = if meta_client.is_none() {
209            Some(builder.runtime().clone())
210        } else {
211            None
212        };
213
214        let greptime_request_handler = GreptimeRequestHandler::new(
215            ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
216            user_provider.clone(),
217            runtime,
218            grpc.flight_compression,
219        );
220
221        // Use custom flight handler if provided, otherwise use the default GreptimeRequestHandler
222        let flight_handler = self
223            .flight_handler
224            .clone()
225            .unwrap_or_else(|| Arc::new(greptime_request_handler.clone()) as FlightCraftRef);
226
227        let grpc_server = builder
228            .name(name)
229            .database_handler(greptime_request_handler.clone())
230            .prometheus_handler(self.instance.clone(), user_provider.clone())
231            .otel_arrow_handler(OtelArrowServiceHandler::new(
232                self.instance.clone(),
233                user_provider.clone(),
234            ))
235            .flight_handler(flight_handler)
236            .add_layer(axum::middleware::from_fn_with_state(
237                self.instance.clone(),
238                async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
239                    if state.is_suspended() {
240                        let status = Status::from(servers::error::SuspendedSnafu.build());
241                        return status.into_http();
242                    }
243                    next.run(request).await
244                },
245            ));
246
247        let grpc_server = if !external {
248            let frontend_grpc_handler =
249                FrontendGrpcHandler::new(self.instance.process_manager().clone());
250            grpc_server.frontend_grpc_handler(frontend_grpc_handler)
251        } else {
252            grpc_server
253        }
254        .build();
255
256        Ok(grpc_server)
257    }
258
259    fn build_http_server(
260        &mut self,
261        opts: &FrontendOptions,
262        toml: String,
263        request_memory_limiter: ServerMemoryLimiter,
264    ) -> Result<HttpServer> {
265        let builder = if let Some(builder) = self.http_server_builder.take() {
266            builder
267        } else {
268            self.http_server_builder(opts, request_memory_limiter)
269        };
270
271        let http_server = builder
272            .with_metrics_handler(MetricsHandler)
273            .with_plugins(self.plugins.clone())
274            .with_greptime_config_options(toml)
275            .build();
276        Ok(http_server)
277    }
278
279    pub fn build(mut self) -> Result<ServerHandlers> {
280        let opts = self.opts.clone();
281        let instance = self.instance.clone();
282
283        let toml = opts.to_toml().context(TomlFormatSnafu)?;
284        let opts: FrontendOptions = opts.into();
285
286        let handlers = ServerHandlers::default();
287
288        let user_provider = self.plugins.get::<UserProviderRef>();
289
290        {
291            // Always init GRPC server
292            let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
293            let grpc_server = self.build_grpc_server(
294                &opts.grpc,
295                &opts.meta_client,
296                None,
297                true,
298                self.server_memory_limiter.clone(),
299            )?;
300            handlers.insert((Box::new(grpc_server), grpc_addr));
301        }
302
303        if let Some(internal_grpc) = &opts.internal_grpc {
304            // Always init Internal GRPC server
305            let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
306            let grpc_server = self.build_grpc_server(
307                internal_grpc,
308                &opts.meta_client,
309                Some("INTERNAL_GRPC_SERVER".to_string()),
310                false,
311                self.server_memory_limiter.clone(),
312            )?;
313            handlers.insert((Box::new(grpc_server), grpc_addr));
314        }
315
316        {
317            // Always init HTTP server
318            let http_options = &opts.http;
319            let http_addr = parse_addr(&http_options.addr)?;
320            let http_server =
321                self.build_http_server(&opts, toml, self.server_memory_limiter.clone())?;
322            handlers.insert((Box::new(http_server), http_addr));
323        }
324
325        if opts.mysql.enable {
326            // Init MySQL server
327            let opts = &opts.mysql;
328            let mysql_addr = parse_addr(&opts.addr)?;
329
330            let tls_server_config = Arc::new(
331                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
332            );
333
334            // will not watch if watch is disabled in tls option
335            maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
336
337            let mysql_server = MysqlServer::create_server(
338                common_runtime::global_runtime(),
339                Arc::new(MysqlSpawnRef::new(
340                    ServerSqlQueryHandlerAdapter::arc(instance.clone()),
341                    user_provider.clone(),
342                )),
343                Arc::new(MysqlSpawnConfig::new(
344                    opts.tls.should_force_tls(),
345                    tls_server_config,
346                    opts.keep_alive.as_secs(),
347                    opts.reject_no_database.unwrap_or(false),
348                    opts.prepared_stmt_cache_size,
349                )),
350                Some(instance.process_manager().clone()),
351            );
352            handlers.insert((mysql_server, mysql_addr));
353        }
354
355        if opts.postgres.enable {
356            // Init PosgresSQL Server
357            let opts = &opts.postgres;
358            let pg_addr = parse_addr(&opts.addr)?;
359
360            let tls_server_config = Arc::new(
361                ReloadableTlsServerConfig::try_new(opts.tls.clone()).context(StartServerSnafu)?,
362            );
363
364            maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
365
366            let pg_server = Box::new(PostgresServer::new(
367                ServerSqlQueryHandlerAdapter::arc(instance.clone()),
368                opts.tls.should_force_tls(),
369                tls_server_config,
370                opts.keep_alive.as_secs(),
371                common_runtime::global_runtime(),
372                user_provider.clone(),
373                Some(self.instance.process_manager().clone()),
374            )) as Box<dyn Server>;
375
376            handlers.insert((pg_server, pg_addr));
377        }
378
379        Ok(handlers)
380    }
381}
382
383fn parse_addr(addr: &str) -> Result<SocketAddr> {
384    addr.parse().context(error::ParseAddrSnafu { addr })
385}