common_meta/rpc/
procedure.rs1use std::time::Duration;
16
17pub use api::v1::meta::{MigrateRegionResponse, ProcedureStateResponse};
18use api::v1::meta::{
19 ProcedureDetailResponse as PbProcedureDetailResponse, ProcedureId as PbProcedureId,
20 ProcedureMeta as PbProcedureMeta, ProcedureStateResponse as PbProcedureStateResponse,
21 ProcedureStatus as PbProcedureStatus,
22};
23use common_error::ext::ErrorExt;
24use common_procedure::{ProcedureId, ProcedureInfo, ProcedureState};
25use snafu::ResultExt;
26
27use crate::error::{ParseProcedureIdSnafu, Result};
28
/// A request to migrate a region from one peer to another.
///
/// Derives `Debug` in addition to `Clone` so it can be logged and inspected
/// like the other request types in this module.
#[derive(Debug, Clone)]
pub struct MigrateRegionRequest {
    /// Identifier of the region to migrate.
    pub region_id: u64,
    /// Identifier of the peer the region is migrated from.
    pub from_peer: u64,
    /// Identifier of the peer the region is migrated to.
    pub to_peer: u64,
    /// Timeout associated with the migration request.
    pub timeout: Duration,
}
37
/// A request to add a follower for a region.
#[derive(Clone, Debug)]
pub struct AddRegionFollowerRequest {
    /// Identifier of the region the request targets.
    pub region_id: u64,
    /// Identifier of the peer named by the request
    /// (presumably the peer that becomes the follower — confirm with the caller).
    pub peer_id: u64,
}
46
/// A request to remove a follower of a region.
#[derive(Clone, Debug)]
pub struct RemoveRegionFollowerRequest {
    /// Identifier of the region the request targets.
    pub region_id: u64,
    /// Identifier of the peer named by the request
    /// (presumably the follower peer to remove — confirm with the caller).
    pub peer_id: u64,
}
55
56pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result<ProcedureId> {
58 ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| {
59 ParseProcedureIdSnafu {
60 key: hex::encode(&pid.key),
61 }
62 })
63}
64
65pub fn pid_to_pb_pid(pid: ProcedureId) -> PbProcedureId {
67 PbProcedureId {
68 key: pid.to_string().into(),
69 }
70}
71
72pub fn procedure_state_to_pb_state(state: &ProcedureState) -> (PbProcedureStatus, String) {
74 match state {
75 ProcedureState::Running => (PbProcedureStatus::Running, String::default()),
76 ProcedureState::Done { .. } => (PbProcedureStatus::Done, String::default()),
77 ProcedureState::Retrying { error } => (PbProcedureStatus::Retrying, error.output_msg()),
78 ProcedureState::Failed { error } => (PbProcedureStatus::Failed, error.output_msg()),
79 ProcedureState::PrepareRollback { error } => {
80 (PbProcedureStatus::PrepareRollback, error.output_msg())
81 }
82 ProcedureState::RollingBack { error } => {
83 (PbProcedureStatus::RollingBack, error.output_msg())
84 }
85 ProcedureState::Poisoned { error, .. } => (PbProcedureStatus::Poisoned, error.output_msg()),
86 }
87}
88
89pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStateResponse {
91 let (status, error) = procedure_state_to_pb_state(state);
92 PbProcedureStateResponse {
93 status: status.into(),
94 error,
95 ..Default::default()
96 }
97}
98
99pub fn procedure_details_to_pb_response(metas: Vec<ProcedureInfo>) -> PbProcedureDetailResponse {
100 let procedures = metas
101 .into_iter()
102 .map(|meta| {
103 let (status, error) = procedure_state_to_pb_state(&meta.state);
104 PbProcedureMeta {
105 id: Some(pid_to_pb_pid(meta.id)),
106 type_name: meta.type_name.to_string(),
107 status: status.into(),
108 start_time_ms: meta.start_time_ms,
109 end_time_ms: meta.end_time_ms,
110 lock_keys: meta.lock_keys,
111 error,
112 }
113 })
114 .collect();
115 PbProcedureDetailResponse {
116 procedures,
117 ..Default::default()
118 }
119}
120
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_procedure::Error;
    use snafu::Location;

    use super::*;

    /// Message produced by `output_msg()` for `Error::ManagerNotStart`.
    const MANAGER_STOPPED_MSG: &str = "Procedure Manager is stopped";

    /// Builds the error shared by every error-carrying state in the tests below.
    fn manager_not_start() -> Arc<Error> {
        Arc::new(Error::ManagerNotStart {
            location: Location::default(),
        })
    }

    /// A procedure id survives a round trip through its protobuf form.
    #[test]
    fn test_pid_pb_pid_conversion() {
        let pid = ProcedureId::random();

        let pb_pid = pid_to_pb_pid(pid);

        assert_eq!(pid, pb_pid_to_pid(&pb_pid).unwrap());
    }

    /// Each state maps to the matching protobuf status; only error-carrying
    /// states produce a non-empty error message.
    #[test]
    fn test_procedure_state_to_pb_response() {
        // (state, expected status, expected error message)
        let cases = [
            (ProcedureState::Running, PbProcedureStatus::Running, ""),
            (
                ProcedureState::Done { output: None },
                PbProcedureStatus::Done,
                "",
            ),
            (
                ProcedureState::Retrying {
                    error: manager_not_start(),
                },
                PbProcedureStatus::Retrying,
                MANAGER_STOPPED_MSG,
            ),
            (
                ProcedureState::Failed {
                    error: manager_not_start(),
                },
                PbProcedureStatus::Failed,
                MANAGER_STOPPED_MSG,
            ),
            // The rollback states go through the same mapping and must also
            // surface the error message.
            (
                ProcedureState::PrepareRollback {
                    error: manager_not_start(),
                },
                PbProcedureStatus::PrepareRollback,
                MANAGER_STOPPED_MSG,
            ),
            (
                ProcedureState::RollingBack {
                    error: manager_not_start(),
                },
                PbProcedureStatus::RollingBack,
                MANAGER_STOPPED_MSG,
            ),
        ];

        for (state, expected_status, expected_error) in &cases {
            let resp = procedure_state_to_pb_response(state);
            assert_eq!(*expected_status as i32, resp.status);
            assert_eq!(*expected_error, resp.error);
        }
    }
}