1mod health;
16mod heartbeat;
17mod leader;
18mod maintenance;
19mod node_lease;
20mod util;
21
22use std::collections::HashMap;
23use std::convert::Infallible;
24use std::sync::Arc;
25use std::task::{Context, Poll};
26
27use bytes::Bytes;
28use http_body_util::{BodyExt, Full};
29use tonic::body::BoxBody;
30use tonic::codegen::{empty_body, http, BoxFuture, Service};
31use tonic::server::NamedService;
32
33use crate::metasrv::Metasrv;
34
35pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
36 let router = Router::new().route("/health", health::HealthHandler);
37
38 let router = router.route(
39 "/node-lease",
40 node_lease::NodeLeaseHandler {
41 meta_peer_client: metasrv.meta_peer_client().clone(),
42 },
43 );
44
45 let handler = heartbeat::HeartBeatHandler {
46 meta_peer_client: metasrv.meta_peer_client().clone(),
47 };
48 let router = router
49 .route("/heartbeat", handler.clone())
50 .route("/heartbeat/help", handler);
51
52 let router = router.route(
53 "/leader",
54 leader::LeaderHandler {
55 election: metasrv.election().cloned(),
56 },
57 );
58
59 let router = router.route(
60 "/maintenance",
61 maintenance::MaintenanceHandler {
62 manager: metasrv.maintenance_mode_manager().clone(),
63 },
64 );
65 let router = Router::nest("/admin", router);
66
67 Admin::new(router)
68}
69
70#[async_trait::async_trait]
71pub trait HttpHandler: Send + Sync {
72 async fn handle(
73 &self,
74 path: &str,
75 method: http::Method,
76 params: &HashMap<String, String>,
77 ) -> crate::Result<http::Response<String>>;
78}
79
80#[derive(Clone)]
81pub struct Admin
82where
83 Self: Send,
84{
85 router: Arc<Router>,
86}
87
88impl Admin {
89 pub fn new(router: Router) -> Self {
90 Self {
91 router: Arc::new(router),
92 }
93 }
94}
95
96impl NamedService for Admin {
97 const NAME: &'static str = "admin";
98}
99
100impl<T> Service<http::Request<T>> for Admin
101where
102 T: Send,
103{
104 type Response = http::Response<BoxBody>;
105 type Error = Infallible;
106 type Future = BoxFuture<Self::Response, Self::Error>;
107
108 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
109 Poll::Ready(Ok(()))
110 }
111
112 fn call(&mut self, req: http::Request<T>) -> Self::Future {
113 let router = self.router.clone();
114 let query_params = req
115 .uri()
116 .query()
117 .map(|q| {
118 url::form_urlencoded::parse(q.as_bytes())
119 .into_owned()
120 .collect()
121 })
122 .unwrap_or_default();
123 let path = req.uri().path().to_owned();
124 let method = req.method().clone();
125 Box::pin(async move { router.call(&path, method, query_params).await })
126 }
127}
128
129#[derive(Default)]
130pub struct Router {
131 handlers: HashMap<String, Box<dyn HttpHandler>>,
132}
133
134impl Router {
135 pub fn new() -> Self {
136 Self {
137 handlers: HashMap::default(),
138 }
139 }
140
141 pub fn nest(path: &str, router: Router) -> Self {
142 check_path(path);
143
144 let handlers = router
145 .handlers
146 .into_iter()
147 .map(|(url, handler)| (format!("{path}{url}"), handler))
148 .collect();
149
150 Self { handlers }
151 }
152
153 pub fn route(mut self, path: &str, handler: impl HttpHandler + 'static) -> Self {
154 check_path(path);
155
156 let _ = self.handlers.insert(path.to_owned(), Box::new(handler));
157
158 self
159 }
160
161 pub async fn call(
162 &self,
163 path: &str,
164 method: http::Method,
165 params: HashMap<String, String>,
166 ) -> Result<http::Response<BoxBody>, Infallible> {
167 let handler = match self.handlers.get(path) {
168 Some(handler) => handler,
169 None => {
170 return Ok(http::Response::builder()
171 .status(http::StatusCode::NOT_FOUND)
172 .body(empty_body())
173 .unwrap())
174 }
175 };
176
177 let res = match handler.handle(path, method, ¶ms).await {
178 Ok(res) => res.map(boxed),
179 Err(e) => http::Response::builder()
180 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
181 .body(boxed(e.to_string()))
182 .unwrap(),
183 };
184
185 Ok(res)
186 }
187}
188
189fn check_path(path: &str) {
190 if path.is_empty() || !path.starts_with('/') {
191 panic!("paths must start with a `/`")
192 }
193}
194
195fn boxed(body: String) -> BoxBody {
198 Full::new(Bytes::from(body))
199 .map_err(|err| match err {})
200 .boxed_unsync()
201}
202
203#[cfg(test)]
204mod tests {
205 use common_meta::kv_backend::memory::MemoryKvBackend;
206 use common_meta::kv_backend::KvBackendRef;
207 use tokio::io::{AsyncReadExt, AsyncWriteExt};
208
209 use super::*;
210 use crate::metasrv::builder::MetasrvBuilder;
211 use crate::metasrv::MetasrvOptions;
212 use crate::{bootstrap, error};
213
214 struct MockOkHandler;
215
216 #[async_trait::async_trait]
217 impl HttpHandler for MockOkHandler {
218 async fn handle(
219 &self,
220 _: &str,
221 _: http::Method,
222 _: &HashMap<String, String>,
223 ) -> crate::Result<http::Response<String>> {
224 Ok(http::Response::builder()
225 .status(http::StatusCode::OK)
226 .body("Ok".to_string())
227 .unwrap())
228 }
229 }
230 struct MockEmptyKeyErrorHandler;
231
232 #[async_trait::async_trait]
233 impl HttpHandler for MockEmptyKeyErrorHandler {
234 async fn handle(
235 &self,
236 _: &str,
237 _: http::Method,
238 _: &HashMap<String, String>,
239 ) -> crate::Result<http::Response<String>> {
240 error::EmptyKeySnafu {}.fail()
241 }
242 }
243
244 #[test]
245 fn test_route_nest() {
246 let mock_handler = MockOkHandler {};
247 let router = Router::new().route("/test_node", mock_handler);
248 let router = Router::nest("/test_root", router);
249
250 assert_eq!(1, router.handlers.len());
251 assert!(router.handlers.contains_key("/test_root/test_node"));
252 }
253
254 #[should_panic]
255 #[test]
256 fn test_invalid_path() {
257 check_path("test_node")
258 }
259
260 #[should_panic]
261 #[test]
262 fn test_empty_path() {
263 check_path("")
264 }
265
266 #[tokio::test]
267 async fn test_route_call_ok() {
268 let mock_handler = MockOkHandler {};
269 let router = Router::new().route("/test_node", mock_handler);
270 let router = Router::nest("/test_root", router);
271
272 let res = router
273 .call(
274 "/test_root/test_node",
275 http::Method::GET,
276 HashMap::default(),
277 )
278 .await
279 .unwrap();
280
281 assert!(res.status().is_success());
282 }
283
284 #[tokio::test]
285 async fn test_route_call_no_handler() {
286 let router = Router::new();
287
288 let res = router
289 .call(
290 "/test_root/test_node",
291 http::Method::GET,
292 HashMap::default(),
293 )
294 .await
295 .unwrap();
296
297 assert_eq!(http::StatusCode::NOT_FOUND, res.status());
298 }
299
300 #[tokio::test]
301 async fn test_route_call_err() {
302 let mock_handler = MockEmptyKeyErrorHandler {};
303 let router = Router::new().route("/test_node", mock_handler);
304 let router = Router::nest("/test_root", router);
305
306 let res = router
307 .call(
308 "/test_root/test_node",
309 http::Method::GET,
310 HashMap::default(),
311 )
312 .await
313 .unwrap();
314
315 assert_eq!(http::StatusCode::INTERNAL_SERVER_ERROR, res.status());
316 }
317
318 async fn test_metasrv(kv_backend: KvBackendRef) -> Metasrv {
319 let opts = MetasrvOptions::default();
320 let builder = MetasrvBuilder::new()
321 .options(opts)
322 .kv_backend(kv_backend.clone());
323
324 let metasrv = builder.build().await.unwrap();
325 metasrv
326 }
327
328 #[tokio::test(flavor = "multi_thread")]
329 async fn test_metasrv_maintenance_mode() {
330 common_telemetry::init_default_ut_logging();
331 let kv_backend = Arc::new(MemoryKvBackend::new());
332 let metasrv = test_metasrv(kv_backend).await;
333 metasrv.try_start().await.unwrap();
334
335 let (mut client, server) = tokio::io::duplex(1024);
336 let metasrv = Arc::new(metasrv);
337 let service = metasrv.clone();
338 let _handle = tokio::spawn(async move {
339 let router = bootstrap::router(service);
340 router
341 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
342 .await
343 });
344
345 let http_request = b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n";
347 client.write_all(http_request).await.unwrap();
348 let mut buf = vec![0; 1024];
349 let n = client.read(&mut buf).await.unwrap();
350 let response = String::from_utf8_lossy(&buf[..n]);
351 assert!(response.contains(r#"{"enabled":false}"#));
352 assert!(response.contains("200 OK"));
353
354 let http_post = b"POST /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n";
356 client.write_all(http_post).await.unwrap();
357 let mut buf = vec![0; 1024];
358 let n = client.read(&mut buf).await.unwrap();
359 let response = String::from_utf8_lossy(&buf[..n]);
360 assert!(response.contains(r#"{"enabled":true}"#));
361 assert!(response.contains("200 OK"));
362
363 let enabled = metasrv
364 .maintenance_mode_manager()
365 .maintenance_mode()
366 .await
367 .unwrap();
368 assert!(enabled);
369
370 let http_request = b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n";
372 client.write_all(http_request).await.unwrap();
373 let mut buf = vec![0; 1024];
374 let n = client.read(&mut buf).await.unwrap();
375 let response = String::from_utf8_lossy(&buf[..n]);
376 assert!(response.contains(r#"{"enabled":true}"#));
377 assert!(response.contains("200 OK"));
378
379 let http_post = b"POST /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n";
381 client.write_all(http_post).await.unwrap();
382 let mut buf = vec![0; 1024];
383 let n = client.read(&mut buf).await.unwrap();
384 let response = String::from_utf8_lossy(&buf[..n]);
385 assert!(response.contains(r#"{"enabled":false}"#));
386 assert!(response.contains("200 OK"));
387
388 let enabled = metasrv
389 .maintenance_mode_manager()
390 .maintenance_mode()
391 .await
392 .unwrap();
393 assert!(!enabled);
394
395 let http_request =
397 b"GET /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\n\r\n";
398 client.write_all(http_request).await.unwrap();
399 let mut buf = vec![0; 1024];
400 let n = client.read(&mut buf).await.unwrap();
401 let response = String::from_utf8_lossy(&buf[..n]);
402 assert!(response.contains(r#"{"enabled":true}"#));
403 assert!(response.contains("200 OK"));
404
405 let http_request =
407 b"PUT /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\n\r\n";
408 client.write_all(http_request).await.unwrap();
409 let mut buf = vec![0; 1024];
410 let n = client.read(&mut buf).await.unwrap();
411 let response = String::from_utf8_lossy(&buf[..n]);
412 assert!(response.contains(r#"{"enabled":false}"#));
413 assert!(response.contains("200 OK"));
414 }
415}