frontend/
frontend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_base::readable_size::ReadableSize;
18use common_config::config::Configurable;
19use common_event_recorder::EventRecorderOptions;
20use common_options::datanode::DatanodeClientOptions;
21use common_options::memory::MemoryOptions;
22use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
23use meta_client::MetaClientOptions;
24use query::options::QueryOptions;
25use serde::{Deserialize, Serialize};
26use servers::grpc::GrpcOptions;
27use servers::heartbeat_options::HeartbeatOptions;
28use servers::http::HttpOptions;
29use servers::server::ServerHandlers;
30use snafu::ResultExt;
31
32use crate::error;
33use crate::error::Result;
34use crate::heartbeat::HeartbeatTask;
35use crate::instance::Instance;
36use crate::service_config::{
37    InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions,
38    PromStoreOptions,
39};
40
/// Configuration options of the frontend service.
///
/// The struct is (de)serializable through serde. Because of the container-level
/// `#[serde(default)]`, a field that is missing from the input takes its value from
/// this struct's [`Default`] implementation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FrontendOptions {
    /// Defaults to `None`.
    pub node_id: Option<String>,
    /// Defaults to `None`.
    pub default_timezone: Option<String>,
    /// Defaults to `None`.
    pub default_column_prefix: Option<String>,
    /// Defaults to `HeartbeatOptions::frontend_default()` rather than
    /// `HeartbeatOptions::default()`.
    pub heartbeat: HeartbeatOptions,
    pub http: HttpOptions,
    pub grpc: GrpcOptions,
    /// The internal gRPC options for the frontend service.
    /// It provides the same service as the public gRPC service, but is meant for
    /// internal use only. Defaults to `None`.
    pub internal_grpc: Option<GrpcOptions>,
    // Per-protocol options; each one defaults to the `Default` value of its type.
    pub mysql: MysqlOptions,
    pub postgres: PostgresOptions,
    pub opentsdb: OpentsdbOptions,
    pub influxdb: InfluxdbOptions,
    pub prom_store: PromStoreOptions,
    pub jaeger: JaegerOptions,
    pub otlp: OtlpOptions,
    /// Defaults to `None`. Its `metasrv_addrs` entry is the key reported by
    /// `Configurable::env_list_keys` for this struct.
    pub meta_client: Option<MetaClientOptions>,
    pub logging: LoggingOptions,
    pub datanode: DatanodeClientOptions,
    /// Defaults to `None`.
    pub user_provider: Option<String>,
    pub tracing: TracingOptions,
    pub query: QueryOptions,
    /// Defaults to `None`.
    /// NOTE(review): presumably `None` means no limit is enforced — confirm against the consumer.
    pub max_in_flight_write_bytes: Option<ReadableSize>,
    pub slow_query: SlowQueryOptions,
    pub memory: MemoryOptions,
    /// The event recorder options.
    pub event_recorder: EventRecorderOptions,
}
72
73impl Default for FrontendOptions {
74    fn default() -> Self {
75        Self {
76            node_id: None,
77            default_timezone: None,
78            default_column_prefix: None,
79            heartbeat: HeartbeatOptions::frontend_default(),
80            http: HttpOptions::default(),
81            grpc: GrpcOptions::default(),
82            internal_grpc: None,
83            mysql: MysqlOptions::default(),
84            postgres: PostgresOptions::default(),
85            opentsdb: OpentsdbOptions::default(),
86            influxdb: InfluxdbOptions::default(),
87            jaeger: JaegerOptions::default(),
88            prom_store: PromStoreOptions::default(),
89            otlp: OtlpOptions::default(),
90            meta_client: None,
91            logging: LoggingOptions::default(),
92            datanode: DatanodeClientOptions::default(),
93            user_provider: None,
94            tracing: TracingOptions::default(),
95            query: QueryOptions::default(),
96            max_in_flight_write_bytes: None,
97            slow_query: SlowQueryOptions::default(),
98            memory: MemoryOptions::default(),
99            event_recorder: EventRecorderOptions::default(),
100        }
101    }
102}
103
104impl Configurable for FrontendOptions {
105    fn env_list_keys() -> Option<&'static [&'static str]> {
106        Some(&["meta_client.metasrv_addrs"])
107    }
108}
109
/// The [`Frontend`] struct is the main entry point for the frontend service
/// which contains server handlers, frontend instance and some background tasks.
pub struct Frontend {
    /// The shared frontend instance.
    pub instance: Arc<Instance>,
    /// The server handlers that `start` brings up and `shutdown` tears down.
    pub servers: ServerHandlers,
    /// Optional heartbeat task; when present, `start` starts it before the servers.
    pub heartbeat_task: Option<HeartbeatTask>,
}
117
118impl Frontend {
119    pub async fn start(&mut self) -> Result<()> {
120        if let Some(t) = &self.heartbeat_task {
121            t.start().await?;
122        }
123
124        self.servers
125            .start_all()
126            .await
127            .context(error::StartServerSnafu)
128    }
129
130    pub async fn shutdown(&mut self) -> Result<()> {
131        self.servers
132            .shutdown_all()
133            .await
134            .context(error::ShutdownServerSnafu)
135    }
136
137    pub fn server_handlers(&self) -> &ServerHandlers {
138        &self.servers
139    }
140}
141
142#[cfg(test)]
143mod tests {
144    use std::sync::atomic::{AtomicBool, Ordering};
145    use std::time::Duration;
146
147    use api::v1::meta::heartbeat_server::HeartbeatServer;
148    use api::v1::meta::mailbox_message::Payload;
149    use api::v1::meta::{
150        AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, MailboxMessage,
151        Peer, ResponseHeader, Role, heartbeat_server,
152    };
153    use async_trait::async_trait;
154    use client::{Client, Database};
155    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
156    use common_error::ext::ErrorExt;
157    use common_error::from_header_to_err_code_msg;
158    use common_error::status_code::StatusCode;
159    use common_grpc::channel_manager::ChannelManager;
160    use common_meta::heartbeat::handler::HandlerGroupExecutor;
161    use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
162    use common_meta::heartbeat::handler::suspend::SuspendHandler;
163    use common_meta::instruction::Instruction;
164    use common_stat::ResourceStatImpl;
165    use meta_client::MetaClientRef;
166    use meta_client::client::MetaClientBuilder;
167    use meta_srv::service::GrpcStream;
168    use servers::grpc::{FlightCompression, GRPC_SERVER};
169    use servers::http::HTTP_SERVER;
170    use servers::http::result::greptime_result_v1::GreptimedbV1Response;
171    use tokio::sync::mpsc;
172    use tonic::codec::CompressionEncoding;
173    use tonic::codegen::tokio_stream::StreamExt;
174    use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
175    use tonic::{Request, Response, Status, Streaming};
176
177    use super::*;
178    use crate::instance::builder::FrontendBuilder;
179    use crate::server::Services;
180
181    #[test]
182    fn test_toml() {
183        let opts = FrontendOptions::default();
184        let toml_string = toml::to_string(&opts).unwrap();
185        let _parsed: FrontendOptions = toml::from_str(&toml_string).unwrap();
186    }
187
    /// A fake heartbeat service for tests.
    ///
    /// While `suspend` is `true`, each heartbeat response it sends carries a mailbox
    /// message holding the JSON-encoded [`Instruction::Suspend`]; otherwise the
    /// response has no mailbox message.
    struct SuspendableHeartbeatServer {
        /// Flag flipped by the test to turn the suspend instruction on and off.
        suspend: Arc<AtomicBool>,
    }
191
192    #[async_trait]
193    impl heartbeat_server::Heartbeat for SuspendableHeartbeatServer {
194        type HeartbeatStream = GrpcStream<HeartbeatResponse>;
195
196        async fn heartbeat(
197            &self,
198            request: Request<Streaming<HeartbeatRequest>>,
199        ) -> std::result::Result<Response<Self::HeartbeatStream>, Status> {
200            let (tx, rx) = mpsc::channel(4);
201
202            common_runtime::spawn_global({
203                let mut requests = request.into_inner();
204                let suspend = self.suspend.clone();
205                async move {
206                    while let Some(request) = requests.next().await {
207                        if let Err(e) = request {
208                            let _ = tx.send(Err(e)).await;
209                            return;
210                        }
211
212                        let mailbox_message =
213                            suspend.load(Ordering::Relaxed).then(|| MailboxMessage {
214                                payload: Some(Payload::Json(
215                                    serde_json::to_string(&Instruction::Suspend).unwrap(),
216                                )),
217                                ..Default::default()
218                            });
219                        let response = HeartbeatResponse {
220                            header: Some(ResponseHeader::success()),
221                            mailbox_message,
222                            ..Default::default()
223                        };
224
225                        let _ = tx.send(Ok(response)).await;
226                    }
227                }
228            });
229
230            Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
231        }
232
233        async fn ask_leader(
234            &self,
235            _: Request<AskLeaderRequest>,
236        ) -> std::result::Result<Response<AskLeaderResponse>, Status> {
237            Ok(Response::new(AskLeaderResponse {
238                header: Some(ResponseHeader::success()),
239                leader: Some(Peer {
240                    addr: "localhost:0".to_string(),
241                    ..Default::default()
242                }),
243            }))
244        }
245    }
246
247    async fn create_meta_client(
248        options: &MetaClientOptions,
249        heartbeat_server: Arc<SuspendableHeartbeatServer>,
250    ) -> MetaClientRef {
251        let (client, server) = tokio::io::duplex(1024);
252
253        // create the heartbeat server:
254        common_runtime::spawn_global(async move {
255            let mut router = tonic::transport::Server::builder();
256            let router = router.add_service(
257                HeartbeatServer::from_arc(heartbeat_server)
258                    .accept_compressed(CompressionEncoding::Zstd)
259                    .send_compressed(CompressionEncoding::Zstd),
260            );
261            router
262                .serve_with_incoming(futures::stream::iter([Ok::<_, std::io::Error>(server)]))
263                .await
264        });
265
266        // Move client to an option so we can _move_ the inner value
267        // on the first attempt to connect. All other attempts will fail.
268        let mut client = Some(client);
269        let connector = tower::service_fn(move |_| {
270            let client = client.take();
271            async move {
272                if let Some(client) = client {
273                    Ok(hyper_util::rt::TokioIo::new(client))
274                } else {
275                    Err(std::io::Error::other("client already taken"))
276                }
277            }
278        });
279        let manager = ChannelManager::new();
280        manager
281            .reset_with_connector("localhost:0", connector)
282            .unwrap();
283
284        // create the heartbeat client:
285        let mut client = MetaClientBuilder::new(0, Role::Frontend)
286            .enable_heartbeat()
287            .heartbeat_channel_manager(manager)
288            .build();
289        client.start(&options.metasrv_addrs).await.unwrap();
290        Arc::new(client)
291    }
292
293    async fn create_frontend(
294        options: &FrontendOptions,
295        meta_client: MetaClientRef,
296    ) -> Result<Frontend> {
297        let instance = Arc::new(
298            FrontendBuilder::new_test(options, meta_client.clone())
299                .try_build()
300                .await?,
301        );
302
303        let servers =
304            Services::new(options.clone(), instance.clone(), Default::default()).build()?;
305
306        let executor = Arc::new(HandlerGroupExecutor::new(vec![
307            Arc::new(ParseMailboxMessageHandler),
308            Arc::new(SuspendHandler::new(instance.suspend_state())),
309        ]));
310        let heartbeat_task = Some(HeartbeatTask::new(
311            options,
312            meta_client,
313            executor,
314            Arc::new(ResourceStatImpl::default()),
315        ));
316
317        let mut frontend = Frontend {
318            instance,
319            servers,
320            heartbeat_task,
321        };
322        frontend.start().await?;
323        Ok(frontend)
324    }
325
326    async fn verify_suspend_state_by_http(
327        frontend: &Frontend,
328        expected: std::result::Result<&str, (StatusCode, &str)>,
329    ) {
330        let addr = frontend.server_handlers().addr(HTTP_SERVER).unwrap();
331        let response = reqwest::get(format!("http://{}/v1/sql?sql=SELECT 1", addr))
332            .await
333            .unwrap();
334
335        let headers = response.headers();
336        let response = if let Some((code, error)) = from_header_to_err_code_msg(headers) {
337            Err((code, error))
338        } else {
339            Ok(response.text().await.unwrap())
340        };
341
342        match (response, expected) {
343            (Ok(response), Ok(expected)) => {
344                let response: GreptimedbV1Response = serde_json::from_str(&response).unwrap();
345                let response = serde_json::to_string(response.output()).unwrap();
346                assert_eq!(&response, expected);
347            }
348            (Err(actual), Err(expected)) => assert_eq!(actual, expected),
349            _ => unreachable!(),
350        }
351    }
352
353    async fn verify_suspend_state_by_grpc(
354        frontend: &Frontend,
355        expected: std::result::Result<&str, (StatusCode, &str)>,
356    ) {
357        let addr = frontend.server_handlers().addr(GRPC_SERVER).unwrap();
358        let client = Client::with_urls([addr.to_string()]);
359        let client = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
360        let response = client.sql("SELECT 1").await;
361
362        match (response, expected) {
363            (Ok(response), Ok(expected)) => {
364                let response = response.data.pretty_print().await;
365                assert_eq!(&response, expected.trim());
366            }
367            (Err(actual), Err(expected)) => {
368                assert_eq!(actual.status_code(), expected.0);
369                assert_eq!(actual.output_msg(), expected.1);
370            }
371            _ => unreachable!(),
372        }
373    }
374
375    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
376    async fn test_suspend_frontend() -> Result<()> {
377        common_telemetry::init_default_ut_logging();
378
379        let meta_client_options = MetaClientOptions {
380            metasrv_addrs: vec!["localhost:0".to_string()],
381            ..Default::default()
382        };
383        let options = FrontendOptions {
384            http: HttpOptions {
385                addr: "127.0.0.1:0".to_string(),
386                ..Default::default()
387            },
388            grpc: GrpcOptions {
389                bind_addr: "127.0.0.1:0".to_string(),
390                flight_compression: FlightCompression::None,
391                ..Default::default()
392            },
393            mysql: MysqlOptions {
394                enable: false,
395                ..Default::default()
396            },
397            postgres: PostgresOptions {
398                enable: false,
399                ..Default::default()
400            },
401            meta_client: Some(meta_client_options.clone()),
402            heartbeat: HeartbeatOptions {
403                interval: Duration::from_secs(1),
404                ..Default::default()
405            },
406            ..Default::default()
407        };
408
409        let server = Arc::new(SuspendableHeartbeatServer {
410            suspend: Arc::new(AtomicBool::new(false)),
411        });
412        let meta_client = create_meta_client(&meta_client_options, server.clone()).await;
413        let frontend = create_frontend(&options, meta_client).await?;
414
415        let frontend_heartbeat_interval = options.heartbeat.interval;
416        tokio::time::sleep(frontend_heartbeat_interval).await;
417        // initial state: not suspend:
418        assert!(!frontend.instance.is_suspended());
419        verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
420        verify_suspend_state_by_grpc(
421            &frontend,
422            Ok(r#"
423+----------+
424| Int64(1) |
425+----------+
426| 1        |
427+----------+"#),
428        )
429        .await;
430
431        // make heartbeat server returned "suspend" instruction,
432        server.suspend.store(true, Ordering::Relaxed);
433        tokio::time::sleep(frontend_heartbeat_interval).await;
434        // ... then the frontend is suspended:
435        assert!(frontend.instance.is_suspended());
436        verify_suspend_state_by_http(
437            &frontend,
438            Err((
439                StatusCode::Suspended,
440                "error: Service suspended, execution_time_ms: 0",
441            )),
442        )
443        .await;
444        verify_suspend_state_by_grpc(&frontend, Err((StatusCode::Suspended, "Service suspended")))
445            .await;
446
447        // make heartbeat server NOT returned "suspend" instruction,
448        server.suspend.store(false, Ordering::Relaxed);
449        tokio::time::sleep(frontend_heartbeat_interval).await;
450        // ... then frontend's suspend state is cleared:
451        assert!(!frontend.instance.is_suspended());
452        verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
453        verify_suspend_state_by_grpc(
454            &frontend,
455            Ok(r#"
456+----------+
457| Int64(1) |
458+----------+
459| 1        |
460+----------+"#),
461        )
462        .await;
463        Ok(())
464    }
465}