1pub(crate) mod health;
16pub(crate) mod heartbeat;
17pub(crate) mod leader;
18pub(crate) mod maintenance;
19pub(crate) mod node_lease;
20pub(crate) mod procedure;
21pub(crate) mod recovery;
22pub(crate) mod sequencer;
23mod util;
24
25use std::collections::HashMap;
26use std::convert::Infallible;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29
30use axum::{Router as AxumRouter, routing};
31use tonic::body::Body;
32use tonic::codegen::{BoxFuture, Service, http};
33use tonic::server::NamedService;
34
35use crate::metasrv::Metasrv;
36use crate::service::admin::heartbeat::HeartBeatHandler;
37use crate::service::admin::leader::LeaderHandler;
38use crate::service::admin::maintenance::MaintenanceHandler;
39use crate::service::admin::node_lease::NodeLeaseHandler;
40use crate::service::admin::procedure::ProcedureManagerHandler;
41use crate::service::admin::recovery::RecoveryHandler;
42use crate::service::admin::sequencer::TableIdSequenceHandler;
43
44pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
51 let router = Router::new().route("/health", health::HealthHandler);
52
53 let router = router.route(
54 "/node-lease",
55 node_lease::NodeLeaseHandler {
56 meta_peer_client: metasrv.meta_peer_client().clone(),
57 },
58 );
59
60 let handler = heartbeat::HeartBeatHandler {
61 meta_peer_client: metasrv.meta_peer_client().clone(),
62 };
63 let router = router
64 .route("/heartbeat", handler.clone())
65 .route("/heartbeat/help", handler);
66
67 let router = router.route(
68 "/leader",
69 leader::LeaderHandler {
70 election: metasrv.election().cloned(),
71 },
72 );
73
74 let router = router.routes(
75 &[
76 "/maintenance",
77 "/maintenance/status",
78 "/maintenance/enable",
79 "/maintenance/disable",
80 ],
81 maintenance::MaintenanceHandler {
82 manager: metasrv.runtime_switch_manager().clone(),
83 },
84 );
85 let router = router.routes(
86 &[
87 "/procedure-manager/pause",
88 "/procedure-manager/resume",
89 "/procedure-manager/status",
90 ],
91 procedure::ProcedureManagerHandler {
92 manager: metasrv.runtime_switch_manager().clone(),
93 },
94 );
95 let router = Router::nest("/admin", router);
96
97 Admin::new(router)
98}
99
100#[async_trait::async_trait]
101pub trait HttpHandler: Send + Sync {
102 async fn handle(
103 &self,
104 path: &str,
105 method: http::Method,
106 params: &HashMap<String, String>,
107 ) -> crate::Result<http::Response<String>>;
108}
109
110#[derive(Clone)]
111pub struct Admin
112where
113 Self: Send,
114{
115 router: Arc<Router>,
116}
117
118impl Admin {
119 pub fn new(router: Router) -> Self {
120 Self {
121 router: Arc::new(router),
122 }
123 }
124}
125
126impl NamedService for Admin {
127 const NAME: &'static str = "admin";
128}
129
130impl Service<http::Request<Body>> for Admin {
131 type Response = http::Response<Body>;
132 type Error = Infallible;
133 type Future = BoxFuture<Self::Response, Self::Error>;
134
135 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
136 Poll::Ready(Ok(()))
137 }
138
139 fn call(&mut self, req: http::Request<Body>) -> Self::Future {
140 let router = self.router.clone();
141 let query_params = req
142 .uri()
143 .query()
144 .map(|q| {
145 url::form_urlencoded::parse(q.as_bytes())
146 .into_owned()
147 .collect()
148 })
149 .unwrap_or_default();
150 let path = req.uri().path().to_owned();
151 let method = req.method().clone();
152 Box::pin(async move { router.call(&path, method, query_params).await })
153 }
154}
155
156#[derive(Default)]
157pub struct Router {
158 handlers: HashMap<String, Arc<dyn HttpHandler>>,
159}
160
161impl Router {
162 pub fn new() -> Self {
163 Self {
164 handlers: HashMap::default(),
165 }
166 }
167
168 pub fn nest(path: &str, router: Router) -> Self {
169 check_path(path);
170
171 let handlers = router
172 .handlers
173 .into_iter()
174 .map(|(url, handler)| (format!("{path}{url}"), handler))
175 .collect();
176
177 Self { handlers }
178 }
179
180 pub fn route(mut self, path: &str, handler: impl HttpHandler + 'static) -> Self {
181 check_path(path);
182
183 let _ = self.handlers.insert(path.to_owned(), Arc::new(handler));
184
185 self
186 }
187
188 pub fn routes(mut self, paths: &[&str], handler: impl HttpHandler + 'static) -> Self {
189 let handler = Arc::new(handler);
190 for path in paths {
191 check_path(path);
192 let _ = self.handlers.insert(path.to_string(), handler.clone());
193 }
194
195 self
196 }
197
198 pub async fn call(
199 &self,
200 path: &str,
201 method: http::Method,
202 params: HashMap<String, String>,
203 ) -> Result<http::Response<Body>, Infallible> {
204 let handler = match self.handlers.get(path) {
205 Some(handler) => handler,
206 None => {
207 return Ok(http::Response::builder()
208 .status(http::StatusCode::NOT_FOUND)
209 .body(Body::empty())
210 .unwrap());
211 }
212 };
213
214 let res = match handler.handle(path, method, ¶ms).await {
215 Ok(res) => res.map(Body::new),
216 Err(e) => http::Response::builder()
217 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
218 .body(Body::new(e.to_string()))
219 .unwrap(),
220 };
221
222 Ok(res)
223 }
224}
225
226fn check_path(path: &str) {
227 if path.is_empty() || !path.starts_with('/') {
228 panic!("paths must start with a `/`")
229 }
230}
231
232pub fn admin_axum_router(metasrv: Arc<Metasrv>) -> AxumRouter {
234 let node_lease_handler = NodeLeaseHandler {
235 meta_peer_client: metasrv.meta_peer_client().clone(),
236 };
237 let heartbeat_handler = HeartBeatHandler {
238 meta_peer_client: metasrv.meta_peer_client().clone(),
239 };
240 let leader_handler = LeaderHandler {
241 election: metasrv.election().cloned(),
242 };
243 let maintenance_handler = MaintenanceHandler {
244 manager: metasrv.runtime_switch_manager().clone(),
245 };
246 let procedure_handler = ProcedureManagerHandler {
247 manager: metasrv.runtime_switch_manager().clone(),
248 };
249 let recovery_handler = RecoveryHandler {
250 manager: metasrv.runtime_switch_manager().clone(),
251 };
252 let table_id_sequence_handler = TableIdSequenceHandler {
253 table_id_sequence: metasrv.table_id_sequence().clone(),
254 runtime_switch_manager: metasrv.runtime_switch_manager().clone(),
255 };
256
257 let admin_router = AxumRouter::new()
258 .route("/health", routing::get(health::health))
259 .route(
260 "/node-lease",
261 routing::get(node_lease::get).with_state(node_lease_handler),
262 )
263 .route(
264 "/leader",
265 routing::get(leader::get).with_state(leader_handler),
266 )
267 .nest(
268 "/heartbeat",
269 AxumRouter::new()
270 .route("/", routing::get(heartbeat::get))
271 .route("/help", routing::get(heartbeat::help))
272 .with_state(heartbeat_handler),
273 )
274 .nest(
275 "/maintenance",
276 AxumRouter::new()
277 .route("/", routing::get(maintenance::status))
278 .route("/status", routing::get(maintenance::status))
279 .route("/enable", routing::post(maintenance::set))
280 .route("/disable", routing::post(maintenance::unset))
281 .with_state(maintenance_handler),
282 )
283 .nest(
284 "/procedure-manager",
285 AxumRouter::new()
286 .route("/status", routing::get(procedure::status))
287 .route("/pause", routing::post(procedure::pause))
288 .route("/resume", routing::post(procedure::resume))
289 .with_state(procedure_handler),
290 )
291 .nest(
292 "/recovery",
293 AxumRouter::new()
294 .route("/status", routing::get(recovery::status))
295 .route("/enable", routing::post(recovery::set))
296 .route("/disable", routing::post(recovery::unset))
297 .with_state(recovery_handler),
298 )
299 .nest(
300 "/sequence",
301 AxumRouter::new().nest(
302 "/table",
303 AxumRouter::new()
304 .route("/next-id", routing::get(sequencer::get_next_table_id))
305 .route("/set-next-id", routing::post(sequencer::set_next_table_id))
306 .with_state(table_id_sequence_handler.clone()),
307 ),
308 );
309
310 AxumRouter::new().nest("/admin", admin_router)
311}
312
313#[cfg(test)]
314mod tests {
315 use common_meta::kv_backend::KvBackendRef;
316 use common_meta::kv_backend::memory::MemoryKvBackend;
317 use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
318
319 use super::*;
320 use crate::metasrv::MetasrvOptions;
321 use crate::metasrv::builder::MetasrvBuilder;
322 use crate::{bootstrap, error};
323
324 struct MockOkHandler;
325
326 #[async_trait::async_trait]
327 impl HttpHandler for MockOkHandler {
328 async fn handle(
329 &self,
330 _: &str,
331 _: http::Method,
332 _: &HashMap<String, String>,
333 ) -> crate::Result<http::Response<String>> {
334 Ok(http::Response::builder()
335 .status(http::StatusCode::OK)
336 .body("Ok".to_string())
337 .unwrap())
338 }
339 }
340 struct MockEmptyKeyErrorHandler;
341
342 #[async_trait::async_trait]
343 impl HttpHandler for MockEmptyKeyErrorHandler {
344 async fn handle(
345 &self,
346 _: &str,
347 _: http::Method,
348 _: &HashMap<String, String>,
349 ) -> crate::Result<http::Response<String>> {
350 error::EmptyKeySnafu {}.fail()
351 }
352 }
353
354 #[test]
355 fn test_route_nest() {
356 let mock_handler = MockOkHandler {};
357 let router = Router::new().route("/test_node", mock_handler);
358 let router = Router::nest("/test_root", router);
359
360 assert_eq!(1, router.handlers.len());
361 assert!(router.handlers.contains_key("/test_root/test_node"));
362 }
363
364 #[should_panic]
365 #[test]
366 fn test_invalid_path() {
367 check_path("test_node")
368 }
369
370 #[should_panic]
371 #[test]
372 fn test_empty_path() {
373 check_path("")
374 }
375
376 #[tokio::test]
377 async fn test_route_call_ok() {
378 let mock_handler = MockOkHandler {};
379 let router = Router::new().route("/test_node", mock_handler);
380 let router = Router::nest("/test_root", router);
381
382 let res = router
383 .call(
384 "/test_root/test_node",
385 http::Method::GET,
386 HashMap::default(),
387 )
388 .await
389 .unwrap();
390
391 assert!(res.status().is_success());
392 }
393
394 #[tokio::test]
395 async fn test_route_call_no_handler() {
396 let router = Router::new();
397
398 let res = router
399 .call(
400 "/test_root/test_node",
401 http::Method::GET,
402 HashMap::default(),
403 )
404 .await
405 .unwrap();
406
407 assert_eq!(http::StatusCode::NOT_FOUND, res.status());
408 }
409
410 #[tokio::test]
411 async fn test_route_call_err() {
412 let mock_handler = MockEmptyKeyErrorHandler {};
413 let router = Router::new().route("/test_node", mock_handler);
414 let router = Router::nest("/test_root", router);
415
416 let res = router
417 .call(
418 "/test_root/test_node",
419 http::Method::GET,
420 HashMap::default(),
421 )
422 .await
423 .unwrap();
424
425 assert_eq!(http::StatusCode::INTERNAL_SERVER_ERROR, res.status());
426 }
427
428 async fn test_metasrv(kv_backend: KvBackendRef) -> Metasrv {
429 let opts = MetasrvOptions::default();
430 let builder = MetasrvBuilder::new()
431 .options(opts)
432 .kv_backend(kv_backend.clone());
433
434 builder.build().await.unwrap()
435 }
436
437 async fn send_request(client: &mut DuplexStream, request: &[u8]) -> String {
438 client.write_all(request).await.unwrap();
439 let mut buf = vec![0; 1024];
440 let n = client.read(&mut buf).await.unwrap();
441 String::from_utf8_lossy(&buf[..n]).to_string()
442 }
443
444 #[tokio::test(flavor = "multi_thread")]
445 async fn test_metasrv_maintenance_mode() {
446 common_telemetry::init_default_ut_logging();
447 let kv_backend = Arc::new(MemoryKvBackend::new());
448 let metasrv = test_metasrv(kv_backend).await;
449 metasrv.try_start().await.unwrap();
450
451 let (mut client, server) = tokio::io::duplex(1024);
452 let metasrv = Arc::new(metasrv);
453 let service = metasrv.clone();
454 let _handle = tokio::spawn(async move {
455 let router = bootstrap::router(service);
456 router
457 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
458 .await
459 });
460
461 let response = send_request(
463 &mut client,
464 b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
465 )
466 .await;
467 assert!(response.contains(r#"{"enabled":false}"#));
468 assert!(response.contains("200 OK"));
469
470 let response = send_request(
472 &mut client,
473 b"POST /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
474 )
475 .await;
476 assert!(response.contains(r#"{"enabled":true}"#));
477 assert!(response.contains("200 OK"));
478
479 let enabled = metasrv
480 .runtime_switch_manager()
481 .maintenance_mode()
482 .await
483 .unwrap();
484 assert!(enabled);
485
486 let response = send_request(
488 &mut client,
489 b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
490 )
491 .await;
492 assert!(response.contains(r#"{"enabled":true}"#));
493 assert!(response.contains("200 OK"));
494
495 let response = send_request(
497 &mut client,
498 b"POST /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
499 )
500 .await;
501 assert!(response.contains(r#"{"enabled":false}"#));
502 assert!(response.contains("200 OK"));
503
504 let enabled = metasrv
505 .runtime_switch_manager()
506 .maintenance_mode()
507 .await
508 .unwrap();
509 assert!(!enabled);
510
511 let response = send_request(
513 &mut client,
514 b"GET /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\n\r\n",
515 )
516 .await;
517 assert!(response.contains(r#"{"enabled":true}"#));
518 assert!(response.contains("200 OK"));
519
520 let response = send_request(
522 &mut client,
523 b"PUT /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\n\r\n",
524 )
525 .await;
526 assert!(response.contains(r#"{"enabled":false}"#));
527 assert!(response.contains("200 OK"));
528
529 let response = send_request(
531 &mut client,
532 b"GET /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
533 )
534 .await;
535 assert!(response.contains(r#"{"enabled":false}"#));
536
537 let response = send_request(
539 &mut client,
540 b"POST /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
541 )
542 .await;
543 assert!(response.contains(r#"{"enabled":true}"#));
544
545 let response = send_request(
547 &mut client,
548 b"POST /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
549 )
550 .await;
551 assert!(response.contains(r#"{"enabled":false}"#));
552
553 let response = send_request(
555 &mut client,
556 b"POST /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
557 )
558 .await;
559 assert!(response.contains("404 Not Found"));
560
561 let response = send_request(
563 &mut client,
564 b"GET /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
565 )
566 .await;
567 assert!(response.contains("404 Not Found"));
568
569 let response = send_request(
571 &mut client,
572 b"GET /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
573 )
574 .await;
575 assert!(response.contains("404 Not Found"));
576 }
577
578 #[tokio::test(flavor = "multi_thread")]
579 async fn test_metasrv_procedure_manager_handler() {
580 common_telemetry::init_default_ut_logging();
581 let kv_backend = Arc::new(MemoryKvBackend::new());
582 let metasrv = test_metasrv(kv_backend).await;
583 metasrv.try_start().await.unwrap();
584
585 let (mut client, server) = tokio::io::duplex(1024);
586 let metasrv = Arc::new(metasrv);
587 let service = metasrv.clone();
588 let _handle = tokio::spawn(async move {
589 let router = bootstrap::router(service);
590 router
591 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
592 .await
593 });
594
595 let response = send_request(
597 &mut client,
598 b"GET /admin/procedure-manager/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
599 )
600 .await;
601 assert!(response.contains("200 OK"));
602 assert!(
603 response.contains(r#"{"status":"running"}"#),
604 "response: {}",
605 response
606 );
607
608 let response = send_request(
610 &mut client,
611 b"POST /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
612 )
613 .await;
614 assert!(response.contains("200 OK"));
615 assert!(response.contains(r#"{"status":"paused"}"#));
616
617 let response = send_request(
619 &mut client,
620 b"POST /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
621 )
622 .await;
623 assert!(response.contains("200 OK"));
624 assert!(
625 response.contains(r#"{"status":"running"}"#),
626 "response: {}",
627 response
628 );
629
630 let response = send_request(
632 &mut client,
633 b"GET /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
634 )
635 .await;
636 assert!(response.contains("404 Not Found"));
637
638 let response = send_request(
640 &mut client,
641 b"GET /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
642 )
643 .await;
644 assert!(response.contains("404 Not Found"));
645 }
646}
647
648#[cfg(test)]
649mod axum_admin_tests {
650 use std::sync::Arc;
651
652 use axum::body::{Body, to_bytes};
653 use axum::http::{Method, Request, StatusCode};
654 use common_meta::kv_backend::memory::MemoryKvBackend;
655 use tower::ServiceExt; use super::*;
658 use crate::metasrv::MetasrvOptions;
659 use crate::metasrv::builder::MetasrvBuilder;
660 use crate::service::admin::sequencer::NextTableIdResponse;
661
662 async fn setup_axum_app() -> AxumRouter {
663 let kv_backend = Arc::new(MemoryKvBackend::new());
664 let metasrv = MetasrvBuilder::new()
665 .options(MetasrvOptions::default())
666 .kv_backend(kv_backend)
667 .build()
668 .await
669 .unwrap();
670 let metasrv = Arc::new(metasrv);
671 admin_axum_router(metasrv)
672 }
673
674 async fn get_body_string(resp: axum::response::Response) -> String {
675 let body_bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
676 String::from_utf8_lossy(&body_bytes).to_string()
677 }
678
679 async fn into_bytes(resp: axum::response::Response) -> Vec<u8> {
680 let body_bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
681 body_bytes.to_vec()
682 }
683
684 #[tokio::test]
685 async fn test_admin_health() {
686 let app = setup_axum_app().await;
687 let response = app
688 .oneshot(
689 Request::builder()
690 .uri("/admin/health")
691 .body(Body::empty())
692 .unwrap(),
693 )
694 .await
695 .unwrap();
696 assert_eq!(response.status(), StatusCode::OK);
697 let body = get_body_string(response).await;
698 assert!(body.to_lowercase().contains("ok"));
699 }
700
701 #[tokio::test]
702 async fn test_admin_node_lease() {
703 let app = setup_axum_app().await;
704 let response = app
705 .oneshot(
706 Request::builder()
707 .uri("/admin/node-lease")
708 .body(Body::empty())
709 .unwrap(),
710 )
711 .await
712 .unwrap();
713 assert_eq!(response.status(), StatusCode::OK);
714 }
715
716 #[tokio::test]
717 async fn test_admin_heartbeat() {
718 let app = setup_axum_app().await;
719 let response = app
720 .oneshot(
721 Request::builder()
722 .uri("/admin/heartbeat")
723 .body(Body::empty())
724 .unwrap(),
725 )
726 .await
727 .unwrap();
728 assert_eq!(response.status(), StatusCode::OK);
729 }
730
731 #[tokio::test]
732 async fn test_admin_heartbeat_help() {
733 let app = setup_axum_app().await;
734 let response = app
735 .oneshot(
736 Request::builder()
737 .uri("/admin/heartbeat/help")
738 .body(Body::empty())
739 .unwrap(),
740 )
741 .await
742 .unwrap();
743 assert_eq!(response.status(), StatusCode::OK);
744 }
745
746 #[tokio::test]
747 async fn test_admin_leader() {
748 let app = setup_axum_app().await;
749 let response = app
750 .oneshot(
751 Request::builder()
752 .uri("/admin/leader")
753 .body(Body::empty())
754 .unwrap(),
755 )
756 .await
757 .unwrap();
758 assert_eq!(response.status(), StatusCode::OK);
759 }
760
761 #[tokio::test]
762 async fn test_admin_maintenance() {
763 let app = setup_axum_app().await;
764 let response = app
765 .oneshot(
766 Request::builder()
767 .uri("/admin/maintenance")
768 .body(Body::empty())
769 .unwrap(),
770 )
771 .await
772 .unwrap();
773 assert_eq!(response.status(), StatusCode::OK);
774 let body = get_body_string(response).await;
775 assert!(body.contains("enabled"));
776 }
777
778 #[tokio::test]
779 async fn test_admin_maintenance_status() {
780 let app = setup_axum_app().await;
781 let response = app
782 .oneshot(
783 Request::builder()
784 .uri("/admin/maintenance/status")
785 .body(Body::empty())
786 .unwrap(),
787 )
788 .await
789 .unwrap();
790 assert_eq!(response.status(), StatusCode::OK);
791 let body = get_body_string(response).await;
792 assert!(body.contains("enabled"));
793 }
794
795 #[tokio::test]
796 async fn test_admin_maintenance_enable_disable() {
797 let response = setup_axum_app()
799 .await
800 .oneshot(
801 Request::builder()
802 .method(Method::POST)
803 .uri("/admin/maintenance/enable")
804 .body(Body::empty())
805 .unwrap(),
806 )
807 .await
808 .unwrap();
809 assert_eq!(response.status(), StatusCode::OK);
810 let body = get_body_string(response).await;
811 assert!(body.contains("enabled"));
812 let response = setup_axum_app()
814 .await
815 .oneshot(
816 Request::builder()
817 .method(Method::POST)
818 .uri("/admin/maintenance/disable")
819 .body(Body::empty())
820 .unwrap(),
821 )
822 .await
823 .unwrap();
824 assert_eq!(response.status(), StatusCode::OK);
825 let body = get_body_string(response).await;
826 assert!(body.contains("enabled"));
827 }
828
829 #[tokio::test]
830 async fn test_admin_procedure_manager_status() {
831 let app = setup_axum_app().await;
832 let response = app
833 .oneshot(
834 Request::builder()
835 .uri("/admin/procedure-manager/status")
836 .body(Body::empty())
837 .unwrap(),
838 )
839 .await
840 .unwrap();
841 assert_eq!(response.status(), StatusCode::OK);
842 let body = get_body_string(response).await;
843 assert!(body.contains("status"));
844 }
845
846 #[tokio::test]
847 async fn test_admin_procedure_manager_pause_resume() {
848 let response = setup_axum_app()
850 .await
851 .oneshot(
852 Request::builder()
853 .method(Method::POST)
854 .uri("/admin/procedure-manager/pause")
855 .body(Body::empty())
856 .unwrap(),
857 )
858 .await
859 .unwrap();
860 assert_eq!(response.status(), StatusCode::OK);
861 let body = get_body_string(response).await;
862 assert!(body.contains("paused"));
863 let response = setup_axum_app()
865 .await
866 .oneshot(
867 Request::builder()
868 .method(Method::POST)
869 .uri("/admin/procedure-manager/resume")
870 .body(Body::empty())
871 .unwrap(),
872 )
873 .await
874 .unwrap();
875 assert_eq!(response.status(), StatusCode::OK);
876 let body = get_body_string(response).await;
877 assert!(body.contains("running"));
878 }
879
880 #[tokio::test]
881 async fn test_admin_recovery() {
882 let app = setup_axum_app().await;
883 let response = app
884 .clone()
885 .oneshot(
886 Request::builder()
887 .uri("/admin/recovery/status")
888 .method(Method::GET)
889 .body(Body::empty())
890 .unwrap(),
891 )
892 .await
893 .unwrap();
894 assert_eq!(response.status(), StatusCode::OK);
895 let body = get_body_string(response).await;
896 assert!(body.contains("false"));
897
898 let response = app
900 .clone()
901 .oneshot(
902 Request::builder()
903 .uri("/admin/recovery/enable")
904 .method(Method::POST)
905 .body(Body::empty())
906 .unwrap(),
907 )
908 .await
909 .unwrap();
910 assert_eq!(response.status(), StatusCode::OK);
911 let body = get_body_string(response).await;
912 assert!(body.contains("true"));
913
914 let response = app
915 .clone()
916 .oneshot(
917 Request::builder()
918 .uri("/admin/recovery/status")
919 .method(Method::GET)
920 .body(Body::empty())
921 .unwrap(),
922 )
923 .await
924 .unwrap();
925 assert_eq!(response.status(), StatusCode::OK);
926 let body = get_body_string(response).await;
927 assert!(body.contains("true"));
928
929 let response = app
931 .clone()
932 .oneshot(
933 Request::builder()
934 .uri("/admin/recovery/disable")
935 .method(Method::POST)
936 .body(Body::empty())
937 .unwrap(),
938 )
939 .await
940 .unwrap();
941 assert_eq!(response.status(), StatusCode::OK);
942 let body = get_body_string(response).await;
943 assert!(body.contains("false"));
944
945 let response = app
946 .clone()
947 .oneshot(
948 Request::builder()
949 .uri("/admin/recovery/status")
950 .method(Method::GET)
951 .body(Body::empty())
952 .unwrap(),
953 )
954 .await
955 .unwrap();
956 assert_eq!(response.status(), StatusCode::OK);
957 let body = get_body_string(response).await;
958 assert!(body.contains("false"));
959 }
960
961 #[tokio::test]
962 async fn test_admin_sequence_table_id() {
963 common_telemetry::init_default_ut_logging();
964 let kv_backend = Arc::new(MemoryKvBackend::new());
965 let metasrv = MetasrvBuilder::new()
966 .options(MetasrvOptions::default())
967 .kv_backend(kv_backend)
968 .build()
969 .await
970 .unwrap();
971 let metasrv = Arc::new(metasrv);
972 let runtime_switch_manager = metasrv.runtime_switch_manager().clone();
973 let app = admin_axum_router(metasrv);
974 runtime_switch_manager.set_recovery_mode().await.unwrap();
976 let response = app
977 .clone()
978 .oneshot(
979 Request::builder()
980 .method(Method::GET)
981 .uri("/admin/sequence/table/next-id")
982 .body(Body::empty())
983 .unwrap(),
984 )
985 .await
986 .unwrap();
987 assert_eq!(response.status(), StatusCode::OK);
988 let body = into_bytes(response).await;
989 let resp: NextTableIdResponse = serde_json::from_slice(&body).unwrap();
990 assert_eq!(resp.next_table_id, 1024);
991
992 let response = app
994 .clone()
995 .oneshot(
996 Request::builder()
997 .method(Method::POST)
998 .header(http::header::CONTENT_TYPE, "application/json")
999 .uri("/admin/sequence/table/set-next-id")
1000 .body(Body::empty())
1001 .unwrap(),
1002 )
1003 .await
1004 .unwrap();
1005 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
1006
1007 let response = app
1009 .clone()
1010 .oneshot(
1011 Request::builder()
1012 .method(Method::POST)
1013 .header(http::header::CONTENT_TYPE, "application/json")
1014 .uri("/admin/sequence/table/set-next-id")
1015 .body(Body::from(r#"{"next_table_id": 0}"#))
1016 .unwrap(),
1017 )
1018 .await
1019 .unwrap();
1020 assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
1021 let body = get_body_string(response).await;
1022 assert!(body.contains("is not greater than the current next value"));
1023
1024 let response = app
1026 .clone()
1027 .oneshot(
1028 Request::builder()
1029 .method(Method::POST)
1030 .header(http::header::CONTENT_TYPE, "application/json")
1031 .uri("/admin/sequence/table/set-next-id")
1032 .body(Body::from(r#"{"next_table_id": 2048}"#))
1033 .unwrap(),
1034 )
1035 .await
1036 .unwrap();
1037 assert_eq!(response.status(), StatusCode::OK);
1038
1039 let response = app
1041 .clone()
1042 .oneshot(
1043 Request::builder()
1044 .method(Method::GET)
1045 .uri("/admin/sequence/table/next-id")
1046 .body(Body::empty())
1047 .unwrap(),
1048 )
1049 .await
1050 .unwrap();
1051 assert_eq!(response.status(), StatusCode::OK);
1052 let body = into_bytes(response).await;
1053 let resp: NextTableIdResponse = serde_json::from_slice(&body).unwrap();
1054 assert_eq!(resp.next_table_id, 2048);
1055
1056 runtime_switch_manager.unset_recovery_mode().await.unwrap();
1058 let response = app
1060 .clone()
1061 .oneshot(
1062 Request::builder()
1063 .method(Method::POST)
1064 .header(http::header::CONTENT_TYPE, "application/json")
1065 .uri("/admin/sequence/table/set-next-id")
1066 .body(Body::from(r#"{"next_table_id": 2049}"#))
1067 .unwrap(),
1068 )
1069 .await
1070 .unwrap();
1071 assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
1072 let body = get_body_string(response).await;
1073 assert!(body.contains("Setting next table id is only allowed in recovery mode"));
1074 }
1075}