meta_srv/service/
admin.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod health;
16mod heartbeat;
17mod leader;
18mod maintenance;
19mod node_lease;
20mod procedure;
21mod util;
22
23use std::collections::HashMap;
24use std::convert::Infallible;
25use std::sync::Arc;
26use std::task::{Context, Poll};
27
28use bytes::Bytes;
29use http_body_util::{BodyExt, Full};
30use tonic::body::BoxBody;
31use tonic::codegen::{empty_body, http, BoxFuture, Service};
32use tonic::server::NamedService;
33
34use crate::metasrv::Metasrv;
35
36pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
37    let router = Router::new().route("/health", health::HealthHandler);
38
39    let router = router.route(
40        "/node-lease",
41        node_lease::NodeLeaseHandler {
42            meta_peer_client: metasrv.meta_peer_client().clone(),
43        },
44    );
45
46    let handler = heartbeat::HeartBeatHandler {
47        meta_peer_client: metasrv.meta_peer_client().clone(),
48    };
49    let router = router
50        .route("/heartbeat", handler.clone())
51        .route("/heartbeat/help", handler);
52
53    let router = router.route(
54        "/leader",
55        leader::LeaderHandler {
56            election: metasrv.election().cloned(),
57        },
58    );
59
60    let router = router.routes(
61        &[
62            "/maintenance",
63            "/maintenance/status",
64            "/maintenance/enable",
65            "/maintenance/disable",
66        ],
67        maintenance::MaintenanceHandler {
68            manager: metasrv.runtime_switch_manager().clone(),
69        },
70    );
71    let router = router.routes(
72        &[
73            "/procedure-manager/pause",
74            "/procedure-manager/resume",
75            "/procedure-manager/status",
76        ],
77        procedure::ProcedureManagerHandler {
78            manager: metasrv.runtime_switch_manager().clone(),
79        },
80    );
81    let router = Router::nest("/admin", router);
82
83    Admin::new(router)
84}
85
86#[async_trait::async_trait]
87pub trait HttpHandler: Send + Sync {
88    async fn handle(
89        &self,
90        path: &str,
91        method: http::Method,
92        params: &HashMap<String, String>,
93    ) -> crate::Result<http::Response<String>>;
94}
95
96#[derive(Clone)]
97pub struct Admin
98where
99    Self: Send,
100{
101    router: Arc<Router>,
102}
103
104impl Admin {
105    pub fn new(router: Router) -> Self {
106        Self {
107            router: Arc::new(router),
108        }
109    }
110}
111
112impl NamedService for Admin {
113    const NAME: &'static str = "admin";
114}
115
116impl Service<http::Request<BoxBody>> for Admin {
117    type Response = http::Response<BoxBody>;
118    type Error = Infallible;
119    type Future = BoxFuture<Self::Response, Self::Error>;
120
121    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
122        Poll::Ready(Ok(()))
123    }
124
125    fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
126        let router = self.router.clone();
127        let query_params = req
128            .uri()
129            .query()
130            .map(|q| {
131                url::form_urlencoded::parse(q.as_bytes())
132                    .into_owned()
133                    .collect()
134            })
135            .unwrap_or_default();
136        let path = req.uri().path().to_owned();
137        let method = req.method().clone();
138        Box::pin(async move { router.call(&path, method, query_params).await })
139    }
140}
141
142#[derive(Default)]
143pub struct Router {
144    handlers: HashMap<String, Arc<dyn HttpHandler>>,
145}
146
147impl Router {
148    pub fn new() -> Self {
149        Self {
150            handlers: HashMap::default(),
151        }
152    }
153
154    pub fn nest(path: &str, router: Router) -> Self {
155        check_path(path);
156
157        let handlers = router
158            .handlers
159            .into_iter()
160            .map(|(url, handler)| (format!("{path}{url}"), handler))
161            .collect();
162
163        Self { handlers }
164    }
165
166    pub fn route(mut self, path: &str, handler: impl HttpHandler + 'static) -> Self {
167        check_path(path);
168
169        let _ = self.handlers.insert(path.to_owned(), Arc::new(handler));
170
171        self
172    }
173
174    pub fn routes(mut self, paths: &[&str], handler: impl HttpHandler + 'static) -> Self {
175        let handler = Arc::new(handler);
176        for path in paths {
177            check_path(path);
178            let _ = self.handlers.insert(path.to_string(), handler.clone());
179        }
180
181        self
182    }
183
184    pub async fn call(
185        &self,
186        path: &str,
187        method: http::Method,
188        params: HashMap<String, String>,
189    ) -> Result<http::Response<BoxBody>, Infallible> {
190        let handler = match self.handlers.get(path) {
191            Some(handler) => handler,
192            None => {
193                return Ok(http::Response::builder()
194                    .status(http::StatusCode::NOT_FOUND)
195                    .body(empty_body())
196                    .unwrap())
197            }
198        };
199
200        let res = match handler.handle(path, method, &params).await {
201            Ok(res) => res.map(boxed),
202            Err(e) => http::Response::builder()
203                .status(http::StatusCode::INTERNAL_SERVER_ERROR)
204                .body(boxed(e.to_string()))
205                .unwrap(),
206        };
207
208        Ok(res)
209    }
210}
211
212fn check_path(path: &str) {
213    if path.is_empty() || !path.starts_with('/') {
214        panic!("paths must start with a `/`")
215    }
216}
217
218/// Returns a [BoxBody] from a string.
219/// The implementation follows [empty_body()].
220fn boxed(body: String) -> BoxBody {
221    Full::new(Bytes::from(body))
222        .map_err(|err| match err {})
223        .boxed_unsync()
224}
225
226#[cfg(test)]
227mod tests {
228    use common_meta::kv_backend::memory::MemoryKvBackend;
229    use common_meta::kv_backend::KvBackendRef;
230    use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
231
232    use super::*;
233    use crate::metasrv::builder::MetasrvBuilder;
234    use crate::metasrv::MetasrvOptions;
235    use crate::{bootstrap, error};
236
237    struct MockOkHandler;
238
239    #[async_trait::async_trait]
240    impl HttpHandler for MockOkHandler {
241        async fn handle(
242            &self,
243            _: &str,
244            _: http::Method,
245            _: &HashMap<String, String>,
246        ) -> crate::Result<http::Response<String>> {
247            Ok(http::Response::builder()
248                .status(http::StatusCode::OK)
249                .body("Ok".to_string())
250                .unwrap())
251        }
252    }
253    struct MockEmptyKeyErrorHandler;
254
255    #[async_trait::async_trait]
256    impl HttpHandler for MockEmptyKeyErrorHandler {
257        async fn handle(
258            &self,
259            _: &str,
260            _: http::Method,
261            _: &HashMap<String, String>,
262        ) -> crate::Result<http::Response<String>> {
263            error::EmptyKeySnafu {}.fail()
264        }
265    }
266
267    #[test]
268    fn test_route_nest() {
269        let mock_handler = MockOkHandler {};
270        let router = Router::new().route("/test_node", mock_handler);
271        let router = Router::nest("/test_root", router);
272
273        assert_eq!(1, router.handlers.len());
274        assert!(router.handlers.contains_key("/test_root/test_node"));
275    }
276
277    #[should_panic]
278    #[test]
279    fn test_invalid_path() {
280        check_path("test_node")
281    }
282
283    #[should_panic]
284    #[test]
285    fn test_empty_path() {
286        check_path("")
287    }
288
289    #[tokio::test]
290    async fn test_route_call_ok() {
291        let mock_handler = MockOkHandler {};
292        let router = Router::new().route("/test_node", mock_handler);
293        let router = Router::nest("/test_root", router);
294
295        let res = router
296            .call(
297                "/test_root/test_node",
298                http::Method::GET,
299                HashMap::default(),
300            )
301            .await
302            .unwrap();
303
304        assert!(res.status().is_success());
305    }
306
307    #[tokio::test]
308    async fn test_route_call_no_handler() {
309        let router = Router::new();
310
311        let res = router
312            .call(
313                "/test_root/test_node",
314                http::Method::GET,
315                HashMap::default(),
316            )
317            .await
318            .unwrap();
319
320        assert_eq!(http::StatusCode::NOT_FOUND, res.status());
321    }
322
323    #[tokio::test]
324    async fn test_route_call_err() {
325        let mock_handler = MockEmptyKeyErrorHandler {};
326        let router = Router::new().route("/test_node", mock_handler);
327        let router = Router::nest("/test_root", router);
328
329        let res = router
330            .call(
331                "/test_root/test_node",
332                http::Method::GET,
333                HashMap::default(),
334            )
335            .await
336            .unwrap();
337
338        assert_eq!(http::StatusCode::INTERNAL_SERVER_ERROR, res.status());
339    }
340
341    async fn test_metasrv(kv_backend: KvBackendRef) -> Metasrv {
342        let opts = MetasrvOptions::default();
343        let builder = MetasrvBuilder::new()
344            .options(opts)
345            .kv_backend(kv_backend.clone());
346
347        let metasrv = builder.build().await.unwrap();
348        metasrv
349    }
350
351    async fn send_request(client: &mut DuplexStream, request: &[u8]) -> String {
352        client.write_all(request).await.unwrap();
353        let mut buf = vec![0; 1024];
354        let n = client.read(&mut buf).await.unwrap();
355        String::from_utf8_lossy(&buf[..n]).to_string()
356    }
357
358    #[tokio::test(flavor = "multi_thread")]
359    async fn test_metasrv_maintenance_mode() {
360        common_telemetry::init_default_ut_logging();
361        let kv_backend = Arc::new(MemoryKvBackend::new());
362        let metasrv = test_metasrv(kv_backend).await;
363        metasrv.try_start().await.unwrap();
364
365        let (mut client, server) = tokio::io::duplex(1024);
366        let metasrv = Arc::new(metasrv);
367        let service = metasrv.clone();
368        let _handle = tokio::spawn(async move {
369            let router = bootstrap::router(service);
370            router
371                .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
372                .await
373        });
374
375        // Get maintenance mode
376        let response = send_request(
377            &mut client,
378            b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
379        )
380        .await;
381        assert!(response.contains(r#"{"enabled":false}"#));
382        assert!(response.contains("200 OK"));
383
384        // Set maintenance mode to true
385        let response = send_request(
386            &mut client,
387            b"POST /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
388        )
389        .await;
390        assert!(response.contains(r#"{"enabled":true}"#));
391        assert!(response.contains("200 OK"));
392
393        let enabled = metasrv
394            .runtime_switch_manager()
395            .maintenance_mode()
396            .await
397            .unwrap();
398        assert!(enabled);
399
400        // Get maintenance mode again
401        let response = send_request(
402            &mut client,
403            b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
404        )
405        .await;
406        assert!(response.contains(r#"{"enabled":true}"#));
407        assert!(response.contains("200 OK"));
408
409        // Set maintenance mode to false
410        let response = send_request(
411            &mut client,
412            b"POST /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
413        )
414        .await;
415        assert!(response.contains(r#"{"enabled":false}"#));
416        assert!(response.contains("200 OK"));
417
418        let enabled = metasrv
419            .runtime_switch_manager()
420            .maintenance_mode()
421            .await
422            .unwrap();
423        assert!(!enabled);
424
425        // Set maintenance mode to true via GET request
426        let response = send_request(
427            &mut client,
428            b"GET /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\n\r\n",
429        )
430        .await;
431        assert!(response.contains(r#"{"enabled":true}"#));
432        assert!(response.contains("200 OK"));
433
434        // Set maintenance mode to false via GET request
435        let response = send_request(
436            &mut client,
437            b"PUT /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\n\r\n",
438        )
439        .await;
440        assert!(response.contains(r#"{"enabled":false}"#));
441        assert!(response.contains("200 OK"));
442
443        // Get maintenance mode via status path
444        let response = send_request(
445            &mut client,
446            b"GET /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
447        )
448        .await;
449        assert!(response.contains(r#"{"enabled":false}"#));
450
451        // Set maintenance mode via enable path
452        let response = send_request(
453            &mut client,
454            b"POST /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
455        )
456        .await;
457        assert!(response.contains(r#"{"enabled":true}"#));
458
459        // Unset maintenance mode via disable path
460        let response = send_request(
461            &mut client,
462            b"POST /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
463        )
464        .await;
465        assert!(response.contains(r#"{"enabled":false}"#));
466
467        // send POST request to status path
468        let response = send_request(
469            &mut client,
470            b"POST /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
471        )
472        .await;
473        assert!(response.contains("404 Not Found"));
474
475        // send GET request to enable path
476        let response = send_request(
477            &mut client,
478            b"GET /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
479        )
480        .await;
481        assert!(response.contains("404 Not Found"));
482
483        // send GET request to disable path
484        let response = send_request(
485            &mut client,
486            b"GET /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
487        )
488        .await;
489        assert!(response.contains("404 Not Found"));
490    }
491
492    #[tokio::test(flavor = "multi_thread")]
493    async fn test_metasrv_procedure_manager_handler() {
494        common_telemetry::init_default_ut_logging();
495        let kv_backend = Arc::new(MemoryKvBackend::new());
496        let metasrv = test_metasrv(kv_backend).await;
497        metasrv.try_start().await.unwrap();
498
499        let (mut client, server) = tokio::io::duplex(1024);
500        let metasrv = Arc::new(metasrv);
501        let service = metasrv.clone();
502        let _handle = tokio::spawn(async move {
503            let router = bootstrap::router(service);
504            router
505                .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
506                .await
507        });
508
509        // send GET request to procedure-manager/status path
510        let response = send_request(
511            &mut client,
512            b"GET /admin/procedure-manager/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
513        )
514        .await;
515        assert!(response.contains("200 OK"));
516        assert!(
517            response.contains(r#"{"status":"running"}"#),
518            "response: {}",
519            response
520        );
521
522        // send POST request to procedure-manager/pause path
523        let response = send_request(
524            &mut client,
525            b"POST /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
526        )
527        .await;
528        assert!(response.contains("200 OK"));
529        assert!(response.contains(r#"{"status":"paused"}"#));
530
531        // send POST request to procedure-manager/resume path
532        let response = send_request(
533            &mut client,
534            b"POST /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
535        )
536        .await;
537        assert!(response.contains("200 OK"));
538        assert!(
539            response.contains(r#"{"status":"running"}"#),
540            "response: {}",
541            response
542        );
543
544        // send GET request to procedure-manager/resume path
545        let response = send_request(
546            &mut client,
547            b"GET /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
548        )
549        .await;
550        assert!(response.contains("404 Not Found"));
551
552        // send GET request to procedure-manager/pause path
553        let response = send_request(
554            &mut client,
555            b"GET /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
556        )
557        .await;
558        assert!(response.contains("404 Not Found"));
559    }
560}