meta_srv/service/
admin.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod health;
16mod heartbeat;
17mod leader;
18mod maintenance;
19mod node_lease;
20mod util;
21
22use std::collections::HashMap;
23use std::convert::Infallible;
24use std::sync::Arc;
25use std::task::{Context, Poll};
26
27use bytes::Bytes;
28use http_body_util::{BodyExt, Full};
29use tonic::body::BoxBody;
30use tonic::codegen::{empty_body, http, BoxFuture, Service};
31use tonic::server::NamedService;
32
33use crate::metasrv::Metasrv;
34
35pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
36    let router = Router::new().route("/health", health::HealthHandler);
37
38    let router = router.route(
39        "/node-lease",
40        node_lease::NodeLeaseHandler {
41            meta_peer_client: metasrv.meta_peer_client().clone(),
42        },
43    );
44
45    let handler = heartbeat::HeartBeatHandler {
46        meta_peer_client: metasrv.meta_peer_client().clone(),
47    };
48    let router = router
49        .route("/heartbeat", handler.clone())
50        .route("/heartbeat/help", handler);
51
52    let router = router.route(
53        "/leader",
54        leader::LeaderHandler {
55            election: metasrv.election().cloned(),
56        },
57    );
58
59    let router = router.route(
60        "/maintenance",
61        maintenance::MaintenanceHandler {
62            manager: metasrv.maintenance_mode_manager().clone(),
63        },
64    );
65    let router = Router::nest("/admin", router);
66
67    Admin::new(router)
68}
69
70#[async_trait::async_trait]
71pub trait HttpHandler: Send + Sync {
72    async fn handle(
73        &self,
74        path: &str,
75        method: http::Method,
76        params: &HashMap<String, String>,
77    ) -> crate::Result<http::Response<String>>;
78}
79
80#[derive(Clone)]
81pub struct Admin
82where
83    Self: Send,
84{
85    router: Arc<Router>,
86}
87
88impl Admin {
89    pub fn new(router: Router) -> Self {
90        Self {
91            router: Arc::new(router),
92        }
93    }
94}
95
96impl NamedService for Admin {
97    const NAME: &'static str = "admin";
98}
99
100impl<T> Service<http::Request<T>> for Admin
101where
102    T: Send,
103{
104    type Response = http::Response<BoxBody>;
105    type Error = Infallible;
106    type Future = BoxFuture<Self::Response, Self::Error>;
107
108    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
109        Poll::Ready(Ok(()))
110    }
111
112    fn call(&mut self, req: http::Request<T>) -> Self::Future {
113        let router = self.router.clone();
114        let query_params = req
115            .uri()
116            .query()
117            .map(|q| {
118                url::form_urlencoded::parse(q.as_bytes())
119                    .into_owned()
120                    .collect()
121            })
122            .unwrap_or_default();
123        let path = req.uri().path().to_owned();
124        let method = req.method().clone();
125        Box::pin(async move { router.call(&path, method, query_params).await })
126    }
127}
128
129#[derive(Default)]
130pub struct Router {
131    handlers: HashMap<String, Box<dyn HttpHandler>>,
132}
133
134impl Router {
135    pub fn new() -> Self {
136        Self {
137            handlers: HashMap::default(),
138        }
139    }
140
141    pub fn nest(path: &str, router: Router) -> Self {
142        check_path(path);
143
144        let handlers = router
145            .handlers
146            .into_iter()
147            .map(|(url, handler)| (format!("{path}{url}"), handler))
148            .collect();
149
150        Self { handlers }
151    }
152
153    pub fn route(mut self, path: &str, handler: impl HttpHandler + 'static) -> Self {
154        check_path(path);
155
156        let _ = self.handlers.insert(path.to_owned(), Box::new(handler));
157
158        self
159    }
160
161    pub async fn call(
162        &self,
163        path: &str,
164        method: http::Method,
165        params: HashMap<String, String>,
166    ) -> Result<http::Response<BoxBody>, Infallible> {
167        let handler = match self.handlers.get(path) {
168            Some(handler) => handler,
169            None => {
170                return Ok(http::Response::builder()
171                    .status(http::StatusCode::NOT_FOUND)
172                    .body(empty_body())
173                    .unwrap())
174            }
175        };
176
177        let res = match handler.handle(path, method, &params).await {
178            Ok(res) => res.map(boxed),
179            Err(e) => http::Response::builder()
180                .status(http::StatusCode::INTERNAL_SERVER_ERROR)
181                .body(boxed(e.to_string()))
182                .unwrap(),
183        };
184
185        Ok(res)
186    }
187}
188
189fn check_path(path: &str) {
190    if path.is_empty() || !path.starts_with('/') {
191        panic!("paths must start with a `/`")
192    }
193}
194
195/// Returns a [BoxBody] from a string.
196/// The implementation follows [empty_body()].
197fn boxed(body: String) -> BoxBody {
198    Full::new(Bytes::from(body))
199        .map_err(|err| match err {})
200        .boxed_unsync()
201}
202
203#[cfg(test)]
204mod tests {
205    use common_meta::kv_backend::memory::MemoryKvBackend;
206    use common_meta::kv_backend::KvBackendRef;
207    use tokio::io::{AsyncReadExt, AsyncWriteExt};
208
209    use super::*;
210    use crate::metasrv::builder::MetasrvBuilder;
211    use crate::metasrv::MetasrvOptions;
212    use crate::{bootstrap, error};
213
214    struct MockOkHandler;
215
216    #[async_trait::async_trait]
217    impl HttpHandler for MockOkHandler {
218        async fn handle(
219            &self,
220            _: &str,
221            _: http::Method,
222            _: &HashMap<String, String>,
223        ) -> crate::Result<http::Response<String>> {
224            Ok(http::Response::builder()
225                .status(http::StatusCode::OK)
226                .body("Ok".to_string())
227                .unwrap())
228        }
229    }
230    struct MockEmptyKeyErrorHandler;
231
232    #[async_trait::async_trait]
233    impl HttpHandler for MockEmptyKeyErrorHandler {
234        async fn handle(
235            &self,
236            _: &str,
237            _: http::Method,
238            _: &HashMap<String, String>,
239        ) -> crate::Result<http::Response<String>> {
240            error::EmptyKeySnafu {}.fail()
241        }
242    }
243
244    #[test]
245    fn test_route_nest() {
246        let mock_handler = MockOkHandler {};
247        let router = Router::new().route("/test_node", mock_handler);
248        let router = Router::nest("/test_root", router);
249
250        assert_eq!(1, router.handlers.len());
251        assert!(router.handlers.contains_key("/test_root/test_node"));
252    }
253
254    #[should_panic]
255    #[test]
256    fn test_invalid_path() {
257        check_path("test_node")
258    }
259
260    #[should_panic]
261    #[test]
262    fn test_empty_path() {
263        check_path("")
264    }
265
266    #[tokio::test]
267    async fn test_route_call_ok() {
268        let mock_handler = MockOkHandler {};
269        let router = Router::new().route("/test_node", mock_handler);
270        let router = Router::nest("/test_root", router);
271
272        let res = router
273            .call(
274                "/test_root/test_node",
275                http::Method::GET,
276                HashMap::default(),
277            )
278            .await
279            .unwrap();
280
281        assert!(res.status().is_success());
282    }
283
284    #[tokio::test]
285    async fn test_route_call_no_handler() {
286        let router = Router::new();
287
288        let res = router
289            .call(
290                "/test_root/test_node",
291                http::Method::GET,
292                HashMap::default(),
293            )
294            .await
295            .unwrap();
296
297        assert_eq!(http::StatusCode::NOT_FOUND, res.status());
298    }
299
300    #[tokio::test]
301    async fn test_route_call_err() {
302        let mock_handler = MockEmptyKeyErrorHandler {};
303        let router = Router::new().route("/test_node", mock_handler);
304        let router = Router::nest("/test_root", router);
305
306        let res = router
307            .call(
308                "/test_root/test_node",
309                http::Method::GET,
310                HashMap::default(),
311            )
312            .await
313            .unwrap();
314
315        assert_eq!(http::StatusCode::INTERNAL_SERVER_ERROR, res.status());
316    }
317
318    async fn test_metasrv(kv_backend: KvBackendRef) -> Metasrv {
319        let opts = MetasrvOptions::default();
320        let builder = MetasrvBuilder::new()
321            .options(opts)
322            .kv_backend(kv_backend.clone());
323
324        let metasrv = builder.build().await.unwrap();
325        metasrv
326    }
327
328    #[tokio::test(flavor = "multi_thread")]
329    async fn test_metasrv_maintenance_mode() {
330        common_telemetry::init_default_ut_logging();
331        let kv_backend = Arc::new(MemoryKvBackend::new());
332        let metasrv = test_metasrv(kv_backend).await;
333        metasrv.try_start().await.unwrap();
334
335        let (mut client, server) = tokio::io::duplex(1024);
336        let metasrv = Arc::new(metasrv);
337        let service = metasrv.clone();
338        let _handle = tokio::spawn(async move {
339            let router = bootstrap::router(service);
340            router
341                .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
342                .await
343        });
344
345        // Get maintenance mode
346        let http_request = b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n";
347        client.write_all(http_request).await.unwrap();
348        let mut buf = vec![0; 1024];
349        let n = client.read(&mut buf).await.unwrap();
350        let response = String::from_utf8_lossy(&buf[..n]);
351        assert!(response.contains(r#"{"enabled":false}"#));
352        assert!(response.contains("200 OK"));
353
354        // Set maintenance mode to true
355        let http_post = b"POST /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n";
356        client.write_all(http_post).await.unwrap();
357        let mut buf = vec![0; 1024];
358        let n = client.read(&mut buf).await.unwrap();
359        let response = String::from_utf8_lossy(&buf[..n]);
360        assert!(response.contains(r#"{"enabled":true}"#));
361        assert!(response.contains("200 OK"));
362
363        let enabled = metasrv
364            .maintenance_mode_manager()
365            .maintenance_mode()
366            .await
367            .unwrap();
368        assert!(enabled);
369
370        // Get maintenance mode again
371        let http_request = b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n";
372        client.write_all(http_request).await.unwrap();
373        let mut buf = vec![0; 1024];
374        let n = client.read(&mut buf).await.unwrap();
375        let response = String::from_utf8_lossy(&buf[..n]);
376        assert!(response.contains(r#"{"enabled":true}"#));
377        assert!(response.contains("200 OK"));
378
379        // Set maintenance mode to false
380        let http_post = b"POST /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n";
381        client.write_all(http_post).await.unwrap();
382        let mut buf = vec![0; 1024];
383        let n = client.read(&mut buf).await.unwrap();
384        let response = String::from_utf8_lossy(&buf[..n]);
385        assert!(response.contains(r#"{"enabled":false}"#));
386        assert!(response.contains("200 OK"));
387
388        let enabled = metasrv
389            .maintenance_mode_manager()
390            .maintenance_mode()
391            .await
392            .unwrap();
393        assert!(!enabled);
394
395        // Set maintenance mode to true via GET request
396        let http_request =
397            b"GET /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\n\r\n";
398        client.write_all(http_request).await.unwrap();
399        let mut buf = vec![0; 1024];
400        let n = client.read(&mut buf).await.unwrap();
401        let response = String::from_utf8_lossy(&buf[..n]);
402        assert!(response.contains(r#"{"enabled":true}"#));
403        assert!(response.contains("200 OK"));
404
405        // Set maintenance mode to false via GET request
406        let http_request =
407            b"PUT /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\n\r\n";
408        client.write_all(http_request).await.unwrap();
409        let mut buf = vec![0; 1024];
410        let n = client.read(&mut buf).await.unwrap();
411        let response = String::from_utf8_lossy(&buf[..n]);
412        assert!(response.contains(r#"{"enabled":false}"#));
413        assert!(response.contains("200 OK"));
414    }
415}