1use std::sync::Arc;
16
17use common_base::readable_size::ReadableSize;
18use common_config::config::Configurable;
19use common_event_recorder::EventRecorderOptions;
20use common_memory_manager::OnExhaustedPolicy;
21use common_options::datanode::DatanodeClientOptions;
22use common_options::memory::MemoryOptions;
23use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
24use meta_client::MetaClientOptions;
25use query::options::QueryOptions;
26use serde::{Deserialize, Serialize};
27use servers::grpc::GrpcOptions;
28use servers::heartbeat_options::HeartbeatOptions;
29use servers::http::HttpOptions;
30use servers::server::ServerHandlers;
31use snafu::ResultExt;
32
33use crate::error;
34use crate::error::Result;
35use crate::heartbeat::HeartbeatTask;
36use crate::instance::Instance;
37use crate::service_config::{
38 InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions,
39 PromStoreOptions,
40};
41
42#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
43#[serde(default)]
44pub struct FrontendOptions {
45 pub node_id: Option<String>,
46 pub default_timezone: Option<String>,
47 pub default_column_prefix: Option<String>,
48 pub heartbeat: HeartbeatOptions,
49 pub max_in_flight_write_bytes: ReadableSize,
52 pub write_bytes_exhausted_policy: OnExhaustedPolicy,
55 pub http: HttpOptions,
56 pub grpc: GrpcOptions,
57 pub internal_grpc: Option<GrpcOptions>,
60 pub mysql: MysqlOptions,
61 pub postgres: PostgresOptions,
62 pub opentsdb: OpentsdbOptions,
63 pub influxdb: InfluxdbOptions,
64 pub prom_store: PromStoreOptions,
65 pub jaeger: JaegerOptions,
66 pub otlp: OtlpOptions,
67 pub meta_client: Option<MetaClientOptions>,
68 pub logging: LoggingOptions,
69 pub datanode: DatanodeClientOptions,
70 pub user_provider: Option<String>,
71 pub tracing: TracingOptions,
72 pub query: QueryOptions,
73 pub slow_query: SlowQueryOptions,
74 pub memory: MemoryOptions,
75 pub event_recorder: EventRecorderOptions,
77}
78
79impl Default for FrontendOptions {
80 fn default() -> Self {
81 Self {
82 node_id: None,
83 default_timezone: None,
84 default_column_prefix: None,
85 heartbeat: HeartbeatOptions::frontend_default(),
86 max_in_flight_write_bytes: ReadableSize(0),
87 write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
88 http: HttpOptions::default(),
89 grpc: GrpcOptions::default(),
90 internal_grpc: None,
91 mysql: MysqlOptions::default(),
92 postgres: PostgresOptions::default(),
93 opentsdb: OpentsdbOptions::default(),
94 influxdb: InfluxdbOptions::default(),
95 jaeger: JaegerOptions::default(),
96 prom_store: PromStoreOptions::default(),
97 otlp: OtlpOptions::default(),
98 meta_client: None,
99 logging: LoggingOptions::default(),
100 datanode: DatanodeClientOptions::default(),
101 user_provider: None,
102 tracing: TracingOptions::default(),
103 query: QueryOptions::default(),
104 slow_query: SlowQueryOptions::default(),
105 memory: MemoryOptions::default(),
106 event_recorder: EventRecorderOptions::default(),
107 }
108 }
109}
110
111impl Configurable for FrontendOptions {
112 fn env_list_keys() -> Option<&'static [&'static str]> {
113 Some(&["meta_client.metasrv_addrs"])
114 }
115}
116
117pub struct Frontend {
120 pub instance: Arc<Instance>,
121 pub servers: ServerHandlers,
122 pub heartbeat_task: Option<HeartbeatTask>,
123}
124
125impl Frontend {
126 pub async fn start(&mut self) -> Result<()> {
127 if let Some(t) = &self.heartbeat_task {
128 t.start().await?;
129 }
130
131 self.servers
132 .start_all()
133 .await
134 .context(error::StartServerSnafu)
135 }
136
137 pub async fn shutdown(&mut self) -> Result<()> {
138 self.servers
139 .shutdown_all()
140 .await
141 .context(error::ShutdownServerSnafu)
142 }
143
144 pub fn server_handlers(&self) -> &ServerHandlers {
145 &self.servers
146 }
147}
148
149#[cfg(test)]
150mod tests {
151 use std::sync::atomic::{AtomicBool, Ordering};
152 use std::time::Duration;
153
154 use api::v1::meta::heartbeat_server::HeartbeatServer;
155 use api::v1::meta::mailbox_message::Payload;
156 use api::v1::meta::{
157 AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, MailboxMessage,
158 Peer, ResponseHeader, Role, heartbeat_server,
159 };
160 use async_trait::async_trait;
161 use client::{Client, Database};
162 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
163 use common_error::ext::ErrorExt;
164 use common_error::from_header_to_err_code_msg;
165 use common_error::status_code::StatusCode;
166 use common_grpc::channel_manager::ChannelManager;
167 use common_meta::heartbeat::handler::HandlerGroupExecutor;
168 use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
169 use common_meta::heartbeat::handler::suspend::SuspendHandler;
170 use common_meta::instruction::Instruction;
171 use common_stat::ResourceStatImpl;
172 use meta_client::MetaClientRef;
173 use meta_client::client::MetaClientBuilder;
174 use meta_srv::service::GrpcStream;
175 use servers::grpc::{FlightCompression, GRPC_SERVER};
176 use servers::http::HTTP_SERVER;
177 use servers::http::result::greptime_result_v1::GreptimedbV1Response;
178 use tokio::sync::mpsc;
179 use tonic::codec::CompressionEncoding;
180 use tonic::codegen::tokio_stream::StreamExt;
181 use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
182 use tonic::{Request, Response, Status, Streaming};
183
184 use super::*;
185 use crate::instance::builder::FrontendBuilder;
186 use crate::server::Services;
187
188 #[test]
189 fn test_toml() {
190 let opts = FrontendOptions::default();
191 let toml_string = toml::to_string(&opts).unwrap();
192 let _parsed: FrontendOptions = toml::from_str(&toml_string).unwrap();
193 }
194
195 struct SuspendableHeartbeatServer {
196 suspend: Arc<AtomicBool>,
197 }
198
199 #[async_trait]
200 impl heartbeat_server::Heartbeat for SuspendableHeartbeatServer {
201 type HeartbeatStream = GrpcStream<HeartbeatResponse>;
202
203 async fn heartbeat(
204 &self,
205 request: Request<Streaming<HeartbeatRequest>>,
206 ) -> std::result::Result<Response<Self::HeartbeatStream>, Status> {
207 let (tx, rx) = mpsc::channel(4);
208
209 common_runtime::spawn_global({
210 let mut requests = request.into_inner();
211 let suspend = self.suspend.clone();
212 async move {
213 while let Some(request) = requests.next().await {
214 if let Err(e) = request {
215 let _ = tx.send(Err(e)).await;
216 return;
217 }
218
219 let mailbox_message =
220 suspend.load(Ordering::Relaxed).then(|| MailboxMessage {
221 payload: Some(Payload::Json(
222 serde_json::to_string(&Instruction::Suspend).unwrap(),
223 )),
224 ..Default::default()
225 });
226 let response = HeartbeatResponse {
227 header: Some(ResponseHeader::success()),
228 mailbox_message,
229 ..Default::default()
230 };
231
232 let _ = tx.send(Ok(response)).await;
233 }
234 }
235 });
236
237 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
238 }
239
240 async fn ask_leader(
241 &self,
242 _: Request<AskLeaderRequest>,
243 ) -> std::result::Result<Response<AskLeaderResponse>, Status> {
244 Ok(Response::new(AskLeaderResponse {
245 header: Some(ResponseHeader::success()),
246 leader: Some(Peer {
247 addr: "localhost:0".to_string(),
248 ..Default::default()
249 }),
250 }))
251 }
252 }
253
254 async fn create_meta_client(
255 options: &MetaClientOptions,
256 heartbeat_server: Arc<SuspendableHeartbeatServer>,
257 ) -> MetaClientRef {
258 let (client, server) = tokio::io::duplex(1024);
259
260 common_runtime::spawn_global(async move {
262 let mut router = tonic::transport::Server::builder();
263 let router = router.add_service(
264 HeartbeatServer::from_arc(heartbeat_server)
265 .accept_compressed(CompressionEncoding::Zstd)
266 .send_compressed(CompressionEncoding::Zstd),
267 );
268 router
269 .serve_with_incoming(futures::stream::iter([Ok::<_, std::io::Error>(server)]))
270 .await
271 });
272
273 let mut client = Some(client);
276 let connector = tower::service_fn(move |_| {
277 let client = client.take();
278 async move {
279 if let Some(client) = client {
280 Ok(hyper_util::rt::TokioIo::new(client))
281 } else {
282 Err(std::io::Error::other("client already taken"))
283 }
284 }
285 });
286 let manager = ChannelManager::new();
287 manager
288 .reset_with_connector("localhost:0", connector)
289 .unwrap();
290
291 let mut client = MetaClientBuilder::new(0, Role::Frontend)
293 .enable_heartbeat()
294 .heartbeat_channel_manager(manager)
295 .build();
296 client.start(&options.metasrv_addrs).await.unwrap();
297 Arc::new(client)
298 }
299
300 async fn create_frontend(
301 options: &FrontendOptions,
302 meta_client: MetaClientRef,
303 ) -> Result<Frontend> {
304 let instance = Arc::new(
305 FrontendBuilder::new_test(options, meta_client.clone())
306 .try_build()
307 .await?,
308 );
309
310 let servers =
311 Services::new(options.clone(), instance.clone(), Default::default()).build()?;
312
313 let executor = Arc::new(HandlerGroupExecutor::new(vec![
314 Arc::new(ParseMailboxMessageHandler),
315 Arc::new(SuspendHandler::new(instance.suspend_state())),
316 ]));
317 let heartbeat_task = Some(HeartbeatTask::new(
318 options,
319 meta_client,
320 executor,
321 Arc::new(ResourceStatImpl::default()),
322 ));
323
324 let mut frontend = Frontend {
325 instance,
326 servers,
327 heartbeat_task,
328 };
329 frontend.start().await?;
330 Ok(frontend)
331 }
332
333 async fn verify_suspend_state_by_http(
334 frontend: &Frontend,
335 expected: std::result::Result<&str, (StatusCode, &str)>,
336 ) {
337 let addr = frontend.server_handlers().addr(HTTP_SERVER).unwrap();
338 let response = reqwest::get(format!("http://{}/v1/sql?sql=SELECT 1", addr))
339 .await
340 .unwrap();
341
342 let headers = response.headers();
343 let response = if let Some((code, error)) = from_header_to_err_code_msg(headers) {
344 Err((code, error))
345 } else {
346 Ok(response.text().await.unwrap())
347 };
348
349 match (response, expected) {
350 (Ok(response), Ok(expected)) => {
351 let response: GreptimedbV1Response = serde_json::from_str(&response).unwrap();
352 let response = serde_json::to_string(response.output()).unwrap();
353 assert_eq!(&response, expected);
354 }
355 (Err(actual), Err(expected)) => assert_eq!(actual, expected),
356 _ => unreachable!(),
357 }
358 }
359
360 async fn verify_suspend_state_by_grpc(
361 frontend: &Frontend,
362 expected: std::result::Result<&str, (StatusCode, &str)>,
363 ) {
364 let addr = frontend.server_handlers().addr(GRPC_SERVER).unwrap();
365 let client = Client::with_urls([addr.to_string()]);
366 let client = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
367 let response = client.sql("SELECT 1").await;
368
369 match (response, expected) {
370 (Ok(response), Ok(expected)) => {
371 let response = response.data.pretty_print().await;
372 assert_eq!(&response, expected.trim());
373 }
374 (Err(actual), Err(expected)) => {
375 assert_eq!(actual.status_code(), expected.0);
376 assert_eq!(actual.output_msg(), expected.1);
377 }
378 _ => unreachable!(),
379 }
380 }
381
382 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
383 async fn test_suspend_frontend() -> Result<()> {
384 common_telemetry::init_default_ut_logging();
385
386 let meta_client_options = MetaClientOptions {
387 metasrv_addrs: vec!["localhost:0".to_string()],
388 ..Default::default()
389 };
390 let options = FrontendOptions {
391 http: HttpOptions {
392 addr: "127.0.0.1:0".to_string(),
393 ..Default::default()
394 },
395 grpc: GrpcOptions {
396 bind_addr: "127.0.0.1:0".to_string(),
397 flight_compression: FlightCompression::None,
398 ..Default::default()
399 },
400 mysql: MysqlOptions {
401 enable: false,
402 ..Default::default()
403 },
404 postgres: PostgresOptions {
405 enable: false,
406 ..Default::default()
407 },
408 meta_client: Some(meta_client_options.clone()),
409 heartbeat: HeartbeatOptions {
410 interval: Duration::from_secs(1),
411 ..Default::default()
412 },
413 ..Default::default()
414 };
415
416 let server = Arc::new(SuspendableHeartbeatServer {
417 suspend: Arc::new(AtomicBool::new(false)),
418 });
419 let meta_client = create_meta_client(&meta_client_options, server.clone()).await;
420 let frontend = create_frontend(&options, meta_client).await?;
421
422 let frontend_heartbeat_interval = options.heartbeat.interval;
423 tokio::time::sleep(frontend_heartbeat_interval).await;
424 assert!(!frontend.instance.is_suspended());
426 verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
427 verify_suspend_state_by_grpc(
428 &frontend,
429 Ok(r#"
430+----------+
431| Int64(1) |
432+----------+
433| 1 |
434+----------+"#),
435 )
436 .await;
437
438 server.suspend.store(true, Ordering::Relaxed);
440 tokio::time::sleep(frontend_heartbeat_interval).await;
441 assert!(frontend.instance.is_suspended());
443 verify_suspend_state_by_http(
444 &frontend,
445 Err((
446 StatusCode::Suspended,
447 "error: Service suspended, execution_time_ms: 0",
448 )),
449 )
450 .await;
451 verify_suspend_state_by_grpc(&frontend, Err((StatusCode::Suspended, "Service suspended")))
452 .await;
453
454 server.suspend.store(false, Ordering::Relaxed);
456 tokio::time::sleep(frontend_heartbeat_interval).await;
457 assert!(!frontend.instance.is_suspended());
459 verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
460 verify_suspend_state_by_grpc(
461 &frontend,
462 Ok(r#"
463+----------+
464| Int64(1) |
465+----------+
466| 1 |
467+----------+"#),
468 )
469 .await;
470 Ok(())
471 }
472}