meta_srv/service/
admin.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod health;
16pub(crate) mod heartbeat;
17pub(crate) mod leader;
18pub(crate) mod maintenance;
19pub(crate) mod node_lease;
20pub(crate) mod procedure;
21pub(crate) mod recovery;
22pub(crate) mod sequencer;
23mod util;
24
25use std::collections::HashMap;
26use std::convert::Infallible;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29
30use axum::{Router as AxumRouter, routing};
31use tonic::body::Body;
32use tonic::codegen::{BoxFuture, Service, http};
33use tonic::server::NamedService;
34
35use crate::metasrv::Metasrv;
36use crate::service::admin::heartbeat::HeartBeatHandler;
37use crate::service::admin::leader::LeaderHandler;
38use crate::service::admin::maintenance::MaintenanceHandler;
39use crate::service::admin::node_lease::NodeLeaseHandler;
40use crate::service::admin::procedure::ProcedureManagerHandler;
41use crate::service::admin::recovery::RecoveryHandler;
42use crate::service::admin::sequencer::TableIdSequenceHandler;
43
44/// Expose admin http service on rpc port(3002).
45///
46/// # Deprecated
47///
48/// This function is deprecated and will be removed in the future. Please use
49/// [`admin_axum_router`] instead.
50pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
51    let router = Router::new().route("/health", health::HealthHandler);
52
53    let router = router.route(
54        "/node-lease",
55        node_lease::NodeLeaseHandler {
56            meta_peer_client: metasrv.meta_peer_client().clone(),
57        },
58    );
59
60    let handler = heartbeat::HeartBeatHandler {
61        meta_peer_client: metasrv.meta_peer_client().clone(),
62    };
63    let router = router
64        .route("/heartbeat", handler.clone())
65        .route("/heartbeat/help", handler);
66
67    let router = router.route(
68        "/leader",
69        leader::LeaderHandler {
70            election: metasrv.election().cloned(),
71        },
72    );
73
74    let router = router.routes(
75        &[
76            "/maintenance",
77            "/maintenance/status",
78            "/maintenance/enable",
79            "/maintenance/disable",
80        ],
81        maintenance::MaintenanceHandler {
82            manager: metasrv.runtime_switch_manager().clone(),
83        },
84    );
85    let router = router.routes(
86        &[
87            "/procedure-manager/pause",
88            "/procedure-manager/resume",
89            "/procedure-manager/status",
90        ],
91        procedure::ProcedureManagerHandler {
92            manager: metasrv.runtime_switch_manager().clone(),
93        },
94    );
95    let router = Router::nest("/admin", router);
96
97    Admin::new(router)
98}
99
/// A handler for one or more admin HTTP paths registered on a [`Router`].
#[async_trait::async_trait]
pub trait HttpHandler: Send + Sync {
    /// Handles a single request.
    ///
    /// [`Router::call`] passes the requested `path`, the HTTP `method` and the
    /// request parameters as a string map. When this returns `Err`, the router
    /// answers with `500 Internal Server Error` and the error's display text
    /// as the body.
    async fn handle(
        &self,
        path: &str,
        method: http::Method,
        params: &HashMap<String, String>,
    ) -> crate::Result<http::Response<String>>;
}
109
110#[derive(Clone)]
111pub struct Admin
112where
113    Self: Send,
114{
115    router: Arc<Router>,
116}
117
118impl Admin {
119    pub fn new(router: Router) -> Self {
120        Self {
121            router: Arc::new(router),
122        }
123    }
124}
125
126impl NamedService for Admin {
127    const NAME: &'static str = "admin";
128}
129
130impl Service<http::Request<Body>> for Admin {
131    type Response = http::Response<Body>;
132    type Error = Infallible;
133    type Future = BoxFuture<Self::Response, Self::Error>;
134
135    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
136        Poll::Ready(Ok(()))
137    }
138
139    fn call(&mut self, req: http::Request<Body>) -> Self::Future {
140        let router = self.router.clone();
141        let query_params = req
142            .uri()
143            .query()
144            .map(|q| {
145                url::form_urlencoded::parse(q.as_bytes())
146                    .into_owned()
147                    .collect()
148            })
149            .unwrap_or_default();
150        let path = req.uri().path().to_owned();
151        let method = req.method().clone();
152        Box::pin(async move { router.call(&path, method, query_params).await })
153    }
154}
155
156#[derive(Default)]
157pub struct Router {
158    handlers: HashMap<String, Arc<dyn HttpHandler>>,
159}
160
161impl Router {
162    pub fn new() -> Self {
163        Self {
164            handlers: HashMap::default(),
165        }
166    }
167
168    pub fn nest(path: &str, router: Router) -> Self {
169        check_path(path);
170
171        let handlers = router
172            .handlers
173            .into_iter()
174            .map(|(url, handler)| (format!("{path}{url}"), handler))
175            .collect();
176
177        Self { handlers }
178    }
179
180    pub fn route(mut self, path: &str, handler: impl HttpHandler + 'static) -> Self {
181        check_path(path);
182
183        let _ = self.handlers.insert(path.to_owned(), Arc::new(handler));
184
185        self
186    }
187
188    pub fn routes(mut self, paths: &[&str], handler: impl HttpHandler + 'static) -> Self {
189        let handler = Arc::new(handler);
190        for path in paths {
191            check_path(path);
192            let _ = self.handlers.insert(path.to_string(), handler.clone());
193        }
194
195        self
196    }
197
198    pub async fn call(
199        &self,
200        path: &str,
201        method: http::Method,
202        params: HashMap<String, String>,
203    ) -> Result<http::Response<Body>, Infallible> {
204        let handler = match self.handlers.get(path) {
205            Some(handler) => handler,
206            None => {
207                return Ok(http::Response::builder()
208                    .status(http::StatusCode::NOT_FOUND)
209                    .body(Body::empty())
210                    .unwrap());
211            }
212        };
213
214        let res = match handler.handle(path, method, &params).await {
215            Ok(res) => res.map(Body::new),
216            Err(e) => http::Response::builder()
217                .status(http::StatusCode::INTERNAL_SERVER_ERROR)
218                .body(Body::new(e.to_string()))
219                .unwrap(),
220        };
221
222        Ok(res)
223    }
224}
225
/// Validates that a route path is rooted.
///
/// # Panics
///
/// Panics if `path` does not start with `/`; this includes the empty string.
fn check_path(path: &str) {
    assert!(path.starts_with('/'), "paths must start with a `/`");
}
231
232/// Expose admin HTTP endpoints as an Axum router for the main HTTP server.
233pub fn admin_axum_router(metasrv: Arc<Metasrv>) -> AxumRouter {
234    let node_lease_handler = NodeLeaseHandler {
235        meta_peer_client: metasrv.meta_peer_client().clone(),
236    };
237    let heartbeat_handler = HeartBeatHandler {
238        meta_peer_client: metasrv.meta_peer_client().clone(),
239    };
240    let leader_handler = LeaderHandler {
241        election: metasrv.election().cloned(),
242    };
243    let maintenance_handler = MaintenanceHandler {
244        manager: metasrv.runtime_switch_manager().clone(),
245    };
246    let procedure_handler = ProcedureManagerHandler {
247        manager: metasrv.runtime_switch_manager().clone(),
248    };
249    let recovery_handler = RecoveryHandler {
250        manager: metasrv.runtime_switch_manager().clone(),
251    };
252    let table_id_sequence_handler = TableIdSequenceHandler {
253        table_id_sequence: metasrv.table_id_sequence().clone(),
254        runtime_switch_manager: metasrv.runtime_switch_manager().clone(),
255    };
256
257    let admin_router = AxumRouter::new()
258        .route("/health", routing::get(health::health))
259        .route(
260            "/node-lease",
261            routing::get(node_lease::get).with_state(node_lease_handler),
262        )
263        .route(
264            "/leader",
265            routing::get(leader::get).with_state(leader_handler),
266        )
267        .nest(
268            "/heartbeat",
269            AxumRouter::new()
270                .route("/", routing::get(heartbeat::get))
271                .route("/help", routing::get(heartbeat::help))
272                .with_state(heartbeat_handler),
273        )
274        .nest(
275            "/maintenance",
276            AxumRouter::new()
277                .route("/", routing::get(maintenance::status))
278                .route("/status", routing::get(maintenance::status))
279                .route("/enable", routing::post(maintenance::set))
280                .route("/disable", routing::post(maintenance::unset))
281                .with_state(maintenance_handler),
282        )
283        .nest(
284            "/procedure-manager",
285            AxumRouter::new()
286                .route("/status", routing::get(procedure::status))
287                .route("/pause", routing::post(procedure::pause))
288                .route("/resume", routing::post(procedure::resume))
289                .with_state(procedure_handler),
290        )
291        .nest(
292            "/recovery",
293            AxumRouter::new()
294                .route("/status", routing::get(recovery::status))
295                .route("/enable", routing::post(recovery::set))
296                .route("/disable", routing::post(recovery::unset))
297                .with_state(recovery_handler),
298        )
299        .nest(
300            "/sequence",
301            AxumRouter::new().nest(
302                "/table",
303                AxumRouter::new()
304                    .route("/next-id", routing::get(sequencer::get_next_table_id))
305                    .route("/set-next-id", routing::post(sequencer::set_next_table_id))
306                    .with_state(table_id_sequence_handler.clone()),
307            ),
308        );
309
310    AxumRouter::new().nest("/admin", admin_router)
311}
312
313#[cfg(test)]
314mod tests {
315    use common_meta::kv_backend::KvBackendRef;
316    use common_meta::kv_backend::memory::MemoryKvBackend;
317    use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
318
319    use super::*;
320    use crate::metasrv::MetasrvOptions;
321    use crate::metasrv::builder::MetasrvBuilder;
322    use crate::{bootstrap, error};
323
324    struct MockOkHandler;
325
326    #[async_trait::async_trait]
327    impl HttpHandler for MockOkHandler {
328        async fn handle(
329            &self,
330            _: &str,
331            _: http::Method,
332            _: &HashMap<String, String>,
333        ) -> crate::Result<http::Response<String>> {
334            Ok(http::Response::builder()
335                .status(http::StatusCode::OK)
336                .body("Ok".to_string())
337                .unwrap())
338        }
339    }
340    struct MockEmptyKeyErrorHandler;
341
342    #[async_trait::async_trait]
343    impl HttpHandler for MockEmptyKeyErrorHandler {
344        async fn handle(
345            &self,
346            _: &str,
347            _: http::Method,
348            _: &HashMap<String, String>,
349        ) -> crate::Result<http::Response<String>> {
350            error::EmptyKeySnafu {}.fail()
351        }
352    }
353
354    #[test]
355    fn test_route_nest() {
356        let mock_handler = MockOkHandler {};
357        let router = Router::new().route("/test_node", mock_handler);
358        let router = Router::nest("/test_root", router);
359
360        assert_eq!(1, router.handlers.len());
361        assert!(router.handlers.contains_key("/test_root/test_node"));
362    }
363
364    #[should_panic]
365    #[test]
366    fn test_invalid_path() {
367        check_path("test_node")
368    }
369
370    #[should_panic]
371    #[test]
372    fn test_empty_path() {
373        check_path("")
374    }
375
376    #[tokio::test]
377    async fn test_route_call_ok() {
378        let mock_handler = MockOkHandler {};
379        let router = Router::new().route("/test_node", mock_handler);
380        let router = Router::nest("/test_root", router);
381
382        let res = router
383            .call(
384                "/test_root/test_node",
385                http::Method::GET,
386                HashMap::default(),
387            )
388            .await
389            .unwrap();
390
391        assert!(res.status().is_success());
392    }
393
394    #[tokio::test]
395    async fn test_route_call_no_handler() {
396        let router = Router::new();
397
398        let res = router
399            .call(
400                "/test_root/test_node",
401                http::Method::GET,
402                HashMap::default(),
403            )
404            .await
405            .unwrap();
406
407        assert_eq!(http::StatusCode::NOT_FOUND, res.status());
408    }
409
410    #[tokio::test]
411    async fn test_route_call_err() {
412        let mock_handler = MockEmptyKeyErrorHandler {};
413        let router = Router::new().route("/test_node", mock_handler);
414        let router = Router::nest("/test_root", router);
415
416        let res = router
417            .call(
418                "/test_root/test_node",
419                http::Method::GET,
420                HashMap::default(),
421            )
422            .await
423            .unwrap();
424
425        assert_eq!(http::StatusCode::INTERNAL_SERVER_ERROR, res.status());
426    }
427
428    async fn test_metasrv(kv_backend: KvBackendRef) -> Metasrv {
429        let opts = MetasrvOptions::default();
430        let builder = MetasrvBuilder::new()
431            .options(opts)
432            .kv_backend(kv_backend.clone());
433
434        builder.build().await.unwrap()
435    }
436
437    async fn send_request(client: &mut DuplexStream, request: &[u8]) -> String {
438        client.write_all(request).await.unwrap();
439        let mut buf = vec![0; 1024];
440        let n = client.read(&mut buf).await.unwrap();
441        String::from_utf8_lossy(&buf[..n]).to_string()
442    }
443
    /// Drives the `/admin/maintenance` endpoints of `bootstrap::router` with raw
    /// HTTP/1.1 requests sent over one in-memory duplex connection.
    ///
    /// The requests are order dependent: each step relies on the maintenance
    /// state left behind by the previous one.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_metasrv_maintenance_mode() {
        common_telemetry::init_default_ut_logging();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let metasrv = test_metasrv(kv_backend).await;
        metasrv.try_start().await.unwrap();

        // `server` is handed to the router as its single incoming connection;
        // `client` is the test's end of the same pipe.
        let (mut client, server) = tokio::io::duplex(1024);
        let metasrv = Arc::new(metasrv);
        let service = metasrv.clone();
        let _handle = tokio::spawn(async move {
            let router = bootstrap::router(service);
            router
                .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
                .await
        });

        // Get maintenance mode
        let response = send_request(
            &mut client,
            b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":false}"#));
        assert!(response.contains("200 OK"));

        // Set maintenance mode to true
        let response = send_request(
            &mut client,
            b"POST /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":true}"#));
        assert!(response.contains("200 OK"));

        // The HTTP response must agree with the state held by the manager.
        let enabled = metasrv
            .runtime_switch_manager()
            .maintenance_mode()
            .await
            .unwrap();
        assert!(enabled);

        // Get maintenance mode again
        let response = send_request(
            &mut client,
            b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":true}"#));
        assert!(response.contains("200 OK"));

        // Set maintenance mode to false
        let response = send_request(
            &mut client,
            b"POST /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":false}"#));
        assert!(response.contains("200 OK"));

        let enabled = metasrv
            .runtime_switch_manager()
            .maintenance_mode()
            .await
            .unwrap();
        assert!(!enabled);

        // Set maintenance mode to true via GET request
        let response = send_request(
            &mut client,
            b"GET /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":true}"#));
        assert!(response.contains("200 OK"));

        // Set maintenance mode to false via PUT request
        let response = send_request(
            &mut client,
            b"PUT /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":false}"#));
        assert!(response.contains("200 OK"));

        // Get maintenance mode via status path
        let response = send_request(
            &mut client,
            b"GET /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":false}"#));

        // Set maintenance mode via enable path
        let response = send_request(
            &mut client,
            b"POST /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":true}"#));

        // Unset maintenance mode via disable path
        let response = send_request(
            &mut client,
            b"POST /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":false}"#));

        // The remaining requests use a method the path does not accept and
        // are expected to be answered with 404.

        // send POST request to status path
        let response = send_request(
            &mut client,
            b"POST /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
        )
        .await;
        assert!(response.contains("404 Not Found"));

        // send GET request to enable path
        let response = send_request(
            &mut client,
            b"GET /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
        )
        .await;
        assert!(response.contains("404 Not Found"));

        // send GET request to disable path
        let response = send_request(
            &mut client,
            b"GET /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
        )
        .await;
        assert!(response.contains("404 Not Found"));
    }
577
    /// Drives the `/admin/procedure-manager` endpoints of `bootstrap::router`
    /// with raw HTTP/1.1 requests sent over one in-memory duplex connection.
    ///
    /// The requests are order dependent: status, pause, resume, followed by
    /// two requests that use a method the path does not accept.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_metasrv_procedure_manager_handler() {
        common_telemetry::init_default_ut_logging();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let metasrv = test_metasrv(kv_backend).await;
        metasrv.try_start().await.unwrap();

        // `server` is handed to the router as its single incoming connection;
        // `client` is the test's end of the same pipe.
        let (mut client, server) = tokio::io::duplex(1024);
        let metasrv = Arc::new(metasrv);
        let service = metasrv.clone();
        let _handle = tokio::spawn(async move {
            let router = bootstrap::router(service);
            router
                .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
                .await
        });

        // send GET request to procedure-manager/status path
        let response = send_request(
            &mut client,
            b"GET /admin/procedure-manager/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains("200 OK"));
        assert!(
            response.contains(r#"{"status":"running"}"#),
            "response: {}",
            response
        );

        // send POST request to procedure-manager/pause path
        let response = send_request(
            &mut client,
            b"POST /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains("200 OK"));
        assert!(response.contains(r#"{"status":"paused"}"#));

        // send POST request to procedure-manager/resume path
        let response = send_request(
            &mut client,
            b"POST /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains("200 OK"));
        assert!(
            response.contains(r#"{"status":"running"}"#),
            "response: {}",
            response
        );

        // send GET request to procedure-manager/resume path
        let response = send_request(
            &mut client,
            b"GET /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains("404 Not Found"));

        // send GET request to procedure-manager/pause path
        let response = send_request(
            &mut client,
            b"GET /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains("404 Not Found"));
    }
646}
647
648#[cfg(test)]
649mod axum_admin_tests {
650    use std::sync::Arc;
651
652    use axum::body::{Body, to_bytes};
653    use axum::http::{Method, Request, StatusCode};
654    use common_meta::kv_backend::memory::MemoryKvBackend;
655    use tower::ServiceExt; // for `oneshot`
656
657    use super::*;
658    use crate::metasrv::MetasrvOptions;
659    use crate::metasrv::builder::MetasrvBuilder;
660    use crate::service::admin::sequencer::NextTableIdResponse;
661
662    async fn setup_axum_app() -> AxumRouter {
663        let kv_backend = Arc::new(MemoryKvBackend::new());
664        let metasrv = MetasrvBuilder::new()
665            .options(MetasrvOptions::default())
666            .kv_backend(kv_backend)
667            .build()
668            .await
669            .unwrap();
670        let metasrv = Arc::new(metasrv);
671        admin_axum_router(metasrv)
672    }
673
674    async fn get_body_string(resp: axum::response::Response) -> String {
675        let body_bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
676        String::from_utf8_lossy(&body_bytes).to_string()
677    }
678
679    async fn into_bytes(resp: axum::response::Response) -> Vec<u8> {
680        let body_bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
681        body_bytes.to_vec()
682    }
683
684    #[tokio::test]
685    async fn test_admin_health() {
686        let app = setup_axum_app().await;
687        let response = app
688            .oneshot(
689                Request::builder()
690                    .uri("/admin/health")
691                    .body(Body::empty())
692                    .unwrap(),
693            )
694            .await
695            .unwrap();
696        assert_eq!(response.status(), StatusCode::OK);
697        let body = get_body_string(response).await;
698        assert!(body.to_lowercase().contains("ok"));
699    }
700
701    #[tokio::test]
702    async fn test_admin_node_lease() {
703        let app = setup_axum_app().await;
704        let response = app
705            .oneshot(
706                Request::builder()
707                    .uri("/admin/node-lease")
708                    .body(Body::empty())
709                    .unwrap(),
710            )
711            .await
712            .unwrap();
713        assert_eq!(response.status(), StatusCode::OK);
714    }
715
716    #[tokio::test]
717    async fn test_admin_heartbeat() {
718        let app = setup_axum_app().await;
719        let response = app
720            .oneshot(
721                Request::builder()
722                    .uri("/admin/heartbeat")
723                    .body(Body::empty())
724                    .unwrap(),
725            )
726            .await
727            .unwrap();
728        assert_eq!(response.status(), StatusCode::OK);
729    }
730
731    #[tokio::test]
732    async fn test_admin_heartbeat_help() {
733        let app = setup_axum_app().await;
734        let response = app
735            .oneshot(
736                Request::builder()
737                    .uri("/admin/heartbeat/help")
738                    .body(Body::empty())
739                    .unwrap(),
740            )
741            .await
742            .unwrap();
743        assert_eq!(response.status(), StatusCode::OK);
744    }
745
746    #[tokio::test]
747    async fn test_admin_leader() {
748        let app = setup_axum_app().await;
749        let response = app
750            .oneshot(
751                Request::builder()
752                    .uri("/admin/leader")
753                    .body(Body::empty())
754                    .unwrap(),
755            )
756            .await
757            .unwrap();
758        assert_eq!(response.status(), StatusCode::OK);
759    }
760
761    #[tokio::test]
762    async fn test_admin_maintenance() {
763        let app = setup_axum_app().await;
764        let response = app
765            .oneshot(
766                Request::builder()
767                    .uri("/admin/maintenance")
768                    .body(Body::empty())
769                    .unwrap(),
770            )
771            .await
772            .unwrap();
773        assert_eq!(response.status(), StatusCode::OK);
774        let body = get_body_string(response).await;
775        assert!(body.contains("enabled"));
776    }
777
778    #[tokio::test]
779    async fn test_admin_maintenance_status() {
780        let app = setup_axum_app().await;
781        let response = app
782            .oneshot(
783                Request::builder()
784                    .uri("/admin/maintenance/status")
785                    .body(Body::empty())
786                    .unwrap(),
787            )
788            .await
789            .unwrap();
790        assert_eq!(response.status(), StatusCode::OK);
791        let body = get_body_string(response).await;
792        assert!(body.contains("enabled"));
793    }
794
795    #[tokio::test]
796    async fn test_admin_maintenance_enable_disable() {
797        // Enable maintenance
798        let response = setup_axum_app()
799            .await
800            .oneshot(
801                Request::builder()
802                    .method(Method::POST)
803                    .uri("/admin/maintenance/enable")
804                    .body(Body::empty())
805                    .unwrap(),
806            )
807            .await
808            .unwrap();
809        assert_eq!(response.status(), StatusCode::OK);
810        let body = get_body_string(response).await;
811        assert!(body.contains("enabled"));
812        // Disable maintenance
813        let response = setup_axum_app()
814            .await
815            .oneshot(
816                Request::builder()
817                    .method(Method::POST)
818                    .uri("/admin/maintenance/disable")
819                    .body(Body::empty())
820                    .unwrap(),
821            )
822            .await
823            .unwrap();
824        assert_eq!(response.status(), StatusCode::OK);
825        let body = get_body_string(response).await;
826        assert!(body.contains("enabled"));
827    }
828
829    #[tokio::test]
830    async fn test_admin_procedure_manager_status() {
831        let app = setup_axum_app().await;
832        let response = app
833            .oneshot(
834                Request::builder()
835                    .uri("/admin/procedure-manager/status")
836                    .body(Body::empty())
837                    .unwrap(),
838            )
839            .await
840            .unwrap();
841        assert_eq!(response.status(), StatusCode::OK);
842        let body = get_body_string(response).await;
843        assert!(body.contains("status"));
844    }
845
846    #[tokio::test]
847    async fn test_admin_procedure_manager_pause_resume() {
848        // Pause
849        let response = setup_axum_app()
850            .await
851            .oneshot(
852                Request::builder()
853                    .method(Method::POST)
854                    .uri("/admin/procedure-manager/pause")
855                    .body(Body::empty())
856                    .unwrap(),
857            )
858            .await
859            .unwrap();
860        assert_eq!(response.status(), StatusCode::OK);
861        let body = get_body_string(response).await;
862        assert!(body.contains("paused"));
863        // Resume
864        let response = setup_axum_app()
865            .await
866            .oneshot(
867                Request::builder()
868                    .method(Method::POST)
869                    .uri("/admin/procedure-manager/resume")
870                    .body(Body::empty())
871                    .unwrap(),
872            )
873            .await
874            .unwrap();
875        assert_eq!(response.status(), StatusCode::OK);
876        let body = get_body_string(response).await;
877        assert!(body.contains("running"));
878    }
879
880    #[tokio::test]
881    async fn test_admin_recovery() {
882        let app = setup_axum_app().await;
883        let response = app
884            .clone()
885            .oneshot(
886                Request::builder()
887                    .uri("/admin/recovery/status")
888                    .method(Method::GET)
889                    .body(Body::empty())
890                    .unwrap(),
891            )
892            .await
893            .unwrap();
894        assert_eq!(response.status(), StatusCode::OK);
895        let body = get_body_string(response).await;
896        assert!(body.contains("false"));
897
898        // Enable recovery
899        let response = app
900            .clone()
901            .oneshot(
902                Request::builder()
903                    .uri("/admin/recovery/enable")
904                    .method(Method::POST)
905                    .body(Body::empty())
906                    .unwrap(),
907            )
908            .await
909            .unwrap();
910        assert_eq!(response.status(), StatusCode::OK);
911        let body = get_body_string(response).await;
912        assert!(body.contains("true"));
913
914        let response = app
915            .clone()
916            .oneshot(
917                Request::builder()
918                    .uri("/admin/recovery/status")
919                    .method(Method::GET)
920                    .body(Body::empty())
921                    .unwrap(),
922            )
923            .await
924            .unwrap();
925        assert_eq!(response.status(), StatusCode::OK);
926        let body = get_body_string(response).await;
927        assert!(body.contains("true"));
928
929        // Disable recovery
930        let response = app
931            .clone()
932            .oneshot(
933                Request::builder()
934                    .uri("/admin/recovery/disable")
935                    .method(Method::POST)
936                    .body(Body::empty())
937                    .unwrap(),
938            )
939            .await
940            .unwrap();
941        assert_eq!(response.status(), StatusCode::OK);
942        let body = get_body_string(response).await;
943        assert!(body.contains("false"));
944
945        let response = app
946            .clone()
947            .oneshot(
948                Request::builder()
949                    .uri("/admin/recovery/status")
950                    .method(Method::GET)
951                    .body(Body::empty())
952                    .unwrap(),
953            )
954            .await
955            .unwrap();
956        assert_eq!(response.status(), StatusCode::OK);
957        let body = get_body_string(response).await;
958        assert!(body.contains("false"));
959    }
960
961    #[tokio::test]
962    async fn test_admin_sequence_table_id() {
963        common_telemetry::init_default_ut_logging();
964        let kv_backend = Arc::new(MemoryKvBackend::new());
965        let metasrv = MetasrvBuilder::new()
966            .options(MetasrvOptions::default())
967            .kv_backend(kv_backend)
968            .build()
969            .await
970            .unwrap();
971        let metasrv = Arc::new(metasrv);
972        let runtime_switch_manager = metasrv.runtime_switch_manager().clone();
973        let app = admin_axum_router(metasrv);
974        // Set recovery mode to true
975        runtime_switch_manager.set_recovery_mode().await.unwrap();
976        let response = app
977            .clone()
978            .oneshot(
979                Request::builder()
980                    .method(Method::GET)
981                    .uri("/admin/sequence/table/next-id")
982                    .body(Body::empty())
983                    .unwrap(),
984            )
985            .await
986            .unwrap();
987        assert_eq!(response.status(), StatusCode::OK);
988        let body = into_bytes(response).await;
989        let resp: NextTableIdResponse = serde_json::from_slice(&body).unwrap();
990        assert_eq!(resp.next_table_id, 1024);
991
992        // Bad request
993        let response = app
994            .clone()
995            .oneshot(
996                Request::builder()
997                    .method(Method::POST)
998                    .header(http::header::CONTENT_TYPE, "application/json")
999                    .uri("/admin/sequence/table/set-next-id")
1000                    .body(Body::empty())
1001                    .unwrap(),
1002            )
1003            .await
1004            .unwrap();
1005        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
1006
1007        // Bad next id
1008        let response = app
1009            .clone()
1010            .oneshot(
1011                Request::builder()
1012                    .method(Method::POST)
1013                    .header(http::header::CONTENT_TYPE, "application/json")
1014                    .uri("/admin/sequence/table/set-next-id")
1015                    .body(Body::from(r#"{"next_table_id": 0}"#))
1016                    .unwrap(),
1017            )
1018            .await
1019            .unwrap();
1020        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
1021        let body = get_body_string(response).await;
1022        assert!(body.contains("is not greater than the current next value"));
1023
1024        // Set next id
1025        let response = app
1026            .clone()
1027            .oneshot(
1028                Request::builder()
1029                    .method(Method::POST)
1030                    .header(http::header::CONTENT_TYPE, "application/json")
1031                    .uri("/admin/sequence/table/set-next-id")
1032                    .body(Body::from(r#"{"next_table_id": 2048}"#))
1033                    .unwrap(),
1034            )
1035            .await
1036            .unwrap();
1037        assert_eq!(response.status(), StatusCode::OK);
1038
1039        // Set next id
1040        let response = app
1041            .clone()
1042            .oneshot(
1043                Request::builder()
1044                    .method(Method::GET)
1045                    .uri("/admin/sequence/table/next-id")
1046                    .body(Body::empty())
1047                    .unwrap(),
1048            )
1049            .await
1050            .unwrap();
1051        assert_eq!(response.status(), StatusCode::OK);
1052        let body = into_bytes(response).await;
1053        let resp: NextTableIdResponse = serde_json::from_slice(&body).unwrap();
1054        assert_eq!(resp.next_table_id, 2048);
1055
1056        // Set recovery mode to false
1057        runtime_switch_manager.unset_recovery_mode().await.unwrap();
1058        // Set next id with recovery mode disabled
1059        let response = app
1060            .clone()
1061            .oneshot(
1062                Request::builder()
1063                    .method(Method::POST)
1064                    .header(http::header::CONTENT_TYPE, "application/json")
1065                    .uri("/admin/sequence/table/set-next-id")
1066                    .body(Body::from(r#"{"next_table_id": 2049}"#))
1067                    .unwrap(),
1068            )
1069            .await
1070            .unwrap();
1071        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
1072        let body = get_body_string(response).await;
1073        assert!(body.contains("Setting next table id is only allowed in recovery mode"));
1074    }
1075}