meta_srv/service/admin/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use axum::extract::State;
18use axum::response::{IntoResponse, Response};
19use axum::Json;
20use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
21use common_telemetry::info;
22use serde::{Deserialize, Serialize};
23use snafu::ResultExt;
24use tonic::codegen::http;
25
26use crate::error::RuntimeSwitchManagerSnafu;
27use crate::service::admin::util::{to_json_response, to_not_found_response, ErrorHandler};
28use crate::service::admin::HttpHandler;
29
30#[derive(Clone)]
31pub struct ProcedureManagerHandler {
32    pub manager: RuntimeSwitchManagerRef,
33}
34
35#[derive(Debug, Serialize, Deserialize)]
36pub(crate) struct ProcedureManagerStatusResponse {
37    status: ProcedureManagerStatus,
38}
39
40#[derive(Debug, Serialize, Deserialize)]
41#[serde(rename_all = "snake_case")]
42enum ProcedureManagerStatus {
43    Paused,
44    Running,
45}
46
47/// Get the procedure manager status.
48#[axum_macros::debug_handler]
49pub(crate) async fn status(State(handler): State<ProcedureManagerHandler>) -> Response {
50    handler
51        .get_procedure_manager_status()
52        .await
53        .map(Json)
54        .map_err(ErrorHandler::new)
55        .into_response()
56}
57
58/// Pause the procedure manager.
59#[axum_macros::debug_handler]
60pub(crate) async fn pause(State(handler): State<ProcedureManagerHandler>) -> Response {
61    handler
62        .pause_procedure_manager()
63        .await
64        .map(Json)
65        .map_err(ErrorHandler::new)
66        .into_response()
67}
68
69/// Resume the procedure manager.
70#[axum_macros::debug_handler]
71pub(crate) async fn resume(State(handler): State<ProcedureManagerHandler>) -> Response {
72    handler
73        .resume_procedure_manager()
74        .await
75        .map(Json)
76        .map_err(ErrorHandler::new)
77        .into_response()
78}
79
80impl ProcedureManagerHandler {
81    pub(crate) async fn pause_procedure_manager(
82        &self,
83    ) -> crate::Result<ProcedureManagerStatusResponse> {
84        self.manager
85            .pasue_procedure()
86            .await
87            .context(RuntimeSwitchManagerSnafu)?;
88        // TODO(weny): Add a record to the system events.
89        info!("Pause the procedure manager.");
90        Ok(ProcedureManagerStatusResponse {
91            status: ProcedureManagerStatus::Paused,
92        })
93    }
94
95    pub(crate) async fn resume_procedure_manager(
96        &self,
97    ) -> crate::Result<ProcedureManagerStatusResponse> {
98        self.manager
99            .resume_procedure()
100            .await
101            .context(RuntimeSwitchManagerSnafu)?;
102        // TODO(weny): Add a record to the system events.
103        info!("Resume the procedure manager.");
104        Ok(ProcedureManagerStatusResponse {
105            status: ProcedureManagerStatus::Running,
106        })
107    }
108
109    pub(crate) async fn get_procedure_manager_status(
110        &self,
111    ) -> crate::Result<ProcedureManagerStatusResponse> {
112        let is_paused = self
113            .manager
114            .is_procedure_paused()
115            .await
116            .context(RuntimeSwitchManagerSnafu)?;
117        let response = ProcedureManagerStatusResponse {
118            status: if is_paused {
119                ProcedureManagerStatus::Paused
120            } else {
121                ProcedureManagerStatus::Running
122            },
123        };
124
125        Ok(response)
126    }
127}
128
129#[async_trait::async_trait]
130impl HttpHandler for ProcedureManagerHandler {
131    async fn handle(
132        &self,
133        path: &str,
134        method: http::Method,
135        _: &HashMap<String, String>,
136    ) -> crate::Result<http::Response<String>> {
137        match method {
138            http::Method::GET => {
139                if path.ends_with("status") {
140                    let response = self.get_procedure_manager_status().await?;
141                    to_json_response(response)
142                } else {
143                    to_not_found_response()
144                }
145            }
146            http::Method::POST => {
147                if path.ends_with("pause") {
148                    let response = self.pause_procedure_manager().await?;
149                    to_json_response(response)
150                } else if path.ends_with("resume") {
151                    let response = self.resume_procedure_manager().await?;
152                    to_json_response(response)
153                } else {
154                    to_not_found_response()
155                }
156            }
157            _ => to_not_found_response(),
158        }
159    }
160}