1use std::sync::Arc;
16
17use common_base::readable_size::ReadableSize;
18use common_config::config::Configurable;
19use common_event_recorder::EventRecorderOptions;
20use common_memory_manager::OnExhaustedPolicy;
21use common_options::datanode::DatanodeClientOptions;
22use common_options::memory::MemoryOptions;
23use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
24use meta_client::MetaClientOptions;
25use query::options::QueryOptions;
26use serde::{Deserialize, Serialize};
27use servers::grpc::GrpcOptions;
28use servers::http::HttpOptions;
29use servers::server::ServerHandlers;
30use snafu::ResultExt;
31
32use crate::error;
33use crate::error::Result;
34use crate::heartbeat::HeartbeatTask;
35use crate::instance::Instance;
36use crate::service_config::{
37 InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions,
38 PromStoreOptions,
39};
40
/// Configuration of a frontend node.
///
/// `#[serde(default)]` is applied to the whole struct, so any key that is missing from the
/// deserialized input takes its value from the [`Default`] implementation of this type.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FrontendOptions {
    /// Identifier of this node, if one is configured (`None` by default).
    pub node_id: Option<String>,
    /// Default timezone, if one is configured (`None` by default).
    pub default_timezone: Option<String>,
    /// Default column prefix, if one is configured (`None` by default).
    pub default_column_prefix: Option<String>,
    /// Size budget for in-flight write bytes; the default is `ReadableSize(0)`.
    // NOTE(review): presumably a value of 0 disables the limit — confirm against the consumer.
    pub max_in_flight_write_bytes: ReadableSize,
    /// Policy applied once the in-flight write budget is exhausted.
    pub write_bytes_exhausted_policy: OnExhaustedPolicy,
    /// HTTP server options.
    pub http: HttpOptions,
    /// gRPC server options.
    pub grpc: GrpcOptions,
    /// Optional second set of gRPC options (`None` by default).
    // NOTE(review): presumably for an internal-only gRPC endpoint — confirm.
    pub internal_grpc: Option<GrpcOptions>,
    /// Per-protocol service options.
    pub mysql: MysqlOptions,
    pub postgres: PostgresOptions,
    pub opentsdb: OpentsdbOptions,
    pub influxdb: InfluxdbOptions,
    pub prom_store: PromStoreOptions,
    pub jaeger: JaegerOptions,
    pub otlp: OtlpOptions,
    /// Options of the meta client (`None` by default).
    pub meta_client: Option<MetaClientOptions>,
    /// Logging options.
    pub logging: LoggingOptions,
    /// Options of the client used to talk to datanodes.
    pub datanode: DatanodeClientOptions,
    /// User provider setting, if one is configured (`None` by default).
    pub user_provider: Option<String>,
    /// Tracing options.
    pub tracing: TracingOptions,
    /// Query engine options.
    pub query: QueryOptions,
    /// Slow query options.
    pub slow_query: SlowQueryOptions,
    /// Memory options.
    pub memory: MemoryOptions,
    /// Event recorder options.
    pub event_recorder: EventRecorderOptions,
}
76
77impl Default for FrontendOptions {
78 fn default() -> Self {
79 Self {
80 node_id: None,
81 default_timezone: None,
82 default_column_prefix: None,
83 max_in_flight_write_bytes: ReadableSize(0),
84 write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
85 http: HttpOptions::default(),
86 grpc: GrpcOptions::default(),
87 internal_grpc: None,
88 mysql: MysqlOptions::default(),
89 postgres: PostgresOptions::default(),
90 opentsdb: OpentsdbOptions::default(),
91 influxdb: InfluxdbOptions::default(),
92 jaeger: JaegerOptions::default(),
93 prom_store: PromStoreOptions::default(),
94 otlp: OtlpOptions::default(),
95 meta_client: None,
96 logging: LoggingOptions::default(),
97 datanode: DatanodeClientOptions::default(),
98 user_provider: None,
99 tracing: TracingOptions::default(),
100 query: QueryOptions::default(),
101 slow_query: SlowQueryOptions::default(),
102 memory: MemoryOptions::default(),
103 event_recorder: EventRecorderOptions::default(),
104 }
105 }
106}
107
108impl Configurable for FrontendOptions {
109 fn env_list_keys() -> Option<&'static [&'static str]> {
110 Some(&["meta_client.metasrv_addrs"])
111 }
112}
113
/// A frontend node: the shared [`Instance`], the server handlers it exposes, and an optional
/// heartbeat task.
pub struct Frontend {
    /// The frontend instance, shared behind an `Arc`.
    pub instance: Arc<Instance>,
    /// Handlers of the servers owned by this frontend.
    pub servers: ServerHandlers,
    /// Heartbeat task; `None` when the frontend runs without one.
    pub heartbeat_task: Option<HeartbeatTask>,
}
121
122impl Frontend {
123 pub async fn start(&mut self) -> Result<()> {
124 if let Some(t) = &self.heartbeat_task {
125 t.start().await?;
126 }
127
128 self.servers
129 .start_all()
130 .await
131 .context(error::StartServerSnafu)
132 }
133
134 pub async fn shutdown(&mut self) -> Result<()> {
135 self.servers
136 .shutdown_all()
137 .await
138 .context(error::ShutdownServerSnafu)
139 }
140
141 pub fn server_handlers(&self) -> &ServerHandlers {
142 &self.servers
143 }
144}
145
146#[cfg(test)]
147mod tests {
148 use std::sync::atomic::{AtomicBool, Ordering};
149 use std::time::Duration;
150
151 use api::v1::meta::heartbeat_server::HeartbeatServer;
152 use api::v1::meta::mailbox_message::Payload;
153 use api::v1::meta::{
154 AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, MailboxMessage,
155 Peer, ResponseHeader, Role, heartbeat_server,
156 };
157 use async_trait::async_trait;
158 use client::{Client, Database};
159 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
160 use common_error::ext::ErrorExt;
161 use common_error::from_header_to_err_code_msg;
162 use common_error::status_code::StatusCode;
163 use common_grpc::channel_manager::ChannelManager;
164 use common_meta::heartbeat::handler::HandlerGroupExecutor;
165 use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
166 use common_meta::heartbeat::handler::suspend::SuspendHandler;
167 use common_meta::instruction::Instruction;
168 use common_stat::ResourceStatImpl;
169 use meta_client::MetaClientRef;
170 use meta_client::client::MetaClientBuilder;
171 use meta_srv::service::GrpcStream;
172 use servers::grpc::{FlightCompression, GRPC_SERVER};
173 use servers::http::HTTP_SERVER;
174 use servers::http::result::greptime_result_v1::GreptimedbV1Response;
175 use tokio::sync::mpsc;
176 use tonic::codec::CompressionEncoding;
177 use tonic::codegen::tokio_stream::StreamExt;
178 use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
179 use tonic::{Request, Response, Status, Streaming};
180
181 use super::*;
182 use crate::instance::builder::FrontendBuilder;
183 use crate::server::Services;
184
/// The default options must serialize to TOML and deserialize back without error.
#[test]
fn test_toml() {
    let serialized = toml::to_string(&FrontendOptions::default()).unwrap();
    let _roundtrip: FrontendOptions = toml::from_str(&serialized).unwrap();
}
191
/// Test double for the heartbeat service.
///
/// While `suspend` is `true`, each heartbeat response it sends carries a mailbox message
/// containing `Instruction::Suspend`; otherwise responses carry no mailbox message.
struct SuspendableHeartbeatServer {
    /// Toggled by the test to switch suspend instructions on and off.
    suspend: Arc<AtomicBool>,
}
195
196 #[async_trait]
197 impl heartbeat_server::Heartbeat for SuspendableHeartbeatServer {
198 type HeartbeatStream = GrpcStream<HeartbeatResponse>;
199
200 async fn heartbeat(
201 &self,
202 request: Request<Streaming<HeartbeatRequest>>,
203 ) -> std::result::Result<Response<Self::HeartbeatStream>, Status> {
204 let (tx, rx) = mpsc::channel(4);
205
206 common_runtime::spawn_global({
207 let mut requests = request.into_inner();
208 let suspend = self.suspend.clone();
209 async move {
210 while let Some(request) = requests.next().await {
211 if let Err(e) = request {
212 let _ = tx.send(Err(e)).await;
213 return;
214 }
215
216 let mailbox_message =
217 suspend.load(Ordering::Relaxed).then(|| MailboxMessage {
218 payload: Some(Payload::Json(
219 serde_json::to_string(&Instruction::Suspend).unwrap(),
220 )),
221 ..Default::default()
222 });
223 let response = HeartbeatResponse {
224 header: Some(ResponseHeader::success()),
225 mailbox_message,
226 ..Default::default()
227 };
228
229 let _ = tx.send(Ok(response)).await;
230 }
231 }
232 });
233
234 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
235 }
236
237 async fn ask_leader(
238 &self,
239 _: Request<AskLeaderRequest>,
240 ) -> std::result::Result<Response<AskLeaderResponse>, Status> {
241 Ok(Response::new(AskLeaderResponse {
242 header: Some(ResponseHeader::success()),
243 leader: Some(Peer {
244 addr: "localhost:0".to_string(),
245 ..Default::default()
246 }),
247 }))
248 }
249 }
250
251 async fn create_meta_client(
252 options: &MetaClientOptions,
253 heartbeat_server: Arc<SuspendableHeartbeatServer>,
254 ) -> MetaClientRef {
255 let (client, server) = tokio::io::duplex(1024);
256
257 common_runtime::spawn_global(async move {
259 let mut router = tonic::transport::Server::builder();
260 let router = router.add_service(
261 HeartbeatServer::from_arc(heartbeat_server)
262 .accept_compressed(CompressionEncoding::Zstd)
263 .send_compressed(CompressionEncoding::Zstd),
264 );
265 router
266 .serve_with_incoming(futures::stream::iter([Ok::<_, std::io::Error>(server)]))
267 .await
268 });
269
270 let mut client = Some(client);
273 let connector = tower::service_fn(move |_| {
274 let client = client.take();
275 async move {
276 if let Some(client) = client {
277 Ok(hyper_util::rt::TokioIo::new(client))
278 } else {
279 Err(std::io::Error::other("client already taken"))
280 }
281 }
282 });
283 let manager = ChannelManager::new();
284 manager
285 .reset_with_connector("localhost:0", connector)
286 .unwrap();
287
288 let mut client = MetaClientBuilder::new(0, Role::Frontend)
290 .enable_heartbeat()
291 .heartbeat_channel_manager(manager)
292 .build();
293 client.start(&options.metasrv_addrs).await.unwrap();
294 Arc::new(client)
295 }
296
297 async fn create_frontend(
298 options: &FrontendOptions,
299 meta_client: MetaClientRef,
300 ) -> Result<Frontend> {
301 let instance = Arc::new(
302 FrontendBuilder::new_test(options, meta_client.clone())
303 .try_build()
304 .await?,
305 );
306
307 let servers =
308 Services::new(options.clone(), instance.clone(), Default::default()).build()?;
309
310 let executor = Arc::new(HandlerGroupExecutor::new(vec![
311 Arc::new(ParseMailboxMessageHandler),
312 Arc::new(SuspendHandler::new(instance.suspend_state())),
313 ]));
314 let heartbeat_task = Some(HeartbeatTask::new(
315 options,
316 meta_client,
317 executor,
318 Arc::new(ResourceStatImpl::default()),
319 ));
320
321 let mut frontend = Frontend {
322 instance,
323 servers,
324 heartbeat_task,
325 };
326 frontend.start().await?;
327 Ok(frontend)
328 }
329
330 async fn verify_suspend_state_by_http(
331 frontend: &Frontend,
332 expected: std::result::Result<&str, (StatusCode, &str)>,
333 ) {
334 let addr = frontend.server_handlers().addr(HTTP_SERVER).unwrap();
335 let response = reqwest::get(format!("http://{}/v1/sql?sql=SELECT 1", addr))
336 .await
337 .unwrap();
338
339 let headers = response.headers();
340 let response = if let Some((code, error)) = from_header_to_err_code_msg(headers) {
341 Err((code, error))
342 } else {
343 Ok(response.text().await.unwrap())
344 };
345
346 match (response, expected) {
347 (Ok(response), Ok(expected)) => {
348 let response: GreptimedbV1Response = serde_json::from_str(&response).unwrap();
349 let response = serde_json::to_string(response.output()).unwrap();
350 assert_eq!(&response, expected);
351 }
352 (Err(actual), Err(expected)) => assert_eq!(actual, expected),
353 _ => unreachable!(),
354 }
355 }
356
357 async fn verify_suspend_state_by_grpc(
358 frontend: &Frontend,
359 expected: std::result::Result<&str, (StatusCode, &str)>,
360 ) {
361 let addr = frontend.server_handlers().addr(GRPC_SERVER).unwrap();
362 let client = Client::with_urls([addr.to_string()]);
363 let client = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
364 let response = client.sql("SELECT 1").await;
365
366 match (response, expected) {
367 (Ok(response), Ok(expected)) => {
368 let response = response.data.pretty_print().await;
369 assert_eq!(&response, expected.trim());
370 }
371 (Err(actual), Err(expected)) => {
372 assert_eq!(actual.status_code(), expected.0);
373 assert_eq!(actual.output_msg(), expected.1);
374 }
375 _ => unreachable!(),
376 }
377 }
378
379 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
380 async fn test_suspend_frontend() -> Result<()> {
381 common_telemetry::init_default_ut_logging();
382
383 let meta_client_options = MetaClientOptions {
384 metasrv_addrs: vec!["localhost:0".to_string()],
385 ..Default::default()
386 };
387 let options = FrontendOptions {
388 http: HttpOptions {
389 addr: "127.0.0.1:0".to_string(),
390 ..Default::default()
391 },
392 grpc: GrpcOptions {
393 bind_addr: "127.0.0.1:0".to_string(),
394 flight_compression: FlightCompression::None,
395 ..Default::default()
396 },
397 mysql: MysqlOptions {
398 enable: false,
399 ..Default::default()
400 },
401 postgres: PostgresOptions {
402 enable: false,
403 ..Default::default()
404 },
405 meta_client: Some(meta_client_options.clone()),
406 ..Default::default()
407 };
408
409 let server = Arc::new(SuspendableHeartbeatServer {
410 suspend: Arc::new(AtomicBool::new(false)),
411 });
412 let meta_client = create_meta_client(&meta_client_options, server.clone()).await;
413 let frontend = create_frontend(&options, meta_client).await?;
414
415 use common_meta::distributed_time_constants::{
416 BASE_HEARTBEAT_INTERVAL, frontend_heartbeat_interval,
417 };
418 let frontend_heartbeat_interval =
419 frontend_heartbeat_interval(BASE_HEARTBEAT_INTERVAL) + Duration::from_secs(1);
420 tokio::time::sleep(frontend_heartbeat_interval).await;
421 assert!(!frontend.instance.is_suspended());
423 verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
424 verify_suspend_state_by_grpc(
425 &frontend,
426 Ok(r#"
427+----------+
428| Int64(1) |
429+----------+
430| 1 |
431+----------+"#),
432 )
433 .await;
434
435 server.suspend.store(true, Ordering::Relaxed);
437 tokio::time::sleep(frontend_heartbeat_interval).await;
438 assert!(frontend.instance.is_suspended());
440 verify_suspend_state_by_http(
441 &frontend,
442 Err((
443 StatusCode::Suspended,
444 "error: Service suspended, execution_time_ms: 0",
445 )),
446 )
447 .await;
448 verify_suspend_state_by_grpc(&frontend, Err((StatusCode::Suspended, "Service suspended")))
449 .await;
450
451 server.suspend.store(false, Ordering::Relaxed);
453 tokio::time::sleep(frontend_heartbeat_interval).await;
454 assert!(!frontend.instance.is_suspended());
456 verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
457 verify_suspend_state_by_grpc(
458 &frontend,
459 Ok(r#"
460+----------+
461| Int64(1) |
462+----------+
463| 1 |
464+----------+"#),
465 )
466 .await;
467 Ok(())
468 }
469}