frontend/
frontend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_base::readable_size::ReadableSize;
18use common_config::config::Configurable;
19use common_event_recorder::EventRecorderOptions;
20use common_memory_manager::OnExhaustedPolicy;
21use common_options::datanode::DatanodeClientOptions;
22use common_options::memory::MemoryOptions;
23use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
24use meta_client::MetaClientOptions;
25use query::options::QueryOptions;
26use serde::{Deserialize, Serialize};
27use servers::grpc::GrpcOptions;
28use servers::http::HttpOptions;
29use servers::server::ServerHandlers;
30use snafu::ResultExt;
31
32use crate::error;
33use crate::error::Result;
34use crate::heartbeat::HeartbeatTask;
35use crate::instance::Instance;
36use crate::service_config::{
37    InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions,
38    PromStoreOptions,
39};
40
/// Configuration options of the frontend service.
///
/// Because of `#[serde(default)]`, any field missing from the deserialized input is filled in
/// from [`FrontendOptions::default`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FrontendOptions {
    /// Optional node id. Defaults to `None`.
    pub node_id: Option<String>,
    /// Optional default timezone. Defaults to `None`.
    pub default_timezone: Option<String>,
    /// Optional default column prefix. Defaults to `None`.
    pub default_column_prefix: Option<String>,
    /// Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
    /// Set to 0 to disable the limit. Default: "0" (unlimited)
    pub max_in_flight_write_bytes: ReadableSize,
    /// Policy applied when the write bytes quota is exhausted.
    /// Options: "wait" (default, 10s), "wait(<duration>)", "fail"
    pub write_bytes_exhausted_policy: OnExhaustedPolicy,
    /// HTTP server options.
    pub http: HttpOptions,
    /// Public gRPC server options.
    pub grpc: GrpcOptions,
    /// The internal gRPC options for the frontend service.
    /// It provides the same service as the public gRPC service, but for internal use only.
    /// Defaults to `None`.
    pub internal_grpc: Option<GrpcOptions>,
    /// MySQL protocol options.
    pub mysql: MysqlOptions,
    /// PostgreSQL protocol options.
    pub postgres: PostgresOptions,
    /// OpenTSDB protocol options.
    pub opentsdb: OpentsdbOptions,
    /// InfluxDB protocol options.
    pub influxdb: InfluxdbOptions,
    /// Prometheus remote storage options.
    pub prom_store: PromStoreOptions,
    /// Jaeger options.
    pub jaeger: JaegerOptions,
    /// OTLP options.
    pub otlp: OtlpOptions,
    /// Meta client options (e.g. `metasrv_addrs`). Defaults to `None`.
    pub meta_client: Option<MetaClientOptions>,
    /// Logging options.
    pub logging: LoggingOptions,
    /// Options of the client used to talk to datanodes.
    pub datanode: DatanodeClientOptions,
    /// Optional user provider setting. Defaults to `None`.
    pub user_provider: Option<String>,
    /// Tracing options.
    pub tracing: TracingOptions,
    /// Query engine options.
    pub query: QueryOptions,
    /// Slow query options.
    pub slow_query: SlowQueryOptions,
    /// Memory options.
    pub memory: MemoryOptions,
    /// The event recorder options.
    pub event_recorder: EventRecorderOptions,
}
76
77impl Default for FrontendOptions {
78    fn default() -> Self {
79        Self {
80            node_id: None,
81            default_timezone: None,
82            default_column_prefix: None,
83            max_in_flight_write_bytes: ReadableSize(0),
84            write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
85            http: HttpOptions::default(),
86            grpc: GrpcOptions::default(),
87            internal_grpc: None,
88            mysql: MysqlOptions::default(),
89            postgres: PostgresOptions::default(),
90            opentsdb: OpentsdbOptions::default(),
91            influxdb: InfluxdbOptions::default(),
92            jaeger: JaegerOptions::default(),
93            prom_store: PromStoreOptions::default(),
94            otlp: OtlpOptions::default(),
95            meta_client: None,
96            logging: LoggingOptions::default(),
97            datanode: DatanodeClientOptions::default(),
98            user_provider: None,
99            tracing: TracingOptions::default(),
100            query: QueryOptions::default(),
101            slow_query: SlowQueryOptions::default(),
102            memory: MemoryOptions::default(),
103            event_recorder: EventRecorderOptions::default(),
104        }
105    }
106}
107
108impl Configurable for FrontendOptions {
109    fn env_list_keys() -> Option<&'static [&'static str]> {
110        Some(&["meta_client.metasrv_addrs"])
111    }
112}
113
/// The [`Frontend`] struct is the main entry point for the frontend service.
/// It holds the server handlers, the frontend instance and optional background tasks.
pub struct Frontend {
    /// The shared frontend [`Instance`].
    pub instance: Arc<Instance>,
    /// Handlers of the servers started by [`Frontend::start`] and stopped by
    /// [`Frontend::shutdown`].
    pub servers: ServerHandlers,
    /// Optional heartbeat task. When present, [`Frontend::start`] starts it before the servers.
    pub heartbeat_task: Option<HeartbeatTask>,
}
121
122impl Frontend {
123    pub async fn start(&mut self) -> Result<()> {
124        if let Some(t) = &self.heartbeat_task {
125            t.start().await?;
126        }
127
128        self.servers
129            .start_all()
130            .await
131            .context(error::StartServerSnafu)
132    }
133
134    pub async fn shutdown(&mut self) -> Result<()> {
135        self.servers
136            .shutdown_all()
137            .await
138            .context(error::ShutdownServerSnafu)
139    }
140
141    pub fn server_handlers(&self) -> &ServerHandlers {
142        &self.servers
143    }
144}
145
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    use api::v1::meta::heartbeat_server::HeartbeatServer;
    use api::v1::meta::mailbox_message::Payload;
    use api::v1::meta::{
        AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, MailboxMessage,
        Peer, ResponseHeader, Role, heartbeat_server,
    };
    use async_trait::async_trait;
    use client::{Client, Database};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_error::ext::ErrorExt;
    use common_error::from_header_to_err_code_msg;
    use common_error::status_code::StatusCode;
    use common_grpc::channel_manager::ChannelManager;
    use common_meta::heartbeat::handler::HandlerGroupExecutor;
    use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
    use common_meta::heartbeat::handler::suspend::SuspendHandler;
    use common_meta::instruction::Instruction;
    use common_stat::ResourceStatImpl;
    use meta_client::MetaClientRef;
    use meta_client::client::MetaClientBuilder;
    use meta_srv::service::GrpcStream;
    use servers::grpc::{FlightCompression, GRPC_SERVER};
    use servers::http::HTTP_SERVER;
    use servers::http::result::greptime_result_v1::GreptimedbV1Response;
    use tokio::sync::mpsc;
    use tonic::codec::CompressionEncoding;
    use tonic::codegen::tokio_stream::StreamExt;
    use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
    use tonic::{Request, Response, Status, Streaming};

    use super::*;
    use crate::instance::builder::FrontendBuilder;
    use crate::server::Services;

    /// Heartbeat (and retry) interval, in milliseconds, that the mock metasrv advertises in its
    /// handshake response. Kept short in unit tests to reduce the waiting time.
    const TEST_HEARTBEAT_INTERVAL_MS: u64 = 200;

    /// Serializing the default options to TOML and parsing them back must succeed.
    #[test]
    fn test_toml() {
        let opts = FrontendOptions::default();
        let toml_string = toml::to_string(&opts).unwrap();
        let _parsed: FrontendOptions = toml::from_str(&toml_string).unwrap();
    }

    /// A mock metasrv heartbeat service. While `suspend` is `true`, every heartbeat response
    /// carries a mailbox message with the JSON-encoded [`Instruction::Suspend`].
    struct SuspendableHeartbeatServer {
        suspend: Arc<AtomicBool>,
    }

    #[async_trait]
    impl heartbeat_server::Heartbeat for SuspendableHeartbeatServer {
        type HeartbeatStream = GrpcStream<HeartbeatResponse>;

        /// Answers every incoming heartbeat request with one response. Only the first
        /// (handshake) response carries the heartbeat config, as metasrv does.
        async fn heartbeat(
            &self,
            request: Request<Streaming<HeartbeatRequest>>,
        ) -> std::result::Result<Response<Self::HeartbeatStream>, Status> {
            let (tx, rx) = mpsc::channel(4);

            common_runtime::spawn_global({
                let mut requests = request.into_inner();
                let suspend = self.suspend.clone();
                async move {
                    let mut is_handshake = true;
                    while let Some(request) = requests.next().await {
                        // Forward a transport error to the client and stop serving the stream.
                        if let Err(e) = request {
                            let _ = tx.send(Err(e)).await;
                            return;
                        }

                        let mailbox_message =
                            suspend.load(Ordering::Relaxed).then(|| MailboxMessage {
                                payload: Some(Payload::Json(
                                    serde_json::to_string(&Instruction::Suspend).unwrap(),
                                )),
                                ..Default::default()
                            });
                        // Only the handshake response needs to populate the config.
                        let heartbeat_config =
                            is_handshake.then_some(api::v1::meta::HeartbeatConfig {
                                heartbeat_interval_ms: TEST_HEARTBEAT_INTERVAL_MS,
                                retry_interval_ms: TEST_HEARTBEAT_INTERVAL_MS,
                            });
                        is_handshake = false;
                        let response = HeartbeatResponse {
                            header: Some(ResponseHeader::success()),
                            mailbox_message,
                            heartbeat_config,
                            ..Default::default()
                        };

                        let _ = tx.send(Ok(response)).await;
                    }
                }
            });

            Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
        }

        /// Always reports `localhost:0` as the leader, which is the address the test's channel
        /// manager routes to the in-memory connection.
        async fn ask_leader(
            &self,
            _: Request<AskLeaderRequest>,
        ) -> std::result::Result<Response<AskLeaderResponse>, Status> {
            Ok(Response::new(AskLeaderResponse {
                header: Some(ResponseHeader::success()),
                leader: Some(Peer {
                    addr: "localhost:0".to_string(),
                    ..Default::default()
                }),
            }))
        }
    }

    /// Serves `heartbeat_server` over an in-memory duplex pipe and builds a started meta client
    /// whose heartbeat channel for `localhost:0` is connected to it.
    async fn create_meta_client(
        options: &MetaClientOptions,
        heartbeat_server: Arc<SuspendableHeartbeatServer>,
    ) -> MetaClientRef {
        let (client, server) = tokio::io::duplex(1024);

        // Create the heartbeat server.
        common_runtime::spawn_global(async move {
            let mut router = tonic::transport::Server::builder();
            let router = router.add_service(
                HeartbeatServer::from_arc(heartbeat_server)
                    .accept_compressed(CompressionEncoding::Zstd)
                    .send_compressed(CompressionEncoding::Zstd),
            );
            router
                .serve_with_incoming(futures::stream::iter([Ok::<_, std::io::Error>(server)]))
                .await
        });

        // Move the client into an option so that the first connection attempt can _move_ the
        // inner value out; every later attempt fails.
        let mut client = Some(client);
        let connector = tower::service_fn(move |_| {
            let client = client.take();
            async move {
                if let Some(client) = client {
                    Ok(hyper_util::rt::TokioIo::new(client))
                } else {
                    Err(std::io::Error::other("client already taken"))
                }
            }
        });
        let manager = ChannelManager::new();
        manager
            .reset_with_connector("localhost:0", connector)
            .unwrap();

        // Create the heartbeat client.
        let mut client = MetaClientBuilder::new(0, Role::Frontend)
            .enable_heartbeat()
            .heartbeat_channel_manager(manager)
            .build();
        client.start(&options.metasrv_addrs).await.unwrap();
        Arc::new(client)
    }

    /// Builds and starts a test frontend.
    ///
    /// The frontend's heartbeat task parses mailbox messages and feeds them to a
    /// [`SuspendHandler`] bound to the instance's suspend state.
    async fn create_frontend(
        options: &FrontendOptions,
        meta_client: MetaClientRef,
    ) -> Result<Frontend> {
        let instance = Arc::new(
            FrontendBuilder::new_test(options, meta_client.clone())
                .try_build()
                .await?,
        );

        let servers =
            Services::new(options.clone(), instance.clone(), Default::default()).build()?;

        let executor = Arc::new(HandlerGroupExecutor::new(vec![
            Arc::new(ParseMailboxMessageHandler),
            Arc::new(SuspendHandler::new(instance.suspend_state())),
        ]));
        let heartbeat_task = Some(HeartbeatTask::new(
            options,
            meta_client,
            executor,
            Arc::new(ResourceStatImpl::default()),
        ));

        let mut frontend = Frontend {
            instance,
            servers,
            heartbeat_task,
        };
        frontend.start().await?;
        Ok(frontend)
    }

    /// Runs `SELECT 1` through the HTTP SQL API and checks the outcome.
    ///
    /// `Ok(json)` is compared with the re-serialized query output. `Err((code, msg))` is
    /// compared with the error code and message decoded from the response headers. If success
    /// or failure itself differs from `expected`, the test panics with both outcomes.
    async fn verify_suspend_state_by_http(
        frontend: &Frontend,
        expected: std::result::Result<&str, (StatusCode, &str)>,
    ) {
        let addr = frontend.server_handlers().addr(HTTP_SERVER).unwrap();
        let response = reqwest::get(format!("http://{}/v1/sql?sql=SELECT 1", addr))
            .await
            .unwrap();

        // Errors are reported through response headers; the body is only read otherwise.
        let headers = response.headers();
        let response = if let Some((code, error)) = from_header_to_err_code_msg(headers) {
            Err((code, error))
        } else {
            Ok(response.text().await.unwrap())
        };

        match (response, expected) {
            (Ok(response), Ok(expected)) => {
                let response: GreptimedbV1Response = serde_json::from_str(&response).unwrap();
                let response = serde_json::to_string(response.output()).unwrap();
                assert_eq!(&response, expected);
            }
            (Err(actual), Err(expected)) => assert_eq!(actual, expected),
            (actual, expected) => panic!(
                "unexpected HTTP query outcome: actual = {actual:?}, expected = {expected:?}"
            ),
        }
    }

    /// Runs `SELECT 1` through the gRPC API and checks the outcome.
    ///
    /// `Ok(table)` is compared with the pretty-printed result (surrounding whitespace of the
    /// expectation is trimmed). `Err((code, msg))` is compared with the error's status code and
    /// output message. If success or failure itself differs from `expected`, the test panics
    /// with a description of what actually happened.
    async fn verify_suspend_state_by_grpc(
        frontend: &Frontend,
        expected: std::result::Result<&str, (StatusCode, &str)>,
    ) {
        let addr = frontend.server_handlers().addr(GRPC_SERVER).unwrap();
        let client = Client::with_urls([addr.to_string()]);
        let client = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
        let response = client.sql("SELECT 1").await;

        match (response, expected) {
            (Ok(response), Ok(expected)) => {
                let response = response.data.pretty_print().await;
                assert_eq!(&response, expected.trim());
            }
            (Err(actual), Err(expected)) => {
                assert_eq!(actual.status_code(), expected.0);
                assert_eq!(actual.output_msg(), expected.1);
            }
            (Ok(_), Err(expected)) => {
                panic!("expected the gRPC query to fail with {expected:?}, but it succeeded")
            }
            (Err(actual), Ok(_)) => panic!(
                "expected the gRPC query to succeed, but it failed: {:?} {:?}",
                actual.status_code(),
                actual.output_msg()
            ),
        }
    }

    /// Polls every 20ms until the frontend's suspend state equals `expected`. Panics with a
    /// descriptive message if that does not happen within 5 seconds.
    async fn wait_for_suspend_state(frontend: &Frontend, expected: bool) {
        let check = || frontend.instance.is_suspended() == expected;
        if check() {
            return;
        }

        tokio::time::timeout(Duration::from_secs(5), async move {
            while !check() {
                tokio::time::sleep(Duration::from_millis(20)).await;
            }
        })
        .await
        .unwrap_or_else(|_| {
            panic!("frontend did not reach suspended = {expected} within 5 seconds")
        });
    }

    /// Drives the frontend's suspend state through the mock metasrv: not suspended, then
    /// suspended, then resumed. Each state is checked over both HTTP and gRPC.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_suspend_frontend() -> Result<()> {
        common_telemetry::init_default_ut_logging();

        let meta_client_options = MetaClientOptions {
            metasrv_addrs: vec!["localhost:0".to_string()],
            ..Default::default()
        };
        let options = FrontendOptions {
            http: HttpOptions {
                addr: "127.0.0.1:0".to_string(),
                ..Default::default()
            },
            grpc: GrpcOptions {
                bind_addr: "127.0.0.1:0".to_string(),
                flight_compression: FlightCompression::None,
                ..Default::default()
            },
            mysql: MysqlOptions {
                enable: false,
                ..Default::default()
            },
            postgres: PostgresOptions {
                enable: false,
                ..Default::default()
            },
            meta_client: Some(meta_client_options.clone()),
            ..Default::default()
        };

        let server = Arc::new(SuspendableHeartbeatServer {
            suspend: Arc::new(AtomicBool::new(false)),
        });
        let meta_client = create_meta_client(&meta_client_options, server.clone()).await;
        let frontend = create_frontend(&options, meta_client).await?;

        // Initial state: not suspended.
        assert!(!frontend.instance.is_suspended());
        verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
        verify_suspend_state_by_grpc(
            &frontend,
            Ok(r#"
+----------+
| Int64(1) |
+----------+
| 1        |
+----------+"#),
        )
        .await;

        // Make the heartbeat server return the "suspend" instruction ...
        server.suspend.store(true, Ordering::Relaxed);
        wait_for_suspend_state(&frontend, true).await;
        // ... then the frontend is suspended.
        assert!(frontend.instance.is_suspended());
        verify_suspend_state_by_http(
            &frontend,
            Err((
                StatusCode::Suspended,
                "error: Service suspended, execution_time_ms: 0",
            )),
        )
        .await;
        verify_suspend_state_by_grpc(&frontend, Err((StatusCode::Suspended, "Service suspended")))
            .await;

        // Make the heartbeat server stop returning the "suspend" instruction ...
        server.suspend.store(false, Ordering::Relaxed);
        wait_for_suspend_state(&frontend, false).await;
        // ... then the frontend's suspend state is cleared.
        assert!(!frontend.instance.is_suspended());
        verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
        verify_suspend_state_by_grpc(
            &frontend,
            Ok(r#"
+----------+
| Int64(1) |
+----------+
| 1        |
+----------+"#),
        )
        .await;
        Ok(())
    }
}