meta_srv/service/
admin.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod health;
16pub(crate) mod heartbeat;
17pub(crate) mod leader;
18pub(crate) mod maintenance;
19pub(crate) mod node_lease;
20pub(crate) mod procedure;
21pub(crate) mod recovery;
22pub(crate) mod sequencer;
23mod util;
24
25use std::collections::HashMap;
26use std::convert::Infallible;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29
30use axum::{routing, Router as AxumRouter};
31use tonic::body::Body;
32use tonic::codegen::{http, BoxFuture, Service};
33use tonic::server::NamedService;
34
35use crate::metasrv::Metasrv;
36use crate::service::admin::heartbeat::HeartBeatHandler;
37use crate::service::admin::leader::LeaderHandler;
38use crate::service::admin::maintenance::MaintenanceHandler;
39use crate::service::admin::node_lease::NodeLeaseHandler;
40use crate::service::admin::procedure::ProcedureManagerHandler;
41use crate::service::admin::recovery::RecoveryHandler;
42use crate::service::admin::sequencer::TableIdSequenceHandler;
43
44/// Expose admin http service on rpc port(3002).
45///
46/// # Deprecated
47///
48/// This function is deprecated and will be removed in the future. Please use
49/// [`admin_axum_router`] instead.
50pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
51    let router = Router::new().route("/health", health::HealthHandler);
52
53    let router = router.route(
54        "/node-lease",
55        node_lease::NodeLeaseHandler {
56            meta_peer_client: metasrv.meta_peer_client().clone(),
57        },
58    );
59
60    let handler = heartbeat::HeartBeatHandler {
61        meta_peer_client: metasrv.meta_peer_client().clone(),
62    };
63    let router = router
64        .route("/heartbeat", handler.clone())
65        .route("/heartbeat/help", handler);
66
67    let router = router.route(
68        "/leader",
69        leader::LeaderHandler {
70            election: metasrv.election().cloned(),
71        },
72    );
73
74    let router = router.routes(
75        &[
76            "/maintenance",
77            "/maintenance/status",
78            "/maintenance/enable",
79            "/maintenance/disable",
80        ],
81        maintenance::MaintenanceHandler {
82            manager: metasrv.runtime_switch_manager().clone(),
83        },
84    );
85    let router = router.routes(
86        &[
87            "/procedure-manager/pause",
88            "/procedure-manager/resume",
89            "/procedure-manager/status",
90        ],
91        procedure::ProcedureManagerHandler {
92            manager: metasrv.runtime_switch_manager().clone(),
93        },
94    );
95    let router = Router::nest("/admin", router);
96
97    Admin::new(router)
98}
99
/// Handler for one or more routes of the legacy [`Admin`] service.
///
/// [`Router::call`] selects a handler by exact request path and forwards the
/// request to [`HttpHandler::handle`].
#[async_trait::async_trait]
pub trait HttpHandler: Send + Sync {
    /// Handles a single request.
    ///
    /// * `path` - the request path the handler was looked up with.
    /// * `method` - the HTTP method of the request; the router does not filter
    ///   on it, so the handler receives every method.
    /// * `params` - request parameters; when called through [`Admin`] these are
    ///   the decoded query-string pairs.
    ///
    /// An `Err` returned here is turned into a `500 Internal Server Error`
    /// response by [`Router::call`], with the error's display text as the body.
    async fn handle(
        &self,
        path: &str,
        method: http::Method,
        params: &HashMap<String, String>,
    ) -> crate::Result<http::Response<String>>;
}
109
110#[derive(Clone)]
111pub struct Admin
112where
113    Self: Send,
114{
115    router: Arc<Router>,
116}
117
118impl Admin {
119    pub fn new(router: Router) -> Self {
120        Self {
121            router: Arc::new(router),
122        }
123    }
124}
125
impl NamedService for Admin {
    // Service name exposed through tonic's `NamedService`; it matches the
    // `/admin` prefix under which `make_admin_service` nests every route.
    const NAME: &'static str = "admin";
}
129
130impl Service<http::Request<Body>> for Admin {
131    type Response = http::Response<Body>;
132    type Error = Infallible;
133    type Future = BoxFuture<Self::Response, Self::Error>;
134
135    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
136        Poll::Ready(Ok(()))
137    }
138
139    fn call(&mut self, req: http::Request<Body>) -> Self::Future {
140        let router = self.router.clone();
141        let query_params = req
142            .uri()
143            .query()
144            .map(|q| {
145                url::form_urlencoded::parse(q.as_bytes())
146                    .into_owned()
147                    .collect()
148            })
149            .unwrap_or_default();
150        let path = req.uri().path().to_owned();
151        let method = req.method().clone();
152        Box::pin(async move { router.call(&path, method, query_params).await })
153    }
154}
155
156#[derive(Default)]
157pub struct Router {
158    handlers: HashMap<String, Arc<dyn HttpHandler>>,
159}
160
161impl Router {
162    pub fn new() -> Self {
163        Self {
164            handlers: HashMap::default(),
165        }
166    }
167
168    pub fn nest(path: &str, router: Router) -> Self {
169        check_path(path);
170
171        let handlers = router
172            .handlers
173            .into_iter()
174            .map(|(url, handler)| (format!("{path}{url}"), handler))
175            .collect();
176
177        Self { handlers }
178    }
179
180    pub fn route(mut self, path: &str, handler: impl HttpHandler + 'static) -> Self {
181        check_path(path);
182
183        let _ = self.handlers.insert(path.to_owned(), Arc::new(handler));
184
185        self
186    }
187
188    pub fn routes(mut self, paths: &[&str], handler: impl HttpHandler + 'static) -> Self {
189        let handler = Arc::new(handler);
190        for path in paths {
191            check_path(path);
192            let _ = self.handlers.insert(path.to_string(), handler.clone());
193        }
194
195        self
196    }
197
198    pub async fn call(
199        &self,
200        path: &str,
201        method: http::Method,
202        params: HashMap<String, String>,
203    ) -> Result<http::Response<Body>, Infallible> {
204        let handler = match self.handlers.get(path) {
205            Some(handler) => handler,
206            None => {
207                return Ok(http::Response::builder()
208                    .status(http::StatusCode::NOT_FOUND)
209                    .body(Body::empty())
210                    .unwrap())
211            }
212        };
213
214        let res = match handler.handle(path, method, &params).await {
215            Ok(res) => res.map(Body::new),
216            Err(e) => http::Response::builder()
217                .status(http::StatusCode::INTERNAL_SERVER_ERROR)
218                .body(Body::new(e.to_string()))
219                .unwrap(),
220        };
221
222        Ok(res)
223    }
224}
225
/// Validates a route path.
///
/// # Panics
///
/// Panics if `path` does not begin with `/`; this includes the empty string.
fn check_path(path: &str) {
    assert!(path.starts_with('/'), "paths must start with a `/`");
}
231
232/// Expose admin HTTP endpoints as an Axum router for the main HTTP server.
233pub fn admin_axum_router(metasrv: Arc<Metasrv>) -> AxumRouter {
234    let node_lease_handler = NodeLeaseHandler {
235        meta_peer_client: metasrv.meta_peer_client().clone(),
236    };
237    let heartbeat_handler = HeartBeatHandler {
238        meta_peer_client: metasrv.meta_peer_client().clone(),
239    };
240    let leader_handler = LeaderHandler {
241        election: metasrv.election().cloned(),
242    };
243    let maintenance_handler = MaintenanceHandler {
244        manager: metasrv.runtime_switch_manager().clone(),
245    };
246    let procedure_handler = ProcedureManagerHandler {
247        manager: metasrv.runtime_switch_manager().clone(),
248    };
249    let recovery_handler = RecoveryHandler {
250        manager: metasrv.runtime_switch_manager().clone(),
251    };
252    let table_id_sequence_handler = TableIdSequenceHandler {
253        table_id_sequence: metasrv.table_id_sequence().clone(),
254        runtime_switch_manager: metasrv.runtime_switch_manager().clone(),
255    };
256
257    let admin_router = AxumRouter::new()
258        .route("/health", routing::get(health::health))
259        .route(
260            "/node-lease",
261            routing::get(node_lease::get).with_state(node_lease_handler),
262        )
263        .route(
264            "/leader",
265            routing::get(leader::get).with_state(leader_handler),
266        )
267        .nest(
268            "/heartbeat",
269            AxumRouter::new()
270                .route("/", routing::get(heartbeat::get))
271                .route("/help", routing::get(heartbeat::help))
272                .with_state(heartbeat_handler),
273        )
274        .nest(
275            "/maintenance",
276            AxumRouter::new()
277                .route("/", routing::get(maintenance::status))
278                .route("/status", routing::get(maintenance::status))
279                .route("/enable", routing::post(maintenance::set))
280                .route("/disable", routing::post(maintenance::unset))
281                .with_state(maintenance_handler),
282        )
283        .nest(
284            "/procedure-manager",
285            AxumRouter::new()
286                .route("/status", routing::get(procedure::status))
287                .route("/pause", routing::post(procedure::pause))
288                .route("/resume", routing::post(procedure::resume))
289                .with_state(procedure_handler),
290        )
291        .nest(
292            "/recovery",
293            AxumRouter::new()
294                .route("/status", routing::get(recovery::status))
295                .route("/enable", routing::post(recovery::set))
296                .route("/disable", routing::post(recovery::unset))
297                .with_state(recovery_handler),
298        )
299        .nest(
300            "/sequence",
301            AxumRouter::new().nest(
302                "/table",
303                AxumRouter::new()
304                    .route("/next-id", routing::get(sequencer::get_next_table_id))
305                    .route("/set-next-id", routing::post(sequencer::set_next_table_id))
306                    .with_state(table_id_sequence_handler.clone()),
307            ),
308        );
309
310    AxumRouter::new().nest("/admin", admin_router)
311}
312
313#[cfg(test)]
314mod tests {
315    use common_meta::kv_backend::memory::MemoryKvBackend;
316    use common_meta::kv_backend::KvBackendRef;
317    use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
318
319    use super::*;
320    use crate::metasrv::builder::MetasrvBuilder;
321    use crate::metasrv::MetasrvOptions;
322    use crate::{bootstrap, error};
323
324    struct MockOkHandler;
325
326    #[async_trait::async_trait]
327    impl HttpHandler for MockOkHandler {
328        async fn handle(
329            &self,
330            _: &str,
331            _: http::Method,
332            _: &HashMap<String, String>,
333        ) -> crate::Result<http::Response<String>> {
334            Ok(http::Response::builder()
335                .status(http::StatusCode::OK)
336                .body("Ok".to_string())
337                .unwrap())
338        }
339    }
340    struct MockEmptyKeyErrorHandler;
341
342    #[async_trait::async_trait]
343    impl HttpHandler for MockEmptyKeyErrorHandler {
344        async fn handle(
345            &self,
346            _: &str,
347            _: http::Method,
348            _: &HashMap<String, String>,
349        ) -> crate::Result<http::Response<String>> {
350            error::EmptyKeySnafu {}.fail()
351        }
352    }
353
354    #[test]
355    fn test_route_nest() {
356        let mock_handler = MockOkHandler {};
357        let router = Router::new().route("/test_node", mock_handler);
358        let router = Router::nest("/test_root", router);
359
360        assert_eq!(1, router.handlers.len());
361        assert!(router.handlers.contains_key("/test_root/test_node"));
362    }
363
364    #[should_panic]
365    #[test]
366    fn test_invalid_path() {
367        check_path("test_node")
368    }
369
370    #[should_panic]
371    #[test]
372    fn test_empty_path() {
373        check_path("")
374    }
375
376    #[tokio::test]
377    async fn test_route_call_ok() {
378        let mock_handler = MockOkHandler {};
379        let router = Router::new().route("/test_node", mock_handler);
380        let router = Router::nest("/test_root", router);
381
382        let res = router
383            .call(
384                "/test_root/test_node",
385                http::Method::GET,
386                HashMap::default(),
387            )
388            .await
389            .unwrap();
390
391        assert!(res.status().is_success());
392    }
393
394    #[tokio::test]
395    async fn test_route_call_no_handler() {
396        let router = Router::new();
397
398        let res = router
399            .call(
400                "/test_root/test_node",
401                http::Method::GET,
402                HashMap::default(),
403            )
404            .await
405            .unwrap();
406
407        assert_eq!(http::StatusCode::NOT_FOUND, res.status());
408    }
409
410    #[tokio::test]
411    async fn test_route_call_err() {
412        let mock_handler = MockEmptyKeyErrorHandler {};
413        let router = Router::new().route("/test_node", mock_handler);
414        let router = Router::nest("/test_root", router);
415
416        let res = router
417            .call(
418                "/test_root/test_node",
419                http::Method::GET,
420                HashMap::default(),
421            )
422            .await
423            .unwrap();
424
425        assert_eq!(http::StatusCode::INTERNAL_SERVER_ERROR, res.status());
426    }
427
428    async fn test_metasrv(kv_backend: KvBackendRef) -> Metasrv {
429        let opts = MetasrvOptions::default();
430        let builder = MetasrvBuilder::new()
431            .options(opts)
432            .kv_backend(kv_backend.clone());
433
434        let metasrv = builder.build().await.unwrap();
435        metasrv
436    }
437
438    async fn send_request(client: &mut DuplexStream, request: &[u8]) -> String {
439        client.write_all(request).await.unwrap();
440        let mut buf = vec![0; 1024];
441        let n = client.read(&mut buf).await.unwrap();
442        String::from_utf8_lossy(&buf[..n]).to_string()
443    }
444
445    #[tokio::test(flavor = "multi_thread")]
446    async fn test_metasrv_maintenance_mode() {
447        common_telemetry::init_default_ut_logging();
448        let kv_backend = Arc::new(MemoryKvBackend::new());
449        let metasrv = test_metasrv(kv_backend).await;
450        metasrv.try_start().await.unwrap();
451
452        let (mut client, server) = tokio::io::duplex(1024);
453        let metasrv = Arc::new(metasrv);
454        let service = metasrv.clone();
455        let _handle = tokio::spawn(async move {
456            let router = bootstrap::router(service);
457            router
458                .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
459                .await
460        });
461
462        // Get maintenance mode
463        let response = send_request(
464            &mut client,
465            b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
466        )
467        .await;
468        assert!(response.contains(r#"{"enabled":false}"#));
469        assert!(response.contains("200 OK"));
470
471        // Set maintenance mode to true
472        let response = send_request(
473            &mut client,
474            b"POST /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
475        )
476        .await;
477        assert!(response.contains(r#"{"enabled":true}"#));
478        assert!(response.contains("200 OK"));
479
480        let enabled = metasrv
481            .runtime_switch_manager()
482            .maintenance_mode()
483            .await
484            .unwrap();
485        assert!(enabled);
486
487        // Get maintenance mode again
488        let response = send_request(
489            &mut client,
490            b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
491        )
492        .await;
493        assert!(response.contains(r#"{"enabled":true}"#));
494        assert!(response.contains("200 OK"));
495
496        // Set maintenance mode to false
497        let response = send_request(
498            &mut client,
499            b"POST /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
500        )
501        .await;
502        assert!(response.contains(r#"{"enabled":false}"#));
503        assert!(response.contains("200 OK"));
504
505        let enabled = metasrv
506            .runtime_switch_manager()
507            .maintenance_mode()
508            .await
509            .unwrap();
510        assert!(!enabled);
511
512        // Set maintenance mode to true via GET request
513        let response = send_request(
514            &mut client,
515            b"GET /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\n\r\n",
516        )
517        .await;
518        assert!(response.contains(r#"{"enabled":true}"#));
519        assert!(response.contains("200 OK"));
520
521        // Set maintenance mode to false via GET request
522        let response = send_request(
523            &mut client,
524            b"PUT /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\n\r\n",
525        )
526        .await;
527        assert!(response.contains(r#"{"enabled":false}"#));
528        assert!(response.contains("200 OK"));
529
530        // Get maintenance mode via status path
531        let response = send_request(
532            &mut client,
533            b"GET /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
534        )
535        .await;
536        assert!(response.contains(r#"{"enabled":false}"#));
537
538        // Set maintenance mode via enable path
539        let response = send_request(
540            &mut client,
541            b"POST /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
542        )
543        .await;
544        assert!(response.contains(r#"{"enabled":true}"#));
545
546        // Unset maintenance mode via disable path
547        let response = send_request(
548            &mut client,
549            b"POST /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
550        )
551        .await;
552        assert!(response.contains(r#"{"enabled":false}"#));
553
554        // send POST request to status path
555        let response = send_request(
556            &mut client,
557            b"POST /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
558        )
559        .await;
560        assert!(response.contains("404 Not Found"));
561
562        // send GET request to enable path
563        let response = send_request(
564            &mut client,
565            b"GET /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
566        )
567        .await;
568        assert!(response.contains("404 Not Found"));
569
570        // send GET request to disable path
571        let response = send_request(
572            &mut client,
573            b"GET /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
574        )
575        .await;
576        assert!(response.contains("404 Not Found"));
577    }
578
579    #[tokio::test(flavor = "multi_thread")]
580    async fn test_metasrv_procedure_manager_handler() {
581        common_telemetry::init_default_ut_logging();
582        let kv_backend = Arc::new(MemoryKvBackend::new());
583        let metasrv = test_metasrv(kv_backend).await;
584        metasrv.try_start().await.unwrap();
585
586        let (mut client, server) = tokio::io::duplex(1024);
587        let metasrv = Arc::new(metasrv);
588        let service = metasrv.clone();
589        let _handle = tokio::spawn(async move {
590            let router = bootstrap::router(service);
591            router
592                .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
593                .await
594        });
595
596        // send GET request to procedure-manager/status path
597        let response = send_request(
598            &mut client,
599            b"GET /admin/procedure-manager/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
600        )
601        .await;
602        assert!(response.contains("200 OK"));
603        assert!(
604            response.contains(r#"{"status":"running"}"#),
605            "response: {}",
606            response
607        );
608
609        // send POST request to procedure-manager/pause path
610        let response = send_request(
611            &mut client,
612            b"POST /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
613        )
614        .await;
615        assert!(response.contains("200 OK"));
616        assert!(response.contains(r#"{"status":"paused"}"#));
617
618        // send POST request to procedure-manager/resume path
619        let response = send_request(
620            &mut client,
621            b"POST /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
622        )
623        .await;
624        assert!(response.contains("200 OK"));
625        assert!(
626            response.contains(r#"{"status":"running"}"#),
627            "response: {}",
628            response
629        );
630
631        // send GET request to procedure-manager/resume path
632        let response = send_request(
633            &mut client,
634            b"GET /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
635        )
636        .await;
637        assert!(response.contains("404 Not Found"));
638
639        // send GET request to procedure-manager/pause path
640        let response = send_request(
641            &mut client,
642            b"GET /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
643        )
644        .await;
645        assert!(response.contains("404 Not Found"));
646    }
647}
648
649#[cfg(test)]
650mod axum_admin_tests {
651    use std::sync::Arc;
652
653    use axum::body::{to_bytes, Body};
654    use axum::http::{Method, Request, StatusCode};
655    use common_meta::kv_backend::memory::MemoryKvBackend;
656    use tower::ServiceExt; // for `oneshot`
657
658    use super::*;
659    use crate::metasrv::builder::MetasrvBuilder;
660    use crate::metasrv::MetasrvOptions;
661    use crate::service::admin::sequencer::NextTableIdResponse;
662
663    async fn setup_axum_app() -> AxumRouter {
664        let kv_backend = Arc::new(MemoryKvBackend::new());
665        let metasrv = MetasrvBuilder::new()
666            .options(MetasrvOptions::default())
667            .kv_backend(kv_backend)
668            .build()
669            .await
670            .unwrap();
671        let metasrv = Arc::new(metasrv);
672        admin_axum_router(metasrv)
673    }
674
675    async fn get_body_string(resp: axum::response::Response) -> String {
676        let body_bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
677        String::from_utf8_lossy(&body_bytes).to_string()
678    }
679
680    async fn into_bytes(resp: axum::response::Response) -> Vec<u8> {
681        let body_bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
682        body_bytes.to_vec()
683    }
684
685    #[tokio::test]
686    async fn test_admin_health() {
687        let app = setup_axum_app().await;
688        let response = app
689            .oneshot(
690                Request::builder()
691                    .uri("/admin/health")
692                    .body(Body::empty())
693                    .unwrap(),
694            )
695            .await
696            .unwrap();
697        assert_eq!(response.status(), StatusCode::OK);
698        let body = get_body_string(response).await;
699        assert!(body.to_lowercase().contains("ok"));
700    }
701
702    #[tokio::test]
703    async fn test_admin_node_lease() {
704        let app = setup_axum_app().await;
705        let response = app
706            .oneshot(
707                Request::builder()
708                    .uri("/admin/node-lease")
709                    .body(Body::empty())
710                    .unwrap(),
711            )
712            .await
713            .unwrap();
714        assert_eq!(response.status(), StatusCode::OK);
715    }
716
717    #[tokio::test]
718    async fn test_admin_heartbeat() {
719        let app = setup_axum_app().await;
720        let response = app
721            .oneshot(
722                Request::builder()
723                    .uri("/admin/heartbeat")
724                    .body(Body::empty())
725                    .unwrap(),
726            )
727            .await
728            .unwrap();
729        assert_eq!(response.status(), StatusCode::OK);
730    }
731
732    #[tokio::test]
733    async fn test_admin_heartbeat_help() {
734        let app = setup_axum_app().await;
735        let response = app
736            .oneshot(
737                Request::builder()
738                    .uri("/admin/heartbeat/help")
739                    .body(Body::empty())
740                    .unwrap(),
741            )
742            .await
743            .unwrap();
744        assert_eq!(response.status(), StatusCode::OK);
745    }
746
747    #[tokio::test]
748    async fn test_admin_leader() {
749        let app = setup_axum_app().await;
750        let response = app
751            .oneshot(
752                Request::builder()
753                    .uri("/admin/leader")
754                    .body(Body::empty())
755                    .unwrap(),
756            )
757            .await
758            .unwrap();
759        assert_eq!(response.status(), StatusCode::OK);
760    }
761
762    #[tokio::test]
763    async fn test_admin_maintenance() {
764        let app = setup_axum_app().await;
765        let response = app
766            .oneshot(
767                Request::builder()
768                    .uri("/admin/maintenance")
769                    .body(Body::empty())
770                    .unwrap(),
771            )
772            .await
773            .unwrap();
774        assert_eq!(response.status(), StatusCode::OK);
775        let body = get_body_string(response).await;
776        assert!(body.contains("enabled"));
777    }
778
779    #[tokio::test]
780    async fn test_admin_maintenance_status() {
781        let app = setup_axum_app().await;
782        let response = app
783            .oneshot(
784                Request::builder()
785                    .uri("/admin/maintenance/status")
786                    .body(Body::empty())
787                    .unwrap(),
788            )
789            .await
790            .unwrap();
791        assert_eq!(response.status(), StatusCode::OK);
792        let body = get_body_string(response).await;
793        assert!(body.contains("enabled"));
794    }
795
796    #[tokio::test]
797    async fn test_admin_maintenance_enable_disable() {
798        // Enable maintenance
799        let response = setup_axum_app()
800            .await
801            .oneshot(
802                Request::builder()
803                    .method(Method::POST)
804                    .uri("/admin/maintenance/enable")
805                    .body(Body::empty())
806                    .unwrap(),
807            )
808            .await
809            .unwrap();
810        assert_eq!(response.status(), StatusCode::OK);
811        let body = get_body_string(response).await;
812        assert!(body.contains("enabled"));
813        // Disable maintenance
814        let response = setup_axum_app()
815            .await
816            .oneshot(
817                Request::builder()
818                    .method(Method::POST)
819                    .uri("/admin/maintenance/disable")
820                    .body(Body::empty())
821                    .unwrap(),
822            )
823            .await
824            .unwrap();
825        assert_eq!(response.status(), StatusCode::OK);
826        let body = get_body_string(response).await;
827        assert!(body.contains("enabled"));
828    }
829
830    #[tokio::test]
831    async fn test_admin_procedure_manager_status() {
832        let app = setup_axum_app().await;
833        let response = app
834            .oneshot(
835                Request::builder()
836                    .uri("/admin/procedure-manager/status")
837                    .body(Body::empty())
838                    .unwrap(),
839            )
840            .await
841            .unwrap();
842        assert_eq!(response.status(), StatusCode::OK);
843        let body = get_body_string(response).await;
844        assert!(body.contains("status"));
845    }
846
847    #[tokio::test]
848    async fn test_admin_procedure_manager_pause_resume() {
849        // Pause
850        let response = setup_axum_app()
851            .await
852            .oneshot(
853                Request::builder()
854                    .method(Method::POST)
855                    .uri("/admin/procedure-manager/pause")
856                    .body(Body::empty())
857                    .unwrap(),
858            )
859            .await
860            .unwrap();
861        assert_eq!(response.status(), StatusCode::OK);
862        let body = get_body_string(response).await;
863        assert!(body.contains("paused"));
864        // Resume
865        let response = setup_axum_app()
866            .await
867            .oneshot(
868                Request::builder()
869                    .method(Method::POST)
870                    .uri("/admin/procedure-manager/resume")
871                    .body(Body::empty())
872                    .unwrap(),
873            )
874            .await
875            .unwrap();
876        assert_eq!(response.status(), StatusCode::OK);
877        let body = get_body_string(response).await;
878        assert!(body.contains("running"));
879    }
880
881    #[tokio::test]
882    async fn test_admin_recovery() {
883        let app = setup_axum_app().await;
884        let response = app
885            .clone()
886            .oneshot(
887                Request::builder()
888                    .uri("/admin/recovery/status")
889                    .method(Method::GET)
890                    .body(Body::empty())
891                    .unwrap(),
892            )
893            .await
894            .unwrap();
895        assert_eq!(response.status(), StatusCode::OK);
896        let body = get_body_string(response).await;
897        assert!(body.contains("false"));
898
899        // Enable recovery
900        let response = app
901            .clone()
902            .oneshot(
903                Request::builder()
904                    .uri("/admin/recovery/enable")
905                    .method(Method::POST)
906                    .body(Body::empty())
907                    .unwrap(),
908            )
909            .await
910            .unwrap();
911        assert_eq!(response.status(), StatusCode::OK);
912        let body = get_body_string(response).await;
913        assert!(body.contains("true"));
914
915        let response = app
916            .clone()
917            .oneshot(
918                Request::builder()
919                    .uri("/admin/recovery/status")
920                    .method(Method::GET)
921                    .body(Body::empty())
922                    .unwrap(),
923            )
924            .await
925            .unwrap();
926        assert_eq!(response.status(), StatusCode::OK);
927        let body = get_body_string(response).await;
928        assert!(body.contains("true"));
929
930        // Disable recovery
931        let response = app
932            .clone()
933            .oneshot(
934                Request::builder()
935                    .uri("/admin/recovery/disable")
936                    .method(Method::POST)
937                    .body(Body::empty())
938                    .unwrap(),
939            )
940            .await
941            .unwrap();
942        assert_eq!(response.status(), StatusCode::OK);
943        let body = get_body_string(response).await;
944        assert!(body.contains("false"));
945
946        let response = app
947            .clone()
948            .oneshot(
949                Request::builder()
950                    .uri("/admin/recovery/status")
951                    .method(Method::GET)
952                    .body(Body::empty())
953                    .unwrap(),
954            )
955            .await
956            .unwrap();
957        assert_eq!(response.status(), StatusCode::OK);
958        let body = get_body_string(response).await;
959        assert!(body.contains("false"));
960    }
961
962    #[tokio::test]
963    async fn test_admin_sequence_table_id() {
964        common_telemetry::init_default_ut_logging();
965        let kv_backend = Arc::new(MemoryKvBackend::new());
966        let metasrv = MetasrvBuilder::new()
967            .options(MetasrvOptions::default())
968            .kv_backend(kv_backend)
969            .build()
970            .await
971            .unwrap();
972        let metasrv = Arc::new(metasrv);
973        let runtime_switch_manager = metasrv.runtime_switch_manager().clone();
974        let app = admin_axum_router(metasrv);
975        // Set recovery mode to true
976        runtime_switch_manager.set_recovery_mode().await.unwrap();
977        let response = app
978            .clone()
979            .oneshot(
980                Request::builder()
981                    .method(Method::GET)
982                    .uri("/admin/sequence/table/next-id")
983                    .body(Body::empty())
984                    .unwrap(),
985            )
986            .await
987            .unwrap();
988        assert_eq!(response.status(), StatusCode::OK);
989        let body = into_bytes(response).await;
990        let resp: NextTableIdResponse = serde_json::from_slice(&body).unwrap();
991        assert_eq!(resp.next_table_id, 1024);
992
993        // Bad request
994        let response = app
995            .clone()
996            .oneshot(
997                Request::builder()
998                    .method(Method::POST)
999                    .header(http::header::CONTENT_TYPE, "application/json")
1000                    .uri("/admin/sequence/table/set-next-id")
1001                    .body(Body::empty())
1002                    .unwrap(),
1003            )
1004            .await
1005            .unwrap();
1006        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
1007
1008        // Bad next id
1009        let response = app
1010            .clone()
1011            .oneshot(
1012                Request::builder()
1013                    .method(Method::POST)
1014                    .header(http::header::CONTENT_TYPE, "application/json")
1015                    .uri("/admin/sequence/table/set-next-id")
1016                    .body(Body::from(r#"{"next_table_id": 0}"#))
1017                    .unwrap(),
1018            )
1019            .await
1020            .unwrap();
1021        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
1022        let body = get_body_string(response).await;
1023        assert!(body.contains("is not greater than the current next value"));
1024
1025        // Set next id
1026        let response = app
1027            .clone()
1028            .oneshot(
1029                Request::builder()
1030                    .method(Method::POST)
1031                    .header(http::header::CONTENT_TYPE, "application/json")
1032                    .uri("/admin/sequence/table/set-next-id")
1033                    .body(Body::from(r#"{"next_table_id": 2048}"#))
1034                    .unwrap(),
1035            )
1036            .await
1037            .unwrap();
1038        assert_eq!(response.status(), StatusCode::OK);
1039
1040        // Set next id
1041        let response = app
1042            .clone()
1043            .oneshot(
1044                Request::builder()
1045                    .method(Method::GET)
1046                    .uri("/admin/sequence/table/next-id")
1047                    .body(Body::empty())
1048                    .unwrap(),
1049            )
1050            .await
1051            .unwrap();
1052        assert_eq!(response.status(), StatusCode::OK);
1053        let body = into_bytes(response).await;
1054        let resp: NextTableIdResponse = serde_json::from_slice(&body).unwrap();
1055        assert_eq!(resp.next_table_id, 2048);
1056
1057        // Set recovery mode to false
1058        runtime_switch_manager.unset_recovery_mode().await.unwrap();
1059        // Set next id with recovery mode disabled
1060        let response = app
1061            .clone()
1062            .oneshot(
1063                Request::builder()
1064                    .method(Method::POST)
1065                    .header(http::header::CONTENT_TYPE, "application/json")
1066                    .uri("/admin/sequence/table/set-next-id")
1067                    .body(Body::from(r#"{"next_table_id": 2049}"#))
1068                    .unwrap(),
1069            )
1070            .await
1071            .unwrap();
1072        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
1073        let body = get_body_string(response).await;
1074        assert!(body.contains("Setting next table id is only allowed in recovery mode"));
1075    }
1076}