1mod health;
16mod heartbeat;
17mod leader;
18mod maintenance;
19mod node_lease;
20mod procedure;
21mod util;
22
23use std::collections::HashMap;
24use std::convert::Infallible;
25use std::sync::Arc;
26use std::task::{Context, Poll};
27
28use bytes::Bytes;
29use http_body_util::{BodyExt, Full};
30use tonic::body::BoxBody;
31use tonic::codegen::{empty_body, http, BoxFuture, Service};
32use tonic::server::NamedService;
33
34use crate::metasrv::Metasrv;
35
36pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
37 let router = Router::new().route("/health", health::HealthHandler);
38
39 let router = router.route(
40 "/node-lease",
41 node_lease::NodeLeaseHandler {
42 meta_peer_client: metasrv.meta_peer_client().clone(),
43 },
44 );
45
46 let handler = heartbeat::HeartBeatHandler {
47 meta_peer_client: metasrv.meta_peer_client().clone(),
48 };
49 let router = router
50 .route("/heartbeat", handler.clone())
51 .route("/heartbeat/help", handler);
52
53 let router = router.route(
54 "/leader",
55 leader::LeaderHandler {
56 election: metasrv.election().cloned(),
57 },
58 );
59
60 let router = router.routes(
61 &[
62 "/maintenance",
63 "/maintenance/status",
64 "/maintenance/enable",
65 "/maintenance/disable",
66 ],
67 maintenance::MaintenanceHandler {
68 manager: metasrv.runtime_switch_manager().clone(),
69 },
70 );
71 let router = router.routes(
72 &[
73 "/procedure-manager/pause",
74 "/procedure-manager/resume",
75 "/procedure-manager/status",
76 ],
77 procedure::ProcedureManagerHandler {
78 manager: metasrv.runtime_switch_manager().clone(),
79 },
80 );
81 let router = Router::nest("/admin", router);
82
83 Admin::new(router)
84}
85
86#[async_trait::async_trait]
87pub trait HttpHandler: Send + Sync {
88 async fn handle(
89 &self,
90 path: &str,
91 method: http::Method,
92 params: &HashMap<String, String>,
93 ) -> crate::Result<http::Response<String>>;
94}
95
96#[derive(Clone)]
97pub struct Admin
98where
99 Self: Send,
100{
101 router: Arc<Router>,
102}
103
104impl Admin {
105 pub fn new(router: Router) -> Self {
106 Self {
107 router: Arc::new(router),
108 }
109 }
110}
111
112impl NamedService for Admin {
113 const NAME: &'static str = "admin";
114}
115
116impl Service<http::Request<BoxBody>> for Admin {
117 type Response = http::Response<BoxBody>;
118 type Error = Infallible;
119 type Future = BoxFuture<Self::Response, Self::Error>;
120
121 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
122 Poll::Ready(Ok(()))
123 }
124
125 fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
126 let router = self.router.clone();
127 let query_params = req
128 .uri()
129 .query()
130 .map(|q| {
131 url::form_urlencoded::parse(q.as_bytes())
132 .into_owned()
133 .collect()
134 })
135 .unwrap_or_default();
136 let path = req.uri().path().to_owned();
137 let method = req.method().clone();
138 Box::pin(async move { router.call(&path, method, query_params).await })
139 }
140}
141
142#[derive(Default)]
143pub struct Router {
144 handlers: HashMap<String, Arc<dyn HttpHandler>>,
145}
146
147impl Router {
148 pub fn new() -> Self {
149 Self {
150 handlers: HashMap::default(),
151 }
152 }
153
154 pub fn nest(path: &str, router: Router) -> Self {
155 check_path(path);
156
157 let handlers = router
158 .handlers
159 .into_iter()
160 .map(|(url, handler)| (format!("{path}{url}"), handler))
161 .collect();
162
163 Self { handlers }
164 }
165
166 pub fn route(mut self, path: &str, handler: impl HttpHandler + 'static) -> Self {
167 check_path(path);
168
169 let _ = self.handlers.insert(path.to_owned(), Arc::new(handler));
170
171 self
172 }
173
174 pub fn routes(mut self, paths: &[&str], handler: impl HttpHandler + 'static) -> Self {
175 let handler = Arc::new(handler);
176 for path in paths {
177 check_path(path);
178 let _ = self.handlers.insert(path.to_string(), handler.clone());
179 }
180
181 self
182 }
183
184 pub async fn call(
185 &self,
186 path: &str,
187 method: http::Method,
188 params: HashMap<String, String>,
189 ) -> Result<http::Response<BoxBody>, Infallible> {
190 let handler = match self.handlers.get(path) {
191 Some(handler) => handler,
192 None => {
193 return Ok(http::Response::builder()
194 .status(http::StatusCode::NOT_FOUND)
195 .body(empty_body())
196 .unwrap())
197 }
198 };
199
200 let res = match handler.handle(path, method, ¶ms).await {
201 Ok(res) => res.map(boxed),
202 Err(e) => http::Response::builder()
203 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
204 .body(boxed(e.to_string()))
205 .unwrap(),
206 };
207
208 Ok(res)
209 }
210}
211
/// Asserts that `path` is rooted, i.e. that it begins with `/`.
///
/// # Panics
///
/// Panics when `path` is empty or begins with any character other than `/`.
fn check_path(path: &str) {
    // An empty string has no first character, so this one test rejects "" too.
    assert!(path.starts_with('/'), "paths must start with a `/`");
}
217
218fn boxed(body: String) -> BoxBody {
221 Full::new(Bytes::from(body))
222 .map_err(|err| match err {})
223 .boxed_unsync()
224}
225
#[cfg(test)]
mod tests {
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::KvBackendRef;
    use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};

    use super::*;
    use crate::metasrv::builder::MetasrvBuilder;
    use crate::metasrv::MetasrvOptions;
    use crate::{bootstrap, error};

    /// Handler that always answers `200 OK` with the body `Ok`.
    struct MockOkHandler;

    #[async_trait::async_trait]
    impl HttpHandler for MockOkHandler {
        async fn handle(
            &self,
            _: &str,
            _: http::Method,
            _: &HashMap<String, String>,
        ) -> crate::Result<http::Response<String>> {
            Ok(http::Response::builder()
                .status(http::StatusCode::OK)
                .body("Ok".to_string())
                .unwrap())
        }
    }

    /// Handler that always fails with an `EmptyKey` error.
    struct MockEmptyKeyErrorHandler;

    #[async_trait::async_trait]
    impl HttpHandler for MockEmptyKeyErrorHandler {
        async fn handle(
            &self,
            _: &str,
            _: http::Method,
            _: &HashMap<String, String>,
        ) -> crate::Result<http::Response<String>> {
            error::EmptyKeySnafu {}.fail()
        }
    }

    #[test]
    fn test_route_nest() {
        let router = Router::new().route("/test_node", MockOkHandler);
        let router = Router::nest("/test_root", router);

        assert_eq!(1, router.handlers.len());
        assert!(router.handlers.contains_key("/test_root/test_node"));
    }

    // Pin the panic message so an unrelated panic cannot make these tests pass.
    #[should_panic(expected = "paths must start with a `/`")]
    #[test]
    fn test_invalid_path() {
        check_path("test_node")
    }

    #[should_panic(expected = "paths must start with a `/`")]
    #[test]
    fn test_empty_path() {
        check_path("")
    }

    #[tokio::test]
    async fn test_route_call_ok() {
        let router = Router::new().route("/test_node", MockOkHandler);
        let router = Router::nest("/test_root", router);

        let res = router
            .call(
                "/test_root/test_node",
                http::Method::GET,
                HashMap::default(),
            )
            .await
            .unwrap();

        assert!(res.status().is_success());
    }

    #[tokio::test]
    async fn test_route_call_no_handler() {
        let router = Router::new();

        let res = router
            .call(
                "/test_root/test_node",
                http::Method::GET,
                HashMap::default(),
            )
            .await
            .unwrap();

        assert_eq!(http::StatusCode::NOT_FOUND, res.status());
    }

    #[tokio::test]
    async fn test_route_call_err() {
        let router = Router::new().route("/test_node", MockEmptyKeyErrorHandler);
        let router = Router::nest("/test_root", router);

        let res = router
            .call(
                "/test_root/test_node",
                http::Method::GET,
                HashMap::default(),
            )
            .await
            .unwrap();

        assert_eq!(http::StatusCode::INTERNAL_SERVER_ERROR, res.status());
    }

    /// Builds a `Metasrv` with default options on top of `kv_backend`.
    async fn test_metasrv(kv_backend: KvBackendRef) -> Metasrv {
        MetasrvBuilder::new()
            .options(MetasrvOptions::default())
            .kv_backend(kv_backend)
            .build()
            .await
            .unwrap()
    }

    /// Writes a raw HTTP/1.1 `request` to `client` and returns whatever the
    /// server sends back in the next read.
    ///
    /// Only a single `read` of at most 1024 bytes is performed, so this assumes
    /// the whole response arrives in one chunk.
    async fn send_request(client: &mut DuplexStream, request: &[u8]) -> String {
        client.write_all(request).await.unwrap();
        let mut buf = vec![0; 1024];
        let n = client.read(&mut buf).await.unwrap();
        String::from_utf8_lossy(&buf[..n]).to_string()
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_metasrv_maintenance_mode() {
        common_telemetry::init_default_ut_logging();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let metasrv = test_metasrv(kv_backend).await;
        metasrv.try_start().await.unwrap();

        // Serve the full metasrv router over an in-memory duplex connection.
        let (mut client, server) = tokio::io::duplex(1024);
        let metasrv = Arc::new(metasrv);
        let service = metasrv.clone();
        let _handle = tokio::spawn(async move {
            let router = bootstrap::router(service);
            router
                .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
                .await
        });

        // Maintenance mode starts disabled.
        let response = send_request(
            &mut client,
            b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":false}"#));
        assert!(response.contains("200 OK"));

        // Enable it with POST and the `enable` query parameter.
        let response = send_request(
            &mut client,
            b"POST /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":true}"#));
        assert!(response.contains("200 OK"));

        let enabled = metasrv
            .runtime_switch_manager()
            .maintenance_mode()
            .await
            .unwrap();
        assert!(enabled);

        // Reading it back reports the new state.
        let response = send_request(
            &mut client,
            b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":true}"#));
        assert!(response.contains("200 OK"));

        // Disable it with POST and the `enable` query parameter.
        let response = send_request(
            &mut client,
            b"POST /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":false}"#));
        assert!(response.contains("200 OK"));

        let enabled = metasrv
            .runtime_switch_manager()
            .maintenance_mode()
            .await
            .unwrap();
        assert!(!enabled);

        // On `/admin/maintenance`, GET with the `enable` parameter also sets the mode.
        let response = send_request(
            &mut client,
            b"GET /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":true}"#));
        assert!(response.contains("200 OK"));

        // ... and so does PUT.
        let response = send_request(
            &mut client,
            b"PUT /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":false}"#));
        assert!(response.contains("200 OK"));

        // The dedicated sub-paths: read the status with GET.
        let response = send_request(
            &mut client,
            b"GET /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":false}"#));

        // Enable with POST.
        let response = send_request(
            &mut client,
            b"POST /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":true}"#));

        // Disable with POST.
        let response = send_request(
            &mut client,
            b"POST /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
        )
        .await;
        assert!(response.contains(r#"{"enabled":false}"#));

        // Using the wrong method on a dedicated sub-path yields 404.
        let response = send_request(
            &mut client,
            b"POST /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
        )
        .await;
        assert!(response.contains("404 Not Found"));

        let response = send_request(
            &mut client,
            b"GET /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
        )
        .await;
        assert!(response.contains("404 Not Found"));

        let response = send_request(
            &mut client,
            b"GET /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
        )
        .await;
        assert!(response.contains("404 Not Found"));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_metasrv_procedure_manager_handler() {
        common_telemetry::init_default_ut_logging();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let metasrv = test_metasrv(kv_backend).await;
        metasrv.try_start().await.unwrap();

        // Serve the full metasrv router over an in-memory duplex connection.
        let (mut client, server) = tokio::io::duplex(1024);
        let metasrv = Arc::new(metasrv);
        let service = metasrv.clone();
        let _handle = tokio::spawn(async move {
            let router = bootstrap::router(service);
            router
                .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
                .await
        });

        // The procedure manager starts in the running state.
        let response = send_request(
            &mut client,
            b"GET /admin/procedure-manager/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains("200 OK"));
        assert!(
            response.contains(r#"{"status":"running"}"#),
            "response: {response}"
        );

        // Pause it with POST.
        let response = send_request(
            &mut client,
            b"POST /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains("200 OK"));
        assert!(response.contains(r#"{"status":"paused"}"#));

        // Resume it with POST.
        let response = send_request(
            &mut client,
            b"POST /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains("200 OK"));
        assert!(
            response.contains(r#"{"status":"running"}"#),
            "response: {response}"
        );

        // GET on the state-changing paths yields 404.
        let response = send_request(
            &mut client,
            b"GET /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains("404 Not Found"));

        let response = send_request(
            &mut client,
            b"GET /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
        )
        .await;
        assert!(response.contains("404 Not Found"));
    }
}