1pub(crate) mod health;
16pub(crate) mod heartbeat;
17pub(crate) mod leader;
18pub(crate) mod maintenance;
19pub(crate) mod node_lease;
20pub(crate) mod procedure;
21pub(crate) mod recovery;
22pub(crate) mod sequencer;
23mod util;
24
25use std::collections::HashMap;
26use std::convert::Infallible;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29
30use axum::{routing, Router as AxumRouter};
31use tonic::body::Body;
32use tonic::codegen::{http, BoxFuture, Service};
33use tonic::server::NamedService;
34
35use crate::metasrv::Metasrv;
36use crate::service::admin::heartbeat::HeartBeatHandler;
37use crate::service::admin::leader::LeaderHandler;
38use crate::service::admin::maintenance::MaintenanceHandler;
39use crate::service::admin::node_lease::NodeLeaseHandler;
40use crate::service::admin::procedure::ProcedureManagerHandler;
41use crate::service::admin::recovery::RecoveryHandler;
42use crate::service::admin::sequencer::TableIdSequenceHandler;
43
44pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
51 let router = Router::new().route("/health", health::HealthHandler);
52
53 let router = router.route(
54 "/node-lease",
55 node_lease::NodeLeaseHandler {
56 meta_peer_client: metasrv.meta_peer_client().clone(),
57 },
58 );
59
60 let handler = heartbeat::HeartBeatHandler {
61 meta_peer_client: metasrv.meta_peer_client().clone(),
62 };
63 let router = router
64 .route("/heartbeat", handler.clone())
65 .route("/heartbeat/help", handler);
66
67 let router = router.route(
68 "/leader",
69 leader::LeaderHandler {
70 election: metasrv.election().cloned(),
71 },
72 );
73
74 let router = router.routes(
75 &[
76 "/maintenance",
77 "/maintenance/status",
78 "/maintenance/enable",
79 "/maintenance/disable",
80 ],
81 maintenance::MaintenanceHandler {
82 manager: metasrv.runtime_switch_manager().clone(),
83 },
84 );
85 let router = router.routes(
86 &[
87 "/procedure-manager/pause",
88 "/procedure-manager/resume",
89 "/procedure-manager/status",
90 ],
91 procedure::ProcedureManagerHandler {
92 manager: metasrv.runtime_switch_manager().clone(),
93 },
94 );
95 let router = Router::nest("/admin", router);
96
97 Admin::new(router)
98}
99
100#[async_trait::async_trait]
101pub trait HttpHandler: Send + Sync {
102 async fn handle(
103 &self,
104 path: &str,
105 method: http::Method,
106 params: &HashMap<String, String>,
107 ) -> crate::Result<http::Response<String>>;
108}
109
110#[derive(Clone)]
111pub struct Admin
112where
113 Self: Send,
114{
115 router: Arc<Router>,
116}
117
118impl Admin {
119 pub fn new(router: Router) -> Self {
120 Self {
121 router: Arc::new(router),
122 }
123 }
124}
125
126impl NamedService for Admin {
127 const NAME: &'static str = "admin";
128}
129
130impl Service<http::Request<Body>> for Admin {
131 type Response = http::Response<Body>;
132 type Error = Infallible;
133 type Future = BoxFuture<Self::Response, Self::Error>;
134
135 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
136 Poll::Ready(Ok(()))
137 }
138
139 fn call(&mut self, req: http::Request<Body>) -> Self::Future {
140 let router = self.router.clone();
141 let query_params = req
142 .uri()
143 .query()
144 .map(|q| {
145 url::form_urlencoded::parse(q.as_bytes())
146 .into_owned()
147 .collect()
148 })
149 .unwrap_or_default();
150 let path = req.uri().path().to_owned();
151 let method = req.method().clone();
152 Box::pin(async move { router.call(&path, method, query_params).await })
153 }
154}
155
156#[derive(Default)]
157pub struct Router {
158 handlers: HashMap<String, Arc<dyn HttpHandler>>,
159}
160
161impl Router {
162 pub fn new() -> Self {
163 Self {
164 handlers: HashMap::default(),
165 }
166 }
167
168 pub fn nest(path: &str, router: Router) -> Self {
169 check_path(path);
170
171 let handlers = router
172 .handlers
173 .into_iter()
174 .map(|(url, handler)| (format!("{path}{url}"), handler))
175 .collect();
176
177 Self { handlers }
178 }
179
180 pub fn route(mut self, path: &str, handler: impl HttpHandler + 'static) -> Self {
181 check_path(path);
182
183 let _ = self.handlers.insert(path.to_owned(), Arc::new(handler));
184
185 self
186 }
187
188 pub fn routes(mut self, paths: &[&str], handler: impl HttpHandler + 'static) -> Self {
189 let handler = Arc::new(handler);
190 for path in paths {
191 check_path(path);
192 let _ = self.handlers.insert(path.to_string(), handler.clone());
193 }
194
195 self
196 }
197
198 pub async fn call(
199 &self,
200 path: &str,
201 method: http::Method,
202 params: HashMap<String, String>,
203 ) -> Result<http::Response<Body>, Infallible> {
204 let handler = match self.handlers.get(path) {
205 Some(handler) => handler,
206 None => {
207 return Ok(http::Response::builder()
208 .status(http::StatusCode::NOT_FOUND)
209 .body(Body::empty())
210 .unwrap())
211 }
212 };
213
214 let res = match handler.handle(path, method, ¶ms).await {
215 Ok(res) => res.map(Body::new),
216 Err(e) => http::Response::builder()
217 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
218 .body(Body::new(e.to_string()))
219 .unwrap(),
220 };
221
222 Ok(res)
223 }
224}
225
226fn check_path(path: &str) {
227 if path.is_empty() || !path.starts_with('/') {
228 panic!("paths must start with a `/`")
229 }
230}
231
232pub fn admin_axum_router(metasrv: Arc<Metasrv>) -> AxumRouter {
234 let node_lease_handler = NodeLeaseHandler {
235 meta_peer_client: metasrv.meta_peer_client().clone(),
236 };
237 let heartbeat_handler = HeartBeatHandler {
238 meta_peer_client: metasrv.meta_peer_client().clone(),
239 };
240 let leader_handler = LeaderHandler {
241 election: metasrv.election().cloned(),
242 };
243 let maintenance_handler = MaintenanceHandler {
244 manager: metasrv.runtime_switch_manager().clone(),
245 };
246 let procedure_handler = ProcedureManagerHandler {
247 manager: metasrv.runtime_switch_manager().clone(),
248 };
249 let recovery_handler = RecoveryHandler {
250 manager: metasrv.runtime_switch_manager().clone(),
251 };
252 let table_id_sequence_handler = TableIdSequenceHandler {
253 table_id_sequence: metasrv.table_id_sequence().clone(),
254 runtime_switch_manager: metasrv.runtime_switch_manager().clone(),
255 };
256
257 let admin_router = AxumRouter::new()
258 .route("/health", routing::get(health::health))
259 .route(
260 "/node-lease",
261 routing::get(node_lease::get).with_state(node_lease_handler),
262 )
263 .route(
264 "/leader",
265 routing::get(leader::get).with_state(leader_handler),
266 )
267 .nest(
268 "/heartbeat",
269 AxumRouter::new()
270 .route("/", routing::get(heartbeat::get))
271 .route("/help", routing::get(heartbeat::help))
272 .with_state(heartbeat_handler),
273 )
274 .nest(
275 "/maintenance",
276 AxumRouter::new()
277 .route("/", routing::get(maintenance::status))
278 .route("/status", routing::get(maintenance::status))
279 .route("/enable", routing::post(maintenance::set))
280 .route("/disable", routing::post(maintenance::unset))
281 .with_state(maintenance_handler),
282 )
283 .nest(
284 "/procedure-manager",
285 AxumRouter::new()
286 .route("/status", routing::get(procedure::status))
287 .route("/pause", routing::post(procedure::pause))
288 .route("/resume", routing::post(procedure::resume))
289 .with_state(procedure_handler),
290 )
291 .nest(
292 "/recovery",
293 AxumRouter::new()
294 .route("/status", routing::get(recovery::status))
295 .route("/enable", routing::post(recovery::set))
296 .route("/disable", routing::post(recovery::unset))
297 .with_state(recovery_handler),
298 )
299 .nest(
300 "/sequence",
301 AxumRouter::new().nest(
302 "/table",
303 AxumRouter::new()
304 .route("/next-id", routing::get(sequencer::get_next_table_id))
305 .route("/set-next-id", routing::post(sequencer::set_next_table_id))
306 .with_state(table_id_sequence_handler.clone()),
307 ),
308 );
309
310 AxumRouter::new().nest("/admin", admin_router)
311}
312
313#[cfg(test)]
314mod tests {
315 use common_meta::kv_backend::memory::MemoryKvBackend;
316 use common_meta::kv_backend::KvBackendRef;
317 use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
318
319 use super::*;
320 use crate::metasrv::builder::MetasrvBuilder;
321 use crate::metasrv::MetasrvOptions;
322 use crate::{bootstrap, error};
323
324 struct MockOkHandler;
325
326 #[async_trait::async_trait]
327 impl HttpHandler for MockOkHandler {
328 async fn handle(
329 &self,
330 _: &str,
331 _: http::Method,
332 _: &HashMap<String, String>,
333 ) -> crate::Result<http::Response<String>> {
334 Ok(http::Response::builder()
335 .status(http::StatusCode::OK)
336 .body("Ok".to_string())
337 .unwrap())
338 }
339 }
340 struct MockEmptyKeyErrorHandler;
341
342 #[async_trait::async_trait]
343 impl HttpHandler for MockEmptyKeyErrorHandler {
344 async fn handle(
345 &self,
346 _: &str,
347 _: http::Method,
348 _: &HashMap<String, String>,
349 ) -> crate::Result<http::Response<String>> {
350 error::EmptyKeySnafu {}.fail()
351 }
352 }
353
354 #[test]
355 fn test_route_nest() {
356 let mock_handler = MockOkHandler {};
357 let router = Router::new().route("/test_node", mock_handler);
358 let router = Router::nest("/test_root", router);
359
360 assert_eq!(1, router.handlers.len());
361 assert!(router.handlers.contains_key("/test_root/test_node"));
362 }
363
364 #[should_panic]
365 #[test]
366 fn test_invalid_path() {
367 check_path("test_node")
368 }
369
370 #[should_panic]
371 #[test]
372 fn test_empty_path() {
373 check_path("")
374 }
375
376 #[tokio::test]
377 async fn test_route_call_ok() {
378 let mock_handler = MockOkHandler {};
379 let router = Router::new().route("/test_node", mock_handler);
380 let router = Router::nest("/test_root", router);
381
382 let res = router
383 .call(
384 "/test_root/test_node",
385 http::Method::GET,
386 HashMap::default(),
387 )
388 .await
389 .unwrap();
390
391 assert!(res.status().is_success());
392 }
393
394 #[tokio::test]
395 async fn test_route_call_no_handler() {
396 let router = Router::new();
397
398 let res = router
399 .call(
400 "/test_root/test_node",
401 http::Method::GET,
402 HashMap::default(),
403 )
404 .await
405 .unwrap();
406
407 assert_eq!(http::StatusCode::NOT_FOUND, res.status());
408 }
409
410 #[tokio::test]
411 async fn test_route_call_err() {
412 let mock_handler = MockEmptyKeyErrorHandler {};
413 let router = Router::new().route("/test_node", mock_handler);
414 let router = Router::nest("/test_root", router);
415
416 let res = router
417 .call(
418 "/test_root/test_node",
419 http::Method::GET,
420 HashMap::default(),
421 )
422 .await
423 .unwrap();
424
425 assert_eq!(http::StatusCode::INTERNAL_SERVER_ERROR, res.status());
426 }
427
428 async fn test_metasrv(kv_backend: KvBackendRef) -> Metasrv {
429 let opts = MetasrvOptions::default();
430 let builder = MetasrvBuilder::new()
431 .options(opts)
432 .kv_backend(kv_backend.clone());
433
434 let metasrv = builder.build().await.unwrap();
435 metasrv
436 }
437
438 async fn send_request(client: &mut DuplexStream, request: &[u8]) -> String {
439 client.write_all(request).await.unwrap();
440 let mut buf = vec![0; 1024];
441 let n = client.read(&mut buf).await.unwrap();
442 String::from_utf8_lossy(&buf[..n]).to_string()
443 }
444
445 #[tokio::test(flavor = "multi_thread")]
446 async fn test_metasrv_maintenance_mode() {
447 common_telemetry::init_default_ut_logging();
448 let kv_backend = Arc::new(MemoryKvBackend::new());
449 let metasrv = test_metasrv(kv_backend).await;
450 metasrv.try_start().await.unwrap();
451
452 let (mut client, server) = tokio::io::duplex(1024);
453 let metasrv = Arc::new(metasrv);
454 let service = metasrv.clone();
455 let _handle = tokio::spawn(async move {
456 let router = bootstrap::router(service);
457 router
458 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
459 .await
460 });
461
462 let response = send_request(
464 &mut client,
465 b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
466 )
467 .await;
468 assert!(response.contains(r#"{"enabled":false}"#));
469 assert!(response.contains("200 OK"));
470
471 let response = send_request(
473 &mut client,
474 b"POST /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
475 )
476 .await;
477 assert!(response.contains(r#"{"enabled":true}"#));
478 assert!(response.contains("200 OK"));
479
480 let enabled = metasrv
481 .runtime_switch_manager()
482 .maintenance_mode()
483 .await
484 .unwrap();
485 assert!(enabled);
486
487 let response = send_request(
489 &mut client,
490 b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
491 )
492 .await;
493 assert!(response.contains(r#"{"enabled":true}"#));
494 assert!(response.contains("200 OK"));
495
496 let response = send_request(
498 &mut client,
499 b"POST /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
500 )
501 .await;
502 assert!(response.contains(r#"{"enabled":false}"#));
503 assert!(response.contains("200 OK"));
504
505 let enabled = metasrv
506 .runtime_switch_manager()
507 .maintenance_mode()
508 .await
509 .unwrap();
510 assert!(!enabled);
511
512 let response = send_request(
514 &mut client,
515 b"GET /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\n\r\n",
516 )
517 .await;
518 assert!(response.contains(r#"{"enabled":true}"#));
519 assert!(response.contains("200 OK"));
520
521 let response = send_request(
523 &mut client,
524 b"PUT /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\n\r\n",
525 )
526 .await;
527 assert!(response.contains(r#"{"enabled":false}"#));
528 assert!(response.contains("200 OK"));
529
530 let response = send_request(
532 &mut client,
533 b"GET /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
534 )
535 .await;
536 assert!(response.contains(r#"{"enabled":false}"#));
537
538 let response = send_request(
540 &mut client,
541 b"POST /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
542 )
543 .await;
544 assert!(response.contains(r#"{"enabled":true}"#));
545
546 let response = send_request(
548 &mut client,
549 b"POST /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
550 )
551 .await;
552 assert!(response.contains(r#"{"enabled":false}"#));
553
554 let response = send_request(
556 &mut client,
557 b"POST /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
558 )
559 .await;
560 assert!(response.contains("404 Not Found"));
561
562 let response = send_request(
564 &mut client,
565 b"GET /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
566 )
567 .await;
568 assert!(response.contains("404 Not Found"));
569
570 let response = send_request(
572 &mut client,
573 b"GET /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
574 )
575 .await;
576 assert!(response.contains("404 Not Found"));
577 }
578
579 #[tokio::test(flavor = "multi_thread")]
580 async fn test_metasrv_procedure_manager_handler() {
581 common_telemetry::init_default_ut_logging();
582 let kv_backend = Arc::new(MemoryKvBackend::new());
583 let metasrv = test_metasrv(kv_backend).await;
584 metasrv.try_start().await.unwrap();
585
586 let (mut client, server) = tokio::io::duplex(1024);
587 let metasrv = Arc::new(metasrv);
588 let service = metasrv.clone();
589 let _handle = tokio::spawn(async move {
590 let router = bootstrap::router(service);
591 router
592 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
593 .await
594 });
595
596 let response = send_request(
598 &mut client,
599 b"GET /admin/procedure-manager/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
600 )
601 .await;
602 assert!(response.contains("200 OK"));
603 assert!(
604 response.contains(r#"{"status":"running"}"#),
605 "response: {}",
606 response
607 );
608
609 let response = send_request(
611 &mut client,
612 b"POST /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
613 )
614 .await;
615 assert!(response.contains("200 OK"));
616 assert!(response.contains(r#"{"status":"paused"}"#));
617
618 let response = send_request(
620 &mut client,
621 b"POST /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
622 )
623 .await;
624 assert!(response.contains("200 OK"));
625 assert!(
626 response.contains(r#"{"status":"running"}"#),
627 "response: {}",
628 response
629 );
630
631 let response = send_request(
633 &mut client,
634 b"GET /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
635 )
636 .await;
637 assert!(response.contains("404 Not Found"));
638
639 let response = send_request(
641 &mut client,
642 b"GET /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
643 )
644 .await;
645 assert!(response.contains("404 Not Found"));
646 }
647}
648
649#[cfg(test)]
650mod axum_admin_tests {
651 use std::sync::Arc;
652
653 use axum::body::{to_bytes, Body};
654 use axum::http::{Method, Request, StatusCode};
655 use common_meta::kv_backend::memory::MemoryKvBackend;
656 use tower::ServiceExt; use super::*;
659 use crate::metasrv::builder::MetasrvBuilder;
660 use crate::metasrv::MetasrvOptions;
661 use crate::service::admin::sequencer::NextTableIdResponse;
662
663 async fn setup_axum_app() -> AxumRouter {
664 let kv_backend = Arc::new(MemoryKvBackend::new());
665 let metasrv = MetasrvBuilder::new()
666 .options(MetasrvOptions::default())
667 .kv_backend(kv_backend)
668 .build()
669 .await
670 .unwrap();
671 let metasrv = Arc::new(metasrv);
672 admin_axum_router(metasrv)
673 }
674
675 async fn get_body_string(resp: axum::response::Response) -> String {
676 let body_bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
677 String::from_utf8_lossy(&body_bytes).to_string()
678 }
679
680 async fn into_bytes(resp: axum::response::Response) -> Vec<u8> {
681 let body_bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
682 body_bytes.to_vec()
683 }
684
685 #[tokio::test]
686 async fn test_admin_health() {
687 let app = setup_axum_app().await;
688 let response = app
689 .oneshot(
690 Request::builder()
691 .uri("/admin/health")
692 .body(Body::empty())
693 .unwrap(),
694 )
695 .await
696 .unwrap();
697 assert_eq!(response.status(), StatusCode::OK);
698 let body = get_body_string(response).await;
699 assert!(body.to_lowercase().contains("ok"));
700 }
701
702 #[tokio::test]
703 async fn test_admin_node_lease() {
704 let app = setup_axum_app().await;
705 let response = app
706 .oneshot(
707 Request::builder()
708 .uri("/admin/node-lease")
709 .body(Body::empty())
710 .unwrap(),
711 )
712 .await
713 .unwrap();
714 assert_eq!(response.status(), StatusCode::OK);
715 }
716
717 #[tokio::test]
718 async fn test_admin_heartbeat() {
719 let app = setup_axum_app().await;
720 let response = app
721 .oneshot(
722 Request::builder()
723 .uri("/admin/heartbeat")
724 .body(Body::empty())
725 .unwrap(),
726 )
727 .await
728 .unwrap();
729 assert_eq!(response.status(), StatusCode::OK);
730 }
731
732 #[tokio::test]
733 async fn test_admin_heartbeat_help() {
734 let app = setup_axum_app().await;
735 let response = app
736 .oneshot(
737 Request::builder()
738 .uri("/admin/heartbeat/help")
739 .body(Body::empty())
740 .unwrap(),
741 )
742 .await
743 .unwrap();
744 assert_eq!(response.status(), StatusCode::OK);
745 }
746
747 #[tokio::test]
748 async fn test_admin_leader() {
749 let app = setup_axum_app().await;
750 let response = app
751 .oneshot(
752 Request::builder()
753 .uri("/admin/leader")
754 .body(Body::empty())
755 .unwrap(),
756 )
757 .await
758 .unwrap();
759 assert_eq!(response.status(), StatusCode::OK);
760 }
761
762 #[tokio::test]
763 async fn test_admin_maintenance() {
764 let app = setup_axum_app().await;
765 let response = app
766 .oneshot(
767 Request::builder()
768 .uri("/admin/maintenance")
769 .body(Body::empty())
770 .unwrap(),
771 )
772 .await
773 .unwrap();
774 assert_eq!(response.status(), StatusCode::OK);
775 let body = get_body_string(response).await;
776 assert!(body.contains("enabled"));
777 }
778
779 #[tokio::test]
780 async fn test_admin_maintenance_status() {
781 let app = setup_axum_app().await;
782 let response = app
783 .oneshot(
784 Request::builder()
785 .uri("/admin/maintenance/status")
786 .body(Body::empty())
787 .unwrap(),
788 )
789 .await
790 .unwrap();
791 assert_eq!(response.status(), StatusCode::OK);
792 let body = get_body_string(response).await;
793 assert!(body.contains("enabled"));
794 }
795
796 #[tokio::test]
797 async fn test_admin_maintenance_enable_disable() {
798 let response = setup_axum_app()
800 .await
801 .oneshot(
802 Request::builder()
803 .method(Method::POST)
804 .uri("/admin/maintenance/enable")
805 .body(Body::empty())
806 .unwrap(),
807 )
808 .await
809 .unwrap();
810 assert_eq!(response.status(), StatusCode::OK);
811 let body = get_body_string(response).await;
812 assert!(body.contains("enabled"));
813 let response = setup_axum_app()
815 .await
816 .oneshot(
817 Request::builder()
818 .method(Method::POST)
819 .uri("/admin/maintenance/disable")
820 .body(Body::empty())
821 .unwrap(),
822 )
823 .await
824 .unwrap();
825 assert_eq!(response.status(), StatusCode::OK);
826 let body = get_body_string(response).await;
827 assert!(body.contains("enabled"));
828 }
829
830 #[tokio::test]
831 async fn test_admin_procedure_manager_status() {
832 let app = setup_axum_app().await;
833 let response = app
834 .oneshot(
835 Request::builder()
836 .uri("/admin/procedure-manager/status")
837 .body(Body::empty())
838 .unwrap(),
839 )
840 .await
841 .unwrap();
842 assert_eq!(response.status(), StatusCode::OK);
843 let body = get_body_string(response).await;
844 assert!(body.contains("status"));
845 }
846
847 #[tokio::test]
848 async fn test_admin_procedure_manager_pause_resume() {
849 let response = setup_axum_app()
851 .await
852 .oneshot(
853 Request::builder()
854 .method(Method::POST)
855 .uri("/admin/procedure-manager/pause")
856 .body(Body::empty())
857 .unwrap(),
858 )
859 .await
860 .unwrap();
861 assert_eq!(response.status(), StatusCode::OK);
862 let body = get_body_string(response).await;
863 assert!(body.contains("paused"));
864 let response = setup_axum_app()
866 .await
867 .oneshot(
868 Request::builder()
869 .method(Method::POST)
870 .uri("/admin/procedure-manager/resume")
871 .body(Body::empty())
872 .unwrap(),
873 )
874 .await
875 .unwrap();
876 assert_eq!(response.status(), StatusCode::OK);
877 let body = get_body_string(response).await;
878 assert!(body.contains("running"));
879 }
880
881 #[tokio::test]
882 async fn test_admin_recovery() {
883 let app = setup_axum_app().await;
884 let response = app
885 .clone()
886 .oneshot(
887 Request::builder()
888 .uri("/admin/recovery/status")
889 .method(Method::GET)
890 .body(Body::empty())
891 .unwrap(),
892 )
893 .await
894 .unwrap();
895 assert_eq!(response.status(), StatusCode::OK);
896 let body = get_body_string(response).await;
897 assert!(body.contains("false"));
898
899 let response = app
901 .clone()
902 .oneshot(
903 Request::builder()
904 .uri("/admin/recovery/enable")
905 .method(Method::POST)
906 .body(Body::empty())
907 .unwrap(),
908 )
909 .await
910 .unwrap();
911 assert_eq!(response.status(), StatusCode::OK);
912 let body = get_body_string(response).await;
913 assert!(body.contains("true"));
914
915 let response = app
916 .clone()
917 .oneshot(
918 Request::builder()
919 .uri("/admin/recovery/status")
920 .method(Method::GET)
921 .body(Body::empty())
922 .unwrap(),
923 )
924 .await
925 .unwrap();
926 assert_eq!(response.status(), StatusCode::OK);
927 let body = get_body_string(response).await;
928 assert!(body.contains("true"));
929
930 let response = app
932 .clone()
933 .oneshot(
934 Request::builder()
935 .uri("/admin/recovery/disable")
936 .method(Method::POST)
937 .body(Body::empty())
938 .unwrap(),
939 )
940 .await
941 .unwrap();
942 assert_eq!(response.status(), StatusCode::OK);
943 let body = get_body_string(response).await;
944 assert!(body.contains("false"));
945
946 let response = app
947 .clone()
948 .oneshot(
949 Request::builder()
950 .uri("/admin/recovery/status")
951 .method(Method::GET)
952 .body(Body::empty())
953 .unwrap(),
954 )
955 .await
956 .unwrap();
957 assert_eq!(response.status(), StatusCode::OK);
958 let body = get_body_string(response).await;
959 assert!(body.contains("false"));
960 }
961
962 #[tokio::test]
963 async fn test_admin_sequence_table_id() {
964 common_telemetry::init_default_ut_logging();
965 let kv_backend = Arc::new(MemoryKvBackend::new());
966 let metasrv = MetasrvBuilder::new()
967 .options(MetasrvOptions::default())
968 .kv_backend(kv_backend)
969 .build()
970 .await
971 .unwrap();
972 let metasrv = Arc::new(metasrv);
973 let runtime_switch_manager = metasrv.runtime_switch_manager().clone();
974 let app = admin_axum_router(metasrv);
975 runtime_switch_manager.set_recovery_mode().await.unwrap();
977 let response = app
978 .clone()
979 .oneshot(
980 Request::builder()
981 .method(Method::GET)
982 .uri("/admin/sequence/table/next-id")
983 .body(Body::empty())
984 .unwrap(),
985 )
986 .await
987 .unwrap();
988 assert_eq!(response.status(), StatusCode::OK);
989 let body = into_bytes(response).await;
990 let resp: NextTableIdResponse = serde_json::from_slice(&body).unwrap();
991 assert_eq!(resp.next_table_id, 1024);
992
993 let response = app
995 .clone()
996 .oneshot(
997 Request::builder()
998 .method(Method::POST)
999 .header(http::header::CONTENT_TYPE, "application/json")
1000 .uri("/admin/sequence/table/set-next-id")
1001 .body(Body::empty())
1002 .unwrap(),
1003 )
1004 .await
1005 .unwrap();
1006 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
1007
1008 let response = app
1010 .clone()
1011 .oneshot(
1012 Request::builder()
1013 .method(Method::POST)
1014 .header(http::header::CONTENT_TYPE, "application/json")
1015 .uri("/admin/sequence/table/set-next-id")
1016 .body(Body::from(r#"{"next_table_id": 0}"#))
1017 .unwrap(),
1018 )
1019 .await
1020 .unwrap();
1021 assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
1022 let body = get_body_string(response).await;
1023 assert!(body.contains("is not greater than the current next value"));
1024
1025 let response = app
1027 .clone()
1028 .oneshot(
1029 Request::builder()
1030 .method(Method::POST)
1031 .header(http::header::CONTENT_TYPE, "application/json")
1032 .uri("/admin/sequence/table/set-next-id")
1033 .body(Body::from(r#"{"next_table_id": 2048}"#))
1034 .unwrap(),
1035 )
1036 .await
1037 .unwrap();
1038 assert_eq!(response.status(), StatusCode::OK);
1039
1040 let response = app
1042 .clone()
1043 .oneshot(
1044 Request::builder()
1045 .method(Method::GET)
1046 .uri("/admin/sequence/table/next-id")
1047 .body(Body::empty())
1048 .unwrap(),
1049 )
1050 .await
1051 .unwrap();
1052 assert_eq!(response.status(), StatusCode::OK);
1053 let body = into_bytes(response).await;
1054 let resp: NextTableIdResponse = serde_json::from_slice(&body).unwrap();
1055 assert_eq!(resp.next_table_id, 2048);
1056
1057 runtime_switch_manager.unset_recovery_mode().await.unwrap();
1059 let response = app
1061 .clone()
1062 .oneshot(
1063 Request::builder()
1064 .method(Method::POST)
1065 .header(http::header::CONTENT_TYPE, "application/json")
1066 .uri("/admin/sequence/table/set-next-id")
1067 .body(Body::from(r#"{"next_table_id": 2049}"#))
1068 .unwrap(),
1069 )
1070 .await
1071 .unwrap();
1072 assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
1073 let body = get_body_string(response).await;
1074 assert!(body.contains("Setting next table id is only allowed in recovery mode"));
1075 }
1076}