1use std::sync::Arc;
16
17use common_base::readable_size::ReadableSize;
18use common_config::config::Configurable;
19use common_event_recorder::EventRecorderOptions;
20use common_memory_manager::OnExhaustedPolicy;
21use common_options::datanode::DatanodeClientOptions;
22use common_options::memory::MemoryOptions;
23use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
24use meta_client::MetaClientOptions;
25use query::options::QueryOptions;
26use serde::{Deserialize, Serialize};
27use servers::grpc::GrpcOptions;
28use servers::http::HttpOptions;
29use servers::server::ServerHandlers;
30use snafu::ResultExt;
31
32use crate::error;
33use crate::error::Result;
34use crate::heartbeat::HeartbeatTask;
35use crate::instance::Instance;
36use crate::service_config::{
37 InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions,
38 PromStoreOptions,
39};
40
41#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
42#[serde(default)]
43pub struct FrontendOptions {
44 pub node_id: Option<String>,
45 pub default_timezone: Option<String>,
46 pub default_column_prefix: Option<String>,
47 pub max_in_flight_write_bytes: ReadableSize,
50 pub write_bytes_exhausted_policy: OnExhaustedPolicy,
53 pub http: HttpOptions,
54 pub grpc: GrpcOptions,
55 pub internal_grpc: Option<GrpcOptions>,
58 pub mysql: MysqlOptions,
59 pub postgres: PostgresOptions,
60 pub opentsdb: OpentsdbOptions,
61 pub influxdb: InfluxdbOptions,
62 pub prom_store: PromStoreOptions,
63 pub jaeger: JaegerOptions,
64 pub otlp: OtlpOptions,
65 pub meta_client: Option<MetaClientOptions>,
66 pub logging: LoggingOptions,
67 pub datanode: DatanodeClientOptions,
68 pub user_provider: Option<String>,
69 pub tracing: TracingOptions,
70 pub query: QueryOptions,
71 pub slow_query: SlowQueryOptions,
72 pub memory: MemoryOptions,
73 pub event_recorder: EventRecorderOptions,
75}
76
77impl Default for FrontendOptions {
78 fn default() -> Self {
79 Self {
80 node_id: None,
81 default_timezone: None,
82 default_column_prefix: None,
83 max_in_flight_write_bytes: ReadableSize(0),
84 write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
85 http: HttpOptions::default(),
86 grpc: GrpcOptions::default(),
87 internal_grpc: None,
88 mysql: MysqlOptions::default(),
89 postgres: PostgresOptions::default(),
90 opentsdb: OpentsdbOptions::default(),
91 influxdb: InfluxdbOptions::default(),
92 jaeger: JaegerOptions::default(),
93 prom_store: PromStoreOptions::default(),
94 otlp: OtlpOptions::default(),
95 meta_client: None,
96 logging: LoggingOptions::default(),
97 datanode: DatanodeClientOptions::default(),
98 user_provider: None,
99 tracing: TracingOptions::default(),
100 query: QueryOptions::default(),
101 slow_query: SlowQueryOptions::default(),
102 memory: MemoryOptions::default(),
103 event_recorder: EventRecorderOptions::default(),
104 }
105 }
106}
107
108impl Configurable for FrontendOptions {
109 fn env_list_keys() -> Option<&'static [&'static str]> {
110 Some(&["meta_client.metasrv_addrs"])
111 }
112}
113
114pub struct Frontend {
117 pub instance: Arc<Instance>,
118 pub servers: ServerHandlers,
119 pub heartbeat_task: Option<HeartbeatTask>,
120}
121
122impl Frontend {
123 pub async fn start(&mut self) -> Result<()> {
124 if let Some(t) = &self.heartbeat_task {
125 t.start().await?;
126 }
127
128 self.servers
129 .start_all()
130 .await
131 .context(error::StartServerSnafu)
132 }
133
134 pub async fn shutdown(&mut self) -> Result<()> {
135 self.servers
136 .shutdown_all()
137 .await
138 .context(error::ShutdownServerSnafu)
139 }
140
141 pub fn server_handlers(&self) -> &ServerHandlers {
142 &self.servers
143 }
144}
145
146#[cfg(test)]
147mod tests {
148 use std::sync::atomic::{AtomicBool, Ordering};
149 use std::time::Duration;
150
151 use api::v1::meta::heartbeat_server::HeartbeatServer;
152 use api::v1::meta::mailbox_message::Payload;
153 use api::v1::meta::{
154 AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, MailboxMessage,
155 Peer, ResponseHeader, Role, heartbeat_server,
156 };
157 use async_trait::async_trait;
158 use client::{Client, Database};
159 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
160 use common_error::ext::ErrorExt;
161 use common_error::from_header_to_err_code_msg;
162 use common_error::status_code::StatusCode;
163 use common_grpc::channel_manager::ChannelManager;
164 use common_meta::heartbeat::handler::HandlerGroupExecutor;
165 use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
166 use common_meta::heartbeat::handler::suspend::SuspendHandler;
167 use common_meta::instruction::Instruction;
168 use common_stat::ResourceStatImpl;
169 use meta_client::MetaClientRef;
170 use meta_client::client::MetaClientBuilder;
171 use meta_srv::service::GrpcStream;
172 use servers::grpc::{FlightCompression, GRPC_SERVER};
173 use servers::http::HTTP_SERVER;
174 use servers::http::result::greptime_result_v1::GreptimedbV1Response;
175 use tokio::sync::mpsc;
176 use tonic::codec::CompressionEncoding;
177 use tonic::codegen::tokio_stream::StreamExt;
178 use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
179 use tonic::{Request, Response, Status, Streaming};
180
181 use super::*;
182 use crate::instance::builder::FrontendBuilder;
183 use crate::server::Services;
184
185 #[test]
186 fn test_toml() {
187 let opts = FrontendOptions::default();
188 let toml_string = toml::to_string(&opts).unwrap();
189 let _parsed: FrontendOptions = toml::from_str(&toml_string).unwrap();
190 }
191
192 struct SuspendableHeartbeatServer {
193 suspend: Arc<AtomicBool>,
194 }
195
196 #[async_trait]
197 impl heartbeat_server::Heartbeat for SuspendableHeartbeatServer {
198 type HeartbeatStream = GrpcStream<HeartbeatResponse>;
199
200 async fn heartbeat(
201 &self,
202 request: Request<Streaming<HeartbeatRequest>>,
203 ) -> std::result::Result<Response<Self::HeartbeatStream>, Status> {
204 let (tx, rx) = mpsc::channel(4);
205
206 common_runtime::spawn_global({
207 let mut requests = request.into_inner();
208 let suspend = self.suspend.clone();
209 async move {
210 let heartbeat_interval_ms = Duration::from_millis(200).as_millis() as u64;
213 let mut is_handshake = true;
214 while let Some(request) = requests.next().await {
215 if let Err(e) = request {
216 let _ = tx.send(Err(e)).await;
217 return;
218 }
219
220 let mailbox_message =
221 suspend.load(Ordering::Relaxed).then(|| MailboxMessage {
222 payload: Some(Payload::Json(
223 serde_json::to_string(&Instruction::Suspend).unwrap(),
224 )),
225 ..Default::default()
226 });
227 let heartbeat_config =
228 is_handshake.then_some(api::v1::meta::HeartbeatConfig {
229 heartbeat_interval_ms,
230 retry_interval_ms: heartbeat_interval_ms,
231 });
232 is_handshake = false;
233 let response = HeartbeatResponse {
234 header: Some(ResponseHeader::success()),
235 mailbox_message,
236 heartbeat_config,
237 ..Default::default()
238 };
239
240 let _ = tx.send(Ok(response)).await;
241 }
242 }
243 });
244
245 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
246 }
247
248 async fn ask_leader(
249 &self,
250 _: Request<AskLeaderRequest>,
251 ) -> std::result::Result<Response<AskLeaderResponse>, Status> {
252 Ok(Response::new(AskLeaderResponse {
253 header: Some(ResponseHeader::success()),
254 leader: Some(Peer {
255 addr: "localhost:0".to_string(),
256 ..Default::default()
257 }),
258 }))
259 }
260 }
261
262 async fn create_meta_client(
263 options: &MetaClientOptions,
264 heartbeat_server: Arc<SuspendableHeartbeatServer>,
265 ) -> MetaClientRef {
266 let (client, server) = tokio::io::duplex(1024);
267
268 common_runtime::spawn_global(async move {
270 let mut router = tonic::transport::Server::builder();
271 let router = router.add_service(
272 HeartbeatServer::from_arc(heartbeat_server)
273 .accept_compressed(CompressionEncoding::Zstd)
274 .send_compressed(CompressionEncoding::Zstd),
275 );
276 router
277 .serve_with_incoming(futures::stream::iter([Ok::<_, std::io::Error>(server)]))
278 .await
279 });
280
281 let mut client = Some(client);
284 let connector = tower::service_fn(move |_| {
285 let client = client.take();
286 async move {
287 if let Some(client) = client {
288 Ok(hyper_util::rt::TokioIo::new(client))
289 } else {
290 Err(std::io::Error::other("client already taken"))
291 }
292 }
293 });
294 let manager = ChannelManager::new();
295 manager
296 .reset_with_connector("localhost:0", connector)
297 .unwrap();
298
299 let mut client = MetaClientBuilder::new(0, Role::Frontend)
301 .enable_heartbeat()
302 .heartbeat_channel_manager(manager)
303 .build();
304 client.start(&options.metasrv_addrs).await.unwrap();
305 Arc::new(client)
306 }
307
308 async fn create_frontend(
309 options: &FrontendOptions,
310 meta_client: MetaClientRef,
311 ) -> Result<Frontend> {
312 let instance = Arc::new(
313 FrontendBuilder::new_test(options, meta_client.clone())
314 .try_build()
315 .await?,
316 );
317
318 let servers =
319 Services::new(options.clone(), instance.clone(), Default::default()).build()?;
320
321 let executor = Arc::new(HandlerGroupExecutor::new(vec![
322 Arc::new(ParseMailboxMessageHandler),
323 Arc::new(SuspendHandler::new(instance.suspend_state())),
324 ]));
325 let heartbeat_task = Some(HeartbeatTask::new(
326 options,
327 meta_client,
328 executor,
329 Arc::new(ResourceStatImpl::default()),
330 ));
331
332 let mut frontend = Frontend {
333 instance,
334 servers,
335 heartbeat_task,
336 };
337 frontend.start().await?;
338 Ok(frontend)
339 }
340
341 async fn verify_suspend_state_by_http(
342 frontend: &Frontend,
343 expected: std::result::Result<&str, (StatusCode, &str)>,
344 ) {
345 let addr = frontend.server_handlers().addr(HTTP_SERVER).unwrap();
346 let response = reqwest::get(format!("http://{}/v1/sql?sql=SELECT 1", addr))
347 .await
348 .unwrap();
349
350 let headers = response.headers();
351 let response = if let Some((code, error)) = from_header_to_err_code_msg(headers) {
352 Err((code, error))
353 } else {
354 Ok(response.text().await.unwrap())
355 };
356
357 match (response, expected) {
358 (Ok(response), Ok(expected)) => {
359 let response: GreptimedbV1Response = serde_json::from_str(&response).unwrap();
360 let response = serde_json::to_string(response.output()).unwrap();
361 assert_eq!(&response, expected);
362 }
363 (Err(actual), Err(expected)) => assert_eq!(actual, expected),
364 _ => unreachable!(),
365 }
366 }
367
368 async fn verify_suspend_state_by_grpc(
369 frontend: &Frontend,
370 expected: std::result::Result<&str, (StatusCode, &str)>,
371 ) {
372 let addr = frontend.server_handlers().addr(GRPC_SERVER).unwrap();
373 let client = Client::with_urls([addr.to_string()]);
374 let client = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
375 let response = client.sql("SELECT 1").await;
376
377 match (response, expected) {
378 (Ok(response), Ok(expected)) => {
379 let response = response.data.pretty_print().await;
380 assert_eq!(&response, expected.trim());
381 }
382 (Err(actual), Err(expected)) => {
383 assert_eq!(actual.status_code(), expected.0);
384 assert_eq!(actual.output_msg(), expected.1);
385 }
386 _ => unreachable!(),
387 }
388 }
389
390 async fn wait_for_suspend_state(frontend: &Frontend, expected: bool) {
391 let check = || frontend.instance.is_suspended() == expected;
392 if check() {
393 return;
394 }
395
396 tokio::time::timeout(Duration::from_secs(5), async move {
397 while !check() {
398 tokio::time::sleep(Duration::from_millis(20)).await;
399 }
400 })
401 .await
402 .unwrap();
403 }
404
405 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
406 async fn test_suspend_frontend() -> Result<()> {
407 common_telemetry::init_default_ut_logging();
408
409 let meta_client_options = MetaClientOptions {
410 metasrv_addrs: vec!["localhost:0".to_string()],
411 ..Default::default()
412 };
413 let options = FrontendOptions {
414 http: HttpOptions {
415 addr: "127.0.0.1:0".to_string(),
416 ..Default::default()
417 },
418 grpc: GrpcOptions {
419 bind_addr: "127.0.0.1:0".to_string(),
420 flight_compression: FlightCompression::None,
421 ..Default::default()
422 },
423 mysql: MysqlOptions {
424 enable: false,
425 ..Default::default()
426 },
427 postgres: PostgresOptions {
428 enable: false,
429 ..Default::default()
430 },
431 meta_client: Some(meta_client_options.clone()),
432 ..Default::default()
433 };
434
435 let server = Arc::new(SuspendableHeartbeatServer {
436 suspend: Arc::new(AtomicBool::new(false)),
437 });
438 let meta_client = create_meta_client(&meta_client_options, server.clone()).await;
439 let frontend = create_frontend(&options, meta_client).await?;
440
441 assert!(!frontend.instance.is_suspended());
443 verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
444 verify_suspend_state_by_grpc(
445 &frontend,
446 Ok(r#"
447+----------+
448| Int64(1) |
449+----------+
450| 1 |
451+----------+"#),
452 )
453 .await;
454
455 server.suspend.store(true, Ordering::Relaxed);
457 wait_for_suspend_state(&frontend, true).await;
458 assert!(frontend.instance.is_suspended());
460 verify_suspend_state_by_http(
461 &frontend,
462 Err((
463 StatusCode::Suspended,
464 "error: Service suspended, execution_time_ms: 0",
465 )),
466 )
467 .await;
468 verify_suspend_state_by_grpc(&frontend, Err((StatusCode::Suspended, "Service suspended")))
469 .await;
470
471 server.suspend.store(false, Ordering::Relaxed);
473 wait_for_suspend_state(&frontend, false).await;
474 assert!(!frontend.instance.is_suspended());
476 verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
477 verify_suspend_state_by_grpc(
478 &frontend,
479 Ok(r#"
480+----------+
481| Int64(1) |
482+----------+
483| 1 |
484+----------+"#),
485 )
486 .await;
487 Ok(())
488 }
489}