frontend/
frontend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_base::readable_size::ReadableSize;
18use common_config::config::Configurable;
19use common_event_recorder::EventRecorderOptions;
20use common_memory_manager::OnExhaustedPolicy;
21use common_options::datanode::DatanodeClientOptions;
22use common_options::memory::MemoryOptions;
23use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
24use meta_client::MetaClientOptions;
25use query::options::QueryOptions;
26use serde::{Deserialize, Serialize};
27use servers::grpc::GrpcOptions;
28use servers::http::HttpOptions;
29use servers::server::ServerHandlers;
30use snafu::ResultExt;
31
32use crate::error;
33use crate::error::Result;
34use crate::heartbeat::HeartbeatTask;
35use crate::instance::Instance;
36use crate::service_config::{
37    InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions,
38    PromStoreOptions,
39};
40
41#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
42#[serde(default)]
43pub struct FrontendOptions {
44    pub node_id: Option<String>,
45    pub default_timezone: Option<String>,
46    pub default_column_prefix: Option<String>,
47    /// Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
48    /// Set to 0 to disable the limit. Default: "0" (unlimited)
49    pub max_in_flight_write_bytes: ReadableSize,
50    /// Policy when write bytes quota is exhausted.
51    /// Options: "wait" (default, 10s), "wait(<duration>)", "fail"
52    pub write_bytes_exhausted_policy: OnExhaustedPolicy,
53    pub http: HttpOptions,
54    pub grpc: GrpcOptions,
55    /// The internal gRPC options for the frontend service.
56    /// it provide the same service as the public gRPC service, just only for internal use.
57    pub internal_grpc: Option<GrpcOptions>,
58    pub mysql: MysqlOptions,
59    pub postgres: PostgresOptions,
60    pub opentsdb: OpentsdbOptions,
61    pub influxdb: InfluxdbOptions,
62    pub prom_store: PromStoreOptions,
63    pub jaeger: JaegerOptions,
64    pub otlp: OtlpOptions,
65    pub meta_client: Option<MetaClientOptions>,
66    pub logging: LoggingOptions,
67    pub datanode: DatanodeClientOptions,
68    pub user_provider: Option<String>,
69    pub tracing: TracingOptions,
70    pub query: QueryOptions,
71    pub slow_query: SlowQueryOptions,
72    pub memory: MemoryOptions,
73    /// The event recorder options.
74    pub event_recorder: EventRecorderOptions,
75}
76
77impl Default for FrontendOptions {
78    fn default() -> Self {
79        Self {
80            node_id: None,
81            default_timezone: None,
82            default_column_prefix: None,
83            max_in_flight_write_bytes: ReadableSize(0),
84            write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
85            http: HttpOptions::default(),
86            grpc: GrpcOptions::default(),
87            internal_grpc: None,
88            mysql: MysqlOptions::default(),
89            postgres: PostgresOptions::default(),
90            opentsdb: OpentsdbOptions::default(),
91            influxdb: InfluxdbOptions::default(),
92            jaeger: JaegerOptions::default(),
93            prom_store: PromStoreOptions::default(),
94            otlp: OtlpOptions::default(),
95            meta_client: None,
96            logging: LoggingOptions::default(),
97            datanode: DatanodeClientOptions::default(),
98            user_provider: None,
99            tracing: TracingOptions::default(),
100            query: QueryOptions::default(),
101            slow_query: SlowQueryOptions::default(),
102            memory: MemoryOptions::default(),
103            event_recorder: EventRecorderOptions::default(),
104        }
105    }
106}
107
108impl Configurable for FrontendOptions {
109    fn env_list_keys() -> Option<&'static [&'static str]> {
110        Some(&["meta_client.metasrv_addrs"])
111    }
112}
113
114/// The [`Frontend`] struct is the main entry point for the frontend service
115/// which contains server handlers, frontend instance and some background tasks.
116pub struct Frontend {
117    pub instance: Arc<Instance>,
118    pub servers: ServerHandlers,
119    pub heartbeat_task: Option<HeartbeatTask>,
120}
121
122impl Frontend {
123    pub async fn start(&mut self) -> Result<()> {
124        if let Some(t) = &self.heartbeat_task {
125            t.start().await?;
126        }
127
128        self.servers
129            .start_all()
130            .await
131            .context(error::StartServerSnafu)
132    }
133
134    pub async fn shutdown(&mut self) -> Result<()> {
135        self.servers
136            .shutdown_all()
137            .await
138            .context(error::ShutdownServerSnafu)
139    }
140
141    pub fn server_handlers(&self) -> &ServerHandlers {
142        &self.servers
143    }
144}
145
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    use api::v1::meta::heartbeat_server::HeartbeatServer;
    use api::v1::meta::mailbox_message::Payload;
    use api::v1::meta::{
        AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, MailboxMessage,
        Peer, ResponseHeader, Role, heartbeat_server,
    };
    use async_trait::async_trait;
    use client::{Client, Database};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_error::ext::ErrorExt;
    use common_error::from_header_to_err_code_msg;
    use common_error::status_code::StatusCode;
    use common_grpc::channel_manager::ChannelManager;
    use common_meta::heartbeat::handler::HandlerGroupExecutor;
    use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
    use common_meta::heartbeat::handler::suspend::SuspendHandler;
    use common_meta::instruction::Instruction;
    use common_stat::ResourceStatImpl;
    use meta_client::MetaClientRef;
    use meta_client::client::MetaClientBuilder;
    use meta_srv::service::GrpcStream;
    use servers::grpc::{FlightCompression, GRPC_SERVER};
    use servers::http::HTTP_SERVER;
    use servers::http::result::greptime_result_v1::GreptimedbV1Response;
    use tokio::sync::mpsc;
    use tonic::codec::CompressionEncoding;
    use tonic::codegen::tokio_stream::StreamExt;
    use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
    use tonic::{Request, Response, Status, Streaming};

    use super::*;
    use crate::instance::builder::FrontendBuilder;
    use crate::server::Services;

    /// Serializes the default options to TOML and checks that the result parses back into
    /// `FrontendOptions`. Only successful parsing is checked, not field-by-field equality.
    #[test]
    fn test_toml() {
        let opts = FrontendOptions::default();
        let toml_string = toml::to_string(&opts).unwrap();
        let _parsed: FrontendOptions = toml::from_str(&toml_string).unwrap();
    }

    /// Fake metasrv heartbeat service.
    ///
    /// While `suspend` is `true`, each heartbeat response carries a mailbox message holding a
    /// JSON-serialized `Instruction::Suspend`. Otherwise responses carry no mailbox message.
    struct SuspendableHeartbeatServer {
        // Flipped by the test; read once per incoming heartbeat request.
        suspend: Arc<AtomicBool>,
    }

    #[async_trait]
    impl heartbeat_server::Heartbeat for SuspendableHeartbeatServer {
        type HeartbeatStream = GrpcStream<HeartbeatResponse>;

        /// Answers every incoming heartbeat request with exactly one response.
        ///
        /// A spawned task consumes the request stream. On a stream error it forwards the
        /// error to the client and stops.
        async fn heartbeat(
            &self,
            request: Request<Streaming<HeartbeatRequest>>,
        ) -> std::result::Result<Response<Self::HeartbeatStream>, Status> {
            let (tx, rx) = mpsc::channel(4);

            common_runtime::spawn_global({
                let mut requests = request.into_inner();
                let suspend = self.suspend.clone();
                async move {
                    while let Some(request) = requests.next().await {
                        if let Err(e) = request {
                            let _ = tx.send(Err(e)).await;
                            return;
                        }

                        // Attach the suspend instruction only while the flag is set.
                        let mailbox_message =
                            suspend.load(Ordering::Relaxed).then(|| MailboxMessage {
                                payload: Some(Payload::Json(
                                    serde_json::to_string(&Instruction::Suspend).unwrap(),
                                )),
                                ..Default::default()
                            });
                        let response = HeartbeatResponse {
                            header: Some(ResponseHeader::success()),
                            mailbox_message,
                            ..Default::default()
                        };

                        let _ = tx.send(Ok(response)).await;
                    }
                }
            });

            Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
        }

        /// Always reports `localhost:0` as the leader. This is the same address the test's
        /// channel manager is bound to in `create_meta_client`.
        async fn ask_leader(
            &self,
            _: Request<AskLeaderRequest>,
        ) -> std::result::Result<Response<AskLeaderResponse>, Status> {
            Ok(Response::new(AskLeaderResponse {
                header: Some(ResponseHeader::success()),
                leader: Some(Peer {
                    addr: "localhost:0".to_string(),
                    ..Default::default()
                }),
            }))
        }
    }

    /// Builds a frontend-role meta client with heartbeat enabled.
    ///
    /// The heartbeat channel is an in-memory `tokio::io::duplex` pipe served by
    /// `heartbeat_server`, so no real network is involved.
    async fn create_meta_client(
        options: &MetaClientOptions,
        heartbeat_server: Arc<SuspendableHeartbeatServer>,
    ) -> MetaClientRef {
        let (client, server) = tokio::io::duplex(1024);

        // Create the heartbeat server. It serves exactly one incoming connection: the server
        // half of the duplex pipe.
        common_runtime::spawn_global(async move {
            let mut router = tonic::transport::Server::builder();
            let router = router.add_service(
                HeartbeatServer::from_arc(heartbeat_server)
                    .accept_compressed(CompressionEncoding::Zstd)
                    .send_compressed(CompressionEncoding::Zstd),
            );
            router
                .serve_with_incoming(futures::stream::iter([Ok::<_, std::io::Error>(server)]))
                .await
        });

        // Move client to an option so we can _move_ the inner value
        // on the first attempt to connect. All other attempts will fail.
        let mut client = Some(client);
        let connector = tower::service_fn(move |_| {
            let client = client.take();
            async move {
                if let Some(client) = client {
                    Ok(hyper_util::rt::TokioIo::new(client))
                } else {
                    Err(std::io::Error::other("client already taken"))
                }
            }
        });
        let manager = ChannelManager::new();
        manager
            .reset_with_connector("localhost:0", connector)
            .unwrap();

        // Create the heartbeat client:
        let mut client = MetaClientBuilder::new(0, Role::Frontend)
            .enable_heartbeat()
            .heartbeat_channel_manager(manager)
            .build();
        client.start(&options.metasrv_addrs).await.unwrap();
        Arc::new(client)
    }

    /// Builds and starts a test frontend.
    ///
    /// The frontend gets its servers plus a heartbeat task. The task's handler group parses
    /// mailbox messages and applies suspend instructions to the instance's suspend state.
    async fn create_frontend(
        options: &FrontendOptions,
        meta_client: MetaClientRef,
    ) -> Result<Frontend> {
        let instance = Arc::new(
            FrontendBuilder::new_test(options, meta_client.clone())
                .try_build()
                .await?,
        );

        let servers =
            Services::new(options.clone(), instance.clone(), Default::default()).build()?;

        let executor = Arc::new(HandlerGroupExecutor::new(vec![
            Arc::new(ParseMailboxMessageHandler),
            Arc::new(SuspendHandler::new(instance.suspend_state())),
        ]));
        let heartbeat_task = Some(HeartbeatTask::new(
            options,
            meta_client,
            executor,
            Arc::new(ResourceStatImpl::default()),
        ));

        let mut frontend = Frontend {
            instance,
            servers,
            heartbeat_task,
        };
        frontend.start().await?;
        Ok(frontend)
    }

    /// Sends `SELECT 1` over HTTP and checks the outcome.
    ///
    /// `Ok(json)` expects a successful response whose serialized output equals `json`.
    /// `Err((code, msg))` expects the error code and message carried in the response headers.
    /// Panics through `unreachable!()` if the outcome kind (success vs. error) differs from
    /// `expected`.
    async fn verify_suspend_state_by_http(
        frontend: &Frontend,
        expected: std::result::Result<&str, (StatusCode, &str)>,
    ) {
        let addr = frontend.server_handlers().addr(HTTP_SERVER).unwrap();
        let response = reqwest::get(format!("http://{}/v1/sql?sql=SELECT 1", addr))
            .await
            .unwrap();

        // An error is signalled through the response headers; otherwise read the body.
        let headers = response.headers();
        let response = if let Some((code, error)) = from_header_to_err_code_msg(headers) {
            Err((code, error))
        } else {
            Ok(response.text().await.unwrap())
        };

        match (response, expected) {
            (Ok(response), Ok(expected)) => {
                let response: GreptimedbV1Response = serde_json::from_str(&response).unwrap();
                let response = serde_json::to_string(response.output()).unwrap();
                assert_eq!(&response, expected);
            }
            (Err(actual), Err(expected)) => assert_eq!(actual, expected),
            _ => unreachable!(),
        }
    }

    /// Sends `SELECT 1` over gRPC and checks the outcome.
    ///
    /// `Ok(table)` expects the pretty-printed result to equal `table` after trimming; the
    /// trim strips the leading newline of the callers' raw strings. `Err((code, msg))`
    /// expects the error's status code and output message. Panics through `unreachable!()`
    /// if the outcome kind differs from `expected`.
    async fn verify_suspend_state_by_grpc(
        frontend: &Frontend,
        expected: std::result::Result<&str, (StatusCode, &str)>,
    ) {
        let addr = frontend.server_handlers().addr(GRPC_SERVER).unwrap();
        let client = Client::with_urls([addr.to_string()]);
        let client = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
        let response = client.sql("SELECT 1").await;

        match (response, expected) {
            (Ok(response), Ok(expected)) => {
                let response = response.data.pretty_print().await;
                assert_eq!(&response, expected.trim());
            }
            (Err(actual), Err(expected)) => {
                assert_eq!(actual.status_code(), expected.0);
                assert_eq!(actual.output_msg(), expected.1);
            }
            _ => unreachable!(),
        }
    }

    /// End-to-end check of the suspend flow driven by metasrv heartbeats.
    ///
    /// The frontend starts out not suspended. It becomes suspended once the fake heartbeat
    /// server sends the suspend instruction, and recovers once the instruction stops being
    /// sent. Each state is verified over both HTTP and gRPC.
    ///
    /// NOTE(review): state changes are awaited with fixed sleeps rather than polling, so this
    /// test depends on heartbeat timing.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_suspend_frontend() -> Result<()> {
        common_telemetry::init_default_ut_logging();

        let meta_client_options = MetaClientOptions {
            metasrv_addrs: vec!["localhost:0".to_string()],
            ..Default::default()
        };
        // Bind HTTP and gRPC to ephemeral ports; MySQL and PostgreSQL servers are disabled.
        let options = FrontendOptions {
            http: HttpOptions {
                addr: "127.0.0.1:0".to_string(),
                ..Default::default()
            },
            grpc: GrpcOptions {
                bind_addr: "127.0.0.1:0".to_string(),
                flight_compression: FlightCompression::None,
                ..Default::default()
            },
            mysql: MysqlOptions {
                enable: false,
                ..Default::default()
            },
            postgres: PostgresOptions {
                enable: false,
                ..Default::default()
            },
            meta_client: Some(meta_client_options.clone()),
            ..Default::default()
        };

        let server = Arc::new(SuspendableHeartbeatServer {
            suspend: Arc::new(AtomicBool::new(false)),
        });
        let meta_client = create_meta_client(&meta_client_options, server.clone()).await;
        let frontend = create_frontend(&options, meta_client).await?;

        use common_meta::distributed_time_constants::{
            BASE_HEARTBEAT_INTERVAL, frontend_heartbeat_interval,
        };
        // One frontend heartbeat interval plus 1s of slack. This is intended to let at least
        // one heartbeat round trip complete between a flag flip and the following checks.
        let frontend_heartbeat_interval =
            frontend_heartbeat_interval(BASE_HEARTBEAT_INTERVAL) + Duration::from_secs(1);
        tokio::time::sleep(frontend_heartbeat_interval).await;
        // Initial state: not suspended.
        assert!(!frontend.instance.is_suspended());
        verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
        verify_suspend_state_by_grpc(
            &frontend,
            Ok(r#"
+----------+
| Int64(1) |
+----------+
| 1        |
+----------+"#),
        )
        .await;

        // Make the heartbeat server return the "suspend" instruction...
        server.suspend.store(true, Ordering::Relaxed);
        tokio::time::sleep(frontend_heartbeat_interval).await;
        // ... then the frontend is suspended:
        assert!(frontend.instance.is_suspended());
        verify_suspend_state_by_http(
            &frontend,
            Err((
                StatusCode::Suspended,
                "error: Service suspended, execution_time_ms: 0",
            )),
        )
        .await;
        verify_suspend_state_by_grpc(&frontend, Err((StatusCode::Suspended, "Service suspended")))
            .await;

        // Make the heartbeat server stop returning the "suspend" instruction...
        server.suspend.store(false, Ordering::Relaxed);
        tokio::time::sleep(frontend_heartbeat_interval).await;
        // ... then the frontend's suspend state is cleared:
        assert!(!frontend.instance.is_suspended());
        verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
        verify_suspend_state_by_grpc(
            &frontend,
            Ok(r#"
+----------+
| Int64(1) |
+----------+
| 1        |
+----------+"#),
        )
        .await;
        Ok(())
    }
}