1use std::sync::Arc;
16
17use common_base::readable_size::ReadableSize;
18use common_config::config::Configurable;
19use common_event_recorder::EventRecorderOptions;
20use common_options::datanode::DatanodeClientOptions;
21use common_options::memory::MemoryOptions;
22use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
23use meta_client::MetaClientOptions;
24use query::options::QueryOptions;
25use serde::{Deserialize, Serialize};
26use servers::grpc::GrpcOptions;
27use servers::heartbeat_options::HeartbeatOptions;
28use servers::http::HttpOptions;
29use servers::server::ServerHandlers;
30use snafu::ResultExt;
31
32use crate::error;
33use crate::error::Result;
34use crate::heartbeat::HeartbeatTask;
35use crate::instance::Instance;
36use crate::service_config::{
37 InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions,
38 PromStoreOptions,
39};
40
41#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
42#[serde(default)]
43pub struct FrontendOptions {
44 pub node_id: Option<String>,
45 pub default_timezone: Option<String>,
46 pub default_column_prefix: Option<String>,
47 pub heartbeat: HeartbeatOptions,
48 pub http: HttpOptions,
49 pub grpc: GrpcOptions,
50 pub internal_grpc: Option<GrpcOptions>,
53 pub mysql: MysqlOptions,
54 pub postgres: PostgresOptions,
55 pub opentsdb: OpentsdbOptions,
56 pub influxdb: InfluxdbOptions,
57 pub prom_store: PromStoreOptions,
58 pub jaeger: JaegerOptions,
59 pub otlp: OtlpOptions,
60 pub meta_client: Option<MetaClientOptions>,
61 pub logging: LoggingOptions,
62 pub datanode: DatanodeClientOptions,
63 pub user_provider: Option<String>,
64 pub tracing: TracingOptions,
65 pub query: QueryOptions,
66 pub max_in_flight_write_bytes: Option<ReadableSize>,
67 pub slow_query: SlowQueryOptions,
68 pub memory: MemoryOptions,
69 pub event_recorder: EventRecorderOptions,
71}
72
73impl Default for FrontendOptions {
74 fn default() -> Self {
75 Self {
76 node_id: None,
77 default_timezone: None,
78 default_column_prefix: None,
79 heartbeat: HeartbeatOptions::frontend_default(),
80 http: HttpOptions::default(),
81 grpc: GrpcOptions::default(),
82 internal_grpc: None,
83 mysql: MysqlOptions::default(),
84 postgres: PostgresOptions::default(),
85 opentsdb: OpentsdbOptions::default(),
86 influxdb: InfluxdbOptions::default(),
87 jaeger: JaegerOptions::default(),
88 prom_store: PromStoreOptions::default(),
89 otlp: OtlpOptions::default(),
90 meta_client: None,
91 logging: LoggingOptions::default(),
92 datanode: DatanodeClientOptions::default(),
93 user_provider: None,
94 tracing: TracingOptions::default(),
95 query: QueryOptions::default(),
96 max_in_flight_write_bytes: None,
97 slow_query: SlowQueryOptions::default(),
98 memory: MemoryOptions::default(),
99 event_recorder: EventRecorderOptions::default(),
100 }
101 }
102}
103
104impl Configurable for FrontendOptions {
105 fn env_list_keys() -> Option<&'static [&'static str]> {
106 Some(&["meta_client.metasrv_addrs"])
107 }
108}
109
110pub struct Frontend {
113 pub instance: Arc<Instance>,
114 pub servers: ServerHandlers,
115 pub heartbeat_task: Option<HeartbeatTask>,
116}
117
118impl Frontend {
119 pub async fn start(&mut self) -> Result<()> {
120 if let Some(t) = &self.heartbeat_task {
121 t.start().await?;
122 }
123
124 self.servers
125 .start_all()
126 .await
127 .context(error::StartServerSnafu)
128 }
129
130 pub async fn shutdown(&mut self) -> Result<()> {
131 self.servers
132 .shutdown_all()
133 .await
134 .context(error::ShutdownServerSnafu)
135 }
136
137 pub fn server_handlers(&self) -> &ServerHandlers {
138 &self.servers
139 }
140}
141
142#[cfg(test)]
143mod tests {
144 use std::sync::atomic::{AtomicBool, Ordering};
145 use std::time::Duration;
146
147 use api::v1::meta::heartbeat_server::HeartbeatServer;
148 use api::v1::meta::mailbox_message::Payload;
149 use api::v1::meta::{
150 AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, MailboxMessage,
151 Peer, ResponseHeader, Role, heartbeat_server,
152 };
153 use async_trait::async_trait;
154 use client::{Client, Database};
155 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
156 use common_error::ext::ErrorExt;
157 use common_error::from_header_to_err_code_msg;
158 use common_error::status_code::StatusCode;
159 use common_grpc::channel_manager::ChannelManager;
160 use common_meta::heartbeat::handler::HandlerGroupExecutor;
161 use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
162 use common_meta::heartbeat::handler::suspend::SuspendHandler;
163 use common_meta::instruction::Instruction;
164 use common_stat::ResourceStatImpl;
165 use meta_client::MetaClientRef;
166 use meta_client::client::MetaClientBuilder;
167 use meta_srv::service::GrpcStream;
168 use servers::grpc::{FlightCompression, GRPC_SERVER};
169 use servers::http::HTTP_SERVER;
170 use servers::http::result::greptime_result_v1::GreptimedbV1Response;
171 use tokio::sync::mpsc;
172 use tonic::codec::CompressionEncoding;
173 use tonic::codegen::tokio_stream::StreamExt;
174 use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
175 use tonic::{Request, Response, Status, Streaming};
176
177 use super::*;
178 use crate::instance::builder::FrontendBuilder;
179 use crate::server::Services;
180
181 #[test]
182 fn test_toml() {
183 let opts = FrontendOptions::default();
184 let toml_string = toml::to_string(&opts).unwrap();
185 let _parsed: FrontendOptions = toml::from_str(&toml_string).unwrap();
186 }
187
188 struct SuspendableHeartbeatServer {
189 suspend: Arc<AtomicBool>,
190 }
191
192 #[async_trait]
193 impl heartbeat_server::Heartbeat for SuspendableHeartbeatServer {
194 type HeartbeatStream = GrpcStream<HeartbeatResponse>;
195
196 async fn heartbeat(
197 &self,
198 request: Request<Streaming<HeartbeatRequest>>,
199 ) -> std::result::Result<Response<Self::HeartbeatStream>, Status> {
200 let (tx, rx) = mpsc::channel(4);
201
202 common_runtime::spawn_global({
203 let mut requests = request.into_inner();
204 let suspend = self.suspend.clone();
205 async move {
206 while let Some(request) = requests.next().await {
207 if let Err(e) = request {
208 let _ = tx.send(Err(e)).await;
209 return;
210 }
211
212 let mailbox_message =
213 suspend.load(Ordering::Relaxed).then(|| MailboxMessage {
214 payload: Some(Payload::Json(
215 serde_json::to_string(&Instruction::Suspend).unwrap(),
216 )),
217 ..Default::default()
218 });
219 let response = HeartbeatResponse {
220 header: Some(ResponseHeader::success()),
221 mailbox_message,
222 ..Default::default()
223 };
224
225 let _ = tx.send(Ok(response)).await;
226 }
227 }
228 });
229
230 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
231 }
232
233 async fn ask_leader(
234 &self,
235 _: Request<AskLeaderRequest>,
236 ) -> std::result::Result<Response<AskLeaderResponse>, Status> {
237 Ok(Response::new(AskLeaderResponse {
238 header: Some(ResponseHeader::success()),
239 leader: Some(Peer {
240 addr: "localhost:0".to_string(),
241 ..Default::default()
242 }),
243 }))
244 }
245 }
246
247 async fn create_meta_client(
248 options: &MetaClientOptions,
249 heartbeat_server: Arc<SuspendableHeartbeatServer>,
250 ) -> MetaClientRef {
251 let (client, server) = tokio::io::duplex(1024);
252
253 common_runtime::spawn_global(async move {
255 let mut router = tonic::transport::Server::builder();
256 let router = router.add_service(
257 HeartbeatServer::from_arc(heartbeat_server)
258 .accept_compressed(CompressionEncoding::Zstd)
259 .send_compressed(CompressionEncoding::Zstd),
260 );
261 router
262 .serve_with_incoming(futures::stream::iter([Ok::<_, std::io::Error>(server)]))
263 .await
264 });
265
266 let mut client = Some(client);
269 let connector = tower::service_fn(move |_| {
270 let client = client.take();
271 async move {
272 if let Some(client) = client {
273 Ok(hyper_util::rt::TokioIo::new(client))
274 } else {
275 Err(std::io::Error::other("client already taken"))
276 }
277 }
278 });
279 let manager = ChannelManager::new();
280 manager
281 .reset_with_connector("localhost:0", connector)
282 .unwrap();
283
284 let mut client = MetaClientBuilder::new(0, Role::Frontend)
286 .enable_heartbeat()
287 .heartbeat_channel_manager(manager)
288 .build();
289 client.start(&options.metasrv_addrs).await.unwrap();
290 Arc::new(client)
291 }
292
293 async fn create_frontend(
294 options: &FrontendOptions,
295 meta_client: MetaClientRef,
296 ) -> Result<Frontend> {
297 let instance = Arc::new(
298 FrontendBuilder::new_test(options, meta_client.clone())
299 .try_build()
300 .await?,
301 );
302
303 let servers =
304 Services::new(options.clone(), instance.clone(), Default::default()).build()?;
305
306 let executor = Arc::new(HandlerGroupExecutor::new(vec![
307 Arc::new(ParseMailboxMessageHandler),
308 Arc::new(SuspendHandler::new(instance.suspend_state())),
309 ]));
310 let heartbeat_task = Some(HeartbeatTask::new(
311 options,
312 meta_client,
313 executor,
314 Arc::new(ResourceStatImpl::default()),
315 ));
316
317 let mut frontend = Frontend {
318 instance,
319 servers,
320 heartbeat_task,
321 };
322 frontend.start().await?;
323 Ok(frontend)
324 }
325
326 async fn verify_suspend_state_by_http(
327 frontend: &Frontend,
328 expected: std::result::Result<&str, (StatusCode, &str)>,
329 ) {
330 let addr = frontend.server_handlers().addr(HTTP_SERVER).unwrap();
331 let response = reqwest::get(format!("http://{}/v1/sql?sql=SELECT 1", addr))
332 .await
333 .unwrap();
334
335 let headers = response.headers();
336 let response = if let Some((code, error)) = from_header_to_err_code_msg(headers) {
337 Err((code, error))
338 } else {
339 Ok(response.text().await.unwrap())
340 };
341
342 match (response, expected) {
343 (Ok(response), Ok(expected)) => {
344 let response: GreptimedbV1Response = serde_json::from_str(&response).unwrap();
345 let response = serde_json::to_string(response.output()).unwrap();
346 assert_eq!(&response, expected);
347 }
348 (Err(actual), Err(expected)) => assert_eq!(actual, expected),
349 _ => unreachable!(),
350 }
351 }
352
353 async fn verify_suspend_state_by_grpc(
354 frontend: &Frontend,
355 expected: std::result::Result<&str, (StatusCode, &str)>,
356 ) {
357 let addr = frontend.server_handlers().addr(GRPC_SERVER).unwrap();
358 let client = Client::with_urls([addr.to_string()]);
359 let client = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
360 let response = client.sql("SELECT 1").await;
361
362 match (response, expected) {
363 (Ok(response), Ok(expected)) => {
364 let response = response.data.pretty_print().await;
365 assert_eq!(&response, expected.trim());
366 }
367 (Err(actual), Err(expected)) => {
368 assert_eq!(actual.status_code(), expected.0);
369 assert_eq!(actual.output_msg(), expected.1);
370 }
371 _ => unreachable!(),
372 }
373 }
374
375 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
376 async fn test_suspend_frontend() -> Result<()> {
377 common_telemetry::init_default_ut_logging();
378
379 let meta_client_options = MetaClientOptions {
380 metasrv_addrs: vec!["localhost:0".to_string()],
381 ..Default::default()
382 };
383 let options = FrontendOptions {
384 http: HttpOptions {
385 addr: "127.0.0.1:0".to_string(),
386 ..Default::default()
387 },
388 grpc: GrpcOptions {
389 bind_addr: "127.0.0.1:0".to_string(),
390 flight_compression: FlightCompression::None,
391 ..Default::default()
392 },
393 mysql: MysqlOptions {
394 enable: false,
395 ..Default::default()
396 },
397 postgres: PostgresOptions {
398 enable: false,
399 ..Default::default()
400 },
401 meta_client: Some(meta_client_options.clone()),
402 heartbeat: HeartbeatOptions {
403 interval: Duration::from_secs(1),
404 ..Default::default()
405 },
406 ..Default::default()
407 };
408
409 let server = Arc::new(SuspendableHeartbeatServer {
410 suspend: Arc::new(AtomicBool::new(false)),
411 });
412 let meta_client = create_meta_client(&meta_client_options, server.clone()).await;
413 let frontend = create_frontend(&options, meta_client).await?;
414
415 let frontend_heartbeat_interval = options.heartbeat.interval;
416 tokio::time::sleep(frontend_heartbeat_interval).await;
417 assert!(!frontend.instance.is_suspended());
419 verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
420 verify_suspend_state_by_grpc(
421 &frontend,
422 Ok(r#"
423+----------+
424| Int64(1) |
425+----------+
426| 1 |
427+----------+"#),
428 )
429 .await;
430
431 server.suspend.store(true, Ordering::Relaxed);
433 tokio::time::sleep(frontend_heartbeat_interval).await;
434 assert!(frontend.instance.is_suspended());
436 verify_suspend_state_by_http(
437 &frontend,
438 Err((
439 StatusCode::Suspended,
440 "error: Service suspended, execution_time_ms: 0",
441 )),
442 )
443 .await;
444 verify_suspend_state_by_grpc(&frontend, Err((StatusCode::Suspended, "Service suspended")))
445 .await;
446
447 server.suspend.store(false, Ordering::Relaxed);
449 tokio::time::sleep(frontend_heartbeat_interval).await;
450 assert!(!frontend.instance.is_suspended());
452 verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
453 verify_suspend_state_by_grpc(
454 &frontend,
455 Ok(r#"
456+----------+
457| Int64(1) |
458+----------+
459| 1 |
460+----------+"#),
461 )
462 .await;
463 Ok(())
464 }
465}