frontend/
frontend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_base::readable_size::ReadableSize;
18use common_config::config::Configurable;
19use common_event_recorder::EventRecorderOptions;
20use common_memory_manager::OnExhaustedPolicy;
21use common_options::datanode::DatanodeClientOptions;
22use common_options::memory::MemoryOptions;
23use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
24use meta_client::MetaClientOptions;
25use query::options::QueryOptions;
26use serde::{Deserialize, Serialize};
27use servers::grpc::GrpcOptions;
28use servers::heartbeat_options::HeartbeatOptions;
29use servers::http::HttpOptions;
30use servers::server::ServerHandlers;
31use snafu::ResultExt;
32
33use crate::error;
34use crate::error::Result;
35use crate::heartbeat::HeartbeatTask;
36use crate::instance::Instance;
37use crate::service_config::{
38    InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions,
39    PromStoreOptions,
40};
41
/// Configuration options of the frontend.
///
/// Because of the container-level `#[serde(default)]`, any field that is missing from the
/// deserialized input is taken from `FrontendOptions::default()`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FrontendOptions {
    /// Optional node identifier; `None` by default.
    pub node_id: Option<String>,
    /// Optional default timezone; `None` by default.
    pub default_timezone: Option<String>,
    /// Optional default column prefix; `None` by default.
    pub default_column_prefix: Option<String>,
    /// Heartbeat options; defaults to `HeartbeatOptions::frontend_default()`.
    pub heartbeat: HeartbeatOptions,
    /// Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
    /// Set to 0 to disable the limit. Default: "0" (unlimited)
    pub max_in_flight_write_bytes: ReadableSize,
    /// Policy when write bytes quota is exhausted.
    /// Options: "wait" (default, 10s), "wait(<duration>)", "fail"
    pub write_bytes_exhausted_policy: OnExhaustedPolicy,
    /// The HTTP server options.
    pub http: HttpOptions,
    /// The public gRPC service options.
    pub grpc: GrpcOptions,
    /// The internal gRPC options for the frontend service.
    /// It provides the same service as the public gRPC service, but is only for internal use.
    /// `None` by default.
    pub internal_grpc: Option<GrpcOptions>,
    // Per-protocol service options; each one defaults to its type's `Default` value.
    pub mysql: MysqlOptions,
    pub postgres: PostgresOptions,
    pub opentsdb: OpentsdbOptions,
    pub influxdb: InfluxdbOptions,
    pub prom_store: PromStoreOptions,
    pub jaeger: JaegerOptions,
    pub otlp: OtlpOptions,
    /// The meta client options; `None` by default.
    pub meta_client: Option<MetaClientOptions>,
    /// The logging options.
    pub logging: LoggingOptions,
    /// The datanode client options.
    pub datanode: DatanodeClientOptions,
    /// Optional user provider setting; `None` by default.
    pub user_provider: Option<String>,
    /// The tracing options.
    pub tracing: TracingOptions,
    /// The query options.
    pub query: QueryOptions,
    /// The slow query options.
    pub slow_query: SlowQueryOptions,
    /// The memory options.
    pub memory: MemoryOptions,
    /// The event recorder options.
    pub event_recorder: EventRecorderOptions,
}
78
79impl Default for FrontendOptions {
80    fn default() -> Self {
81        Self {
82            node_id: None,
83            default_timezone: None,
84            default_column_prefix: None,
85            heartbeat: HeartbeatOptions::frontend_default(),
86            max_in_flight_write_bytes: ReadableSize(0),
87            write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
88            http: HttpOptions::default(),
89            grpc: GrpcOptions::default(),
90            internal_grpc: None,
91            mysql: MysqlOptions::default(),
92            postgres: PostgresOptions::default(),
93            opentsdb: OpentsdbOptions::default(),
94            influxdb: InfluxdbOptions::default(),
95            jaeger: JaegerOptions::default(),
96            prom_store: PromStoreOptions::default(),
97            otlp: OtlpOptions::default(),
98            meta_client: None,
99            logging: LoggingOptions::default(),
100            datanode: DatanodeClientOptions::default(),
101            user_provider: None,
102            tracing: TracingOptions::default(),
103            query: QueryOptions::default(),
104            slow_query: SlowQueryOptions::default(),
105            memory: MemoryOptions::default(),
106            event_recorder: EventRecorderOptions::default(),
107        }
108    }
109}
110
111impl Configurable for FrontendOptions {
112    fn env_list_keys() -> Option<&'static [&'static str]> {
113        Some(&["meta_client.metasrv_addrs"])
114    }
115}
116
/// The [`Frontend`] struct is the main entry point for the frontend service
/// which contains server handlers, frontend instance and some background tasks.
pub struct Frontend {
    /// The shared frontend instance.
    pub instance: Arc<Instance>,
    /// The server handlers that are started by [`Frontend::start`] and stopped by
    /// [`Frontend::shutdown`].
    pub servers: ServerHandlers,
    /// The optional heartbeat task; when present it is started before the servers in
    /// [`Frontend::start`].
    pub heartbeat_task: Option<HeartbeatTask>,
}
124
125impl Frontend {
126    pub async fn start(&mut self) -> Result<()> {
127        if let Some(t) = &self.heartbeat_task {
128            t.start().await?;
129        }
130
131        self.servers
132            .start_all()
133            .await
134            .context(error::StartServerSnafu)
135    }
136
137    pub async fn shutdown(&mut self) -> Result<()> {
138        self.servers
139            .shutdown_all()
140            .await
141            .context(error::ShutdownServerSnafu)
142    }
143
144    pub fn server_handlers(&self) -> &ServerHandlers {
145        &self.servers
146    }
147}
148
149#[cfg(test)]
150mod tests {
151    use std::sync::atomic::{AtomicBool, Ordering};
152    use std::time::Duration;
153
154    use api::v1::meta::heartbeat_server::HeartbeatServer;
155    use api::v1::meta::mailbox_message::Payload;
156    use api::v1::meta::{
157        AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, MailboxMessage,
158        Peer, ResponseHeader, Role, heartbeat_server,
159    };
160    use async_trait::async_trait;
161    use client::{Client, Database};
162    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
163    use common_error::ext::ErrorExt;
164    use common_error::from_header_to_err_code_msg;
165    use common_error::status_code::StatusCode;
166    use common_grpc::channel_manager::ChannelManager;
167    use common_meta::heartbeat::handler::HandlerGroupExecutor;
168    use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
169    use common_meta::heartbeat::handler::suspend::SuspendHandler;
170    use common_meta::instruction::Instruction;
171    use common_stat::ResourceStatImpl;
172    use meta_client::MetaClientRef;
173    use meta_client::client::MetaClientBuilder;
174    use meta_srv::service::GrpcStream;
175    use servers::grpc::{FlightCompression, GRPC_SERVER};
176    use servers::http::HTTP_SERVER;
177    use servers::http::result::greptime_result_v1::GreptimedbV1Response;
178    use tokio::sync::mpsc;
179    use tonic::codec::CompressionEncoding;
180    use tonic::codegen::tokio_stream::StreamExt;
181    use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
182    use tonic::{Request, Response, Status, Streaming};
183
184    use super::*;
185    use crate::instance::builder::FrontendBuilder;
186    use crate::server::Services;
187
188    #[test]
189    fn test_toml() {
190        let opts = FrontendOptions::default();
191        let toml_string = toml::to_string(&opts).unwrap();
192        let _parsed: FrontendOptions = toml::from_str(&toml_string).unwrap();
193    }
194
    /// A heartbeat server for tests whose responses carry a "suspend" instruction
    /// for as long as the `suspend` flag is set.
    struct SuspendableHeartbeatServer {
        /// Toggled by the test to switch the "suspend" instruction on and off.
        suspend: Arc<AtomicBool>,
    }
198
199    #[async_trait]
200    impl heartbeat_server::Heartbeat for SuspendableHeartbeatServer {
201        type HeartbeatStream = GrpcStream<HeartbeatResponse>;
202
203        async fn heartbeat(
204            &self,
205            request: Request<Streaming<HeartbeatRequest>>,
206        ) -> std::result::Result<Response<Self::HeartbeatStream>, Status> {
207            let (tx, rx) = mpsc::channel(4);
208
209            common_runtime::spawn_global({
210                let mut requests = request.into_inner();
211                let suspend = self.suspend.clone();
212                async move {
213                    while let Some(request) = requests.next().await {
214                        if let Err(e) = request {
215                            let _ = tx.send(Err(e)).await;
216                            return;
217                        }
218
219                        let mailbox_message =
220                            suspend.load(Ordering::Relaxed).then(|| MailboxMessage {
221                                payload: Some(Payload::Json(
222                                    serde_json::to_string(&Instruction::Suspend).unwrap(),
223                                )),
224                                ..Default::default()
225                            });
226                        let response = HeartbeatResponse {
227                            header: Some(ResponseHeader::success()),
228                            mailbox_message,
229                            ..Default::default()
230                        };
231
232                        let _ = tx.send(Ok(response)).await;
233                    }
234                }
235            });
236
237            Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
238        }
239
240        async fn ask_leader(
241            &self,
242            _: Request<AskLeaderRequest>,
243        ) -> std::result::Result<Response<AskLeaderResponse>, Status> {
244            Ok(Response::new(AskLeaderResponse {
245                header: Some(ResponseHeader::success()),
246                leader: Some(Peer {
247                    addr: "localhost:0".to_string(),
248                    ..Default::default()
249                }),
250            }))
251        }
252    }
253
254    async fn create_meta_client(
255        options: &MetaClientOptions,
256        heartbeat_server: Arc<SuspendableHeartbeatServer>,
257    ) -> MetaClientRef {
258        let (client, server) = tokio::io::duplex(1024);
259
260        // create the heartbeat server:
261        common_runtime::spawn_global(async move {
262            let mut router = tonic::transport::Server::builder();
263            let router = router.add_service(
264                HeartbeatServer::from_arc(heartbeat_server)
265                    .accept_compressed(CompressionEncoding::Zstd)
266                    .send_compressed(CompressionEncoding::Zstd),
267            );
268            router
269                .serve_with_incoming(futures::stream::iter([Ok::<_, std::io::Error>(server)]))
270                .await
271        });
272
273        // Move client to an option so we can _move_ the inner value
274        // on the first attempt to connect. All other attempts will fail.
275        let mut client = Some(client);
276        let connector = tower::service_fn(move |_| {
277            let client = client.take();
278            async move {
279                if let Some(client) = client {
280                    Ok(hyper_util::rt::TokioIo::new(client))
281                } else {
282                    Err(std::io::Error::other("client already taken"))
283                }
284            }
285        });
286        let manager = ChannelManager::new();
287        manager
288            .reset_with_connector("localhost:0", connector)
289            .unwrap();
290
291        // create the heartbeat client:
292        let mut client = MetaClientBuilder::new(0, Role::Frontend)
293            .enable_heartbeat()
294            .heartbeat_channel_manager(manager)
295            .build();
296        client.start(&options.metasrv_addrs).await.unwrap();
297        Arc::new(client)
298    }
299
300    async fn create_frontend(
301        options: &FrontendOptions,
302        meta_client: MetaClientRef,
303    ) -> Result<Frontend> {
304        let instance = Arc::new(
305            FrontendBuilder::new_test(options, meta_client.clone())
306                .try_build()
307                .await?,
308        );
309
310        let servers =
311            Services::new(options.clone(), instance.clone(), Default::default()).build()?;
312
313        let executor = Arc::new(HandlerGroupExecutor::new(vec![
314            Arc::new(ParseMailboxMessageHandler),
315            Arc::new(SuspendHandler::new(instance.suspend_state())),
316        ]));
317        let heartbeat_task = Some(HeartbeatTask::new(
318            options,
319            meta_client,
320            executor,
321            Arc::new(ResourceStatImpl::default()),
322        ));
323
324        let mut frontend = Frontend {
325            instance,
326            servers,
327            heartbeat_task,
328        };
329        frontend.start().await?;
330        Ok(frontend)
331    }
332
333    async fn verify_suspend_state_by_http(
334        frontend: &Frontend,
335        expected: std::result::Result<&str, (StatusCode, &str)>,
336    ) {
337        let addr = frontend.server_handlers().addr(HTTP_SERVER).unwrap();
338        let response = reqwest::get(format!("http://{}/v1/sql?sql=SELECT 1", addr))
339            .await
340            .unwrap();
341
342        let headers = response.headers();
343        let response = if let Some((code, error)) = from_header_to_err_code_msg(headers) {
344            Err((code, error))
345        } else {
346            Ok(response.text().await.unwrap())
347        };
348
349        match (response, expected) {
350            (Ok(response), Ok(expected)) => {
351                let response: GreptimedbV1Response = serde_json::from_str(&response).unwrap();
352                let response = serde_json::to_string(response.output()).unwrap();
353                assert_eq!(&response, expected);
354            }
355            (Err(actual), Err(expected)) => assert_eq!(actual, expected),
356            _ => unreachable!(),
357        }
358    }
359
360    async fn verify_suspend_state_by_grpc(
361        frontend: &Frontend,
362        expected: std::result::Result<&str, (StatusCode, &str)>,
363    ) {
364        let addr = frontend.server_handlers().addr(GRPC_SERVER).unwrap();
365        let client = Client::with_urls([addr.to_string()]);
366        let client = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
367        let response = client.sql("SELECT 1").await;
368
369        match (response, expected) {
370            (Ok(response), Ok(expected)) => {
371                let response = response.data.pretty_print().await;
372                assert_eq!(&response, expected.trim());
373            }
374            (Err(actual), Err(expected)) => {
375                assert_eq!(actual.status_code(), expected.0);
376                assert_eq!(actual.output_msg(), expected.1);
377            }
378            _ => unreachable!(),
379        }
380    }
381
382    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
383    async fn test_suspend_frontend() -> Result<()> {
384        common_telemetry::init_default_ut_logging();
385
386        let meta_client_options = MetaClientOptions {
387            metasrv_addrs: vec!["localhost:0".to_string()],
388            ..Default::default()
389        };
390        let options = FrontendOptions {
391            http: HttpOptions {
392                addr: "127.0.0.1:0".to_string(),
393                ..Default::default()
394            },
395            grpc: GrpcOptions {
396                bind_addr: "127.0.0.1:0".to_string(),
397                flight_compression: FlightCompression::None,
398                ..Default::default()
399            },
400            mysql: MysqlOptions {
401                enable: false,
402                ..Default::default()
403            },
404            postgres: PostgresOptions {
405                enable: false,
406                ..Default::default()
407            },
408            meta_client: Some(meta_client_options.clone()),
409            heartbeat: HeartbeatOptions {
410                interval: Duration::from_secs(1),
411                ..Default::default()
412            },
413            ..Default::default()
414        };
415
416        let server = Arc::new(SuspendableHeartbeatServer {
417            suspend: Arc::new(AtomicBool::new(false)),
418        });
419        let meta_client = create_meta_client(&meta_client_options, server.clone()).await;
420        let frontend = create_frontend(&options, meta_client).await?;
421
422        let frontend_heartbeat_interval = options.heartbeat.interval;
423        tokio::time::sleep(frontend_heartbeat_interval).await;
424        // initial state: not suspend:
425        assert!(!frontend.instance.is_suspended());
426        verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
427        verify_suspend_state_by_grpc(
428            &frontend,
429            Ok(r#"
430+----------+
431| Int64(1) |
432+----------+
433| 1        |
434+----------+"#),
435        )
436        .await;
437
438        // make heartbeat server returned "suspend" instruction,
439        server.suspend.store(true, Ordering::Relaxed);
440        tokio::time::sleep(frontend_heartbeat_interval).await;
441        // ... then the frontend is suspended:
442        assert!(frontend.instance.is_suspended());
443        verify_suspend_state_by_http(
444            &frontend,
445            Err((
446                StatusCode::Suspended,
447                "error: Service suspended, execution_time_ms: 0",
448            )),
449        )
450        .await;
451        verify_suspend_state_by_grpc(&frontend, Err((StatusCode::Suspended, "Service suspended")))
452            .await;
453
454        // make heartbeat server NOT returned "suspend" instruction,
455        server.suspend.store(false, Ordering::Relaxed);
456        tokio::time::sleep(frontend_heartbeat_interval).await;
457        // ... then frontend's suspend state is cleared:
458        assert!(!frontend.instance.is_suspended());
459        verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
460        verify_suspend_state_by_grpc(
461            &frontend,
462            Ok(r#"
463+----------+
464| Int64(1) |
465+----------+
466| 1        |
467+----------+"#),
468        )
469        .await;
470        Ok(())
471    }
472}