meta_srv/service/admin/
procedure.rs1use std::collections::HashMap;
16
17use axum::extract::State;
18use axum::response::{IntoResponse, Response};
19use axum::Json;
20use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
21use common_telemetry::info;
22use serde::{Deserialize, Serialize};
23use snafu::ResultExt;
24use tonic::codegen::http;
25
26use crate::error::RuntimeSwitchManagerSnafu;
27use crate::service::admin::util::{to_json_response, to_not_found_response, ErrorHandler};
28use crate::service::admin::HttpHandler;
29
30#[derive(Clone)]
31pub struct ProcedureManagerHandler {
32 pub manager: RuntimeSwitchManagerRef,
33}
34
35#[derive(Debug, Serialize, Deserialize)]
36pub(crate) struct ProcedureManagerStatusResponse {
37 status: ProcedureManagerStatus,
38}
39
40#[derive(Debug, Serialize, Deserialize)]
41#[serde(rename_all = "snake_case")]
42enum ProcedureManagerStatus {
43 Paused,
44 Running,
45}
46
47#[axum_macros::debug_handler]
49pub(crate) async fn status(State(handler): State<ProcedureManagerHandler>) -> Response {
50 handler
51 .get_procedure_manager_status()
52 .await
53 .map(Json)
54 .map_err(ErrorHandler::new)
55 .into_response()
56}
57
58#[axum_macros::debug_handler]
60pub(crate) async fn pause(State(handler): State<ProcedureManagerHandler>) -> Response {
61 handler
62 .pause_procedure_manager()
63 .await
64 .map(Json)
65 .map_err(ErrorHandler::new)
66 .into_response()
67}
68
69#[axum_macros::debug_handler]
71pub(crate) async fn resume(State(handler): State<ProcedureManagerHandler>) -> Response {
72 handler
73 .resume_procedure_manager()
74 .await
75 .map(Json)
76 .map_err(ErrorHandler::new)
77 .into_response()
78}
79
80impl ProcedureManagerHandler {
81 pub(crate) async fn pause_procedure_manager(
82 &self,
83 ) -> crate::Result<ProcedureManagerStatusResponse> {
84 self.manager
85 .pasue_procedure()
86 .await
87 .context(RuntimeSwitchManagerSnafu)?;
88 info!("Pause the procedure manager.");
90 Ok(ProcedureManagerStatusResponse {
91 status: ProcedureManagerStatus::Paused,
92 })
93 }
94
95 pub(crate) async fn resume_procedure_manager(
96 &self,
97 ) -> crate::Result<ProcedureManagerStatusResponse> {
98 self.manager
99 .resume_procedure()
100 .await
101 .context(RuntimeSwitchManagerSnafu)?;
102 info!("Resume the procedure manager.");
104 Ok(ProcedureManagerStatusResponse {
105 status: ProcedureManagerStatus::Running,
106 })
107 }
108
109 pub(crate) async fn get_procedure_manager_status(
110 &self,
111 ) -> crate::Result<ProcedureManagerStatusResponse> {
112 let is_paused = self
113 .manager
114 .is_procedure_paused()
115 .await
116 .context(RuntimeSwitchManagerSnafu)?;
117 let response = ProcedureManagerStatusResponse {
118 status: if is_paused {
119 ProcedureManagerStatus::Paused
120 } else {
121 ProcedureManagerStatus::Running
122 },
123 };
124
125 Ok(response)
126 }
127}
128
129#[async_trait::async_trait]
130impl HttpHandler for ProcedureManagerHandler {
131 async fn handle(
132 &self,
133 path: &str,
134 method: http::Method,
135 _: &HashMap<String, String>,
136 ) -> crate::Result<http::Response<String>> {
137 match method {
138 http::Method::GET => {
139 if path.ends_with("status") {
140 let response = self.get_procedure_manager_status().await?;
141 to_json_response(response)
142 } else {
143 to_not_found_response()
144 }
145 }
146 http::Method::POST => {
147 if path.ends_with("pause") {
148 let response = self.pause_procedure_manager().await?;
149 to_json_response(response)
150 } else if path.ends_with("resume") {
151 let response = self.resume_procedure_manager().await?;
152 to_json_response(response)
153 } else {
154 to_not_found_response()
155 }
156 }
157 _ => to_not_found_response(),
158 }
159 }
160}