1use std::time::Duration;
16
17pub use api::v1::meta::{MigrateRegionResponse, ProcedureStateResponse};
18use api::v1::meta::{
19 ProcedureDetailResponse as PbProcedureDetailResponse, ProcedureId as PbProcedureId,
20 ProcedureMeta as PbProcedureMeta, ProcedureStateResponse as PbProcedureStateResponse,
21 ProcedureStatus as PbProcedureStatus,
22};
23use common_error::ext::ErrorExt;
24use common_procedure::{ProcedureId, ProcedureInfo, ProcedureState};
25use snafu::ResultExt;
26use table::metadata::TableId;
27
28use crate::error::{ParseProcedureIdSnafu, Result};
29
30#[derive(Clone)]
32pub struct MigrateRegionRequest {
33 pub region_id: u64,
34 pub from_peer: u64,
35 pub to_peer: u64,
36 pub timeout: Duration,
37}
38
39#[derive(Debug, Clone)]
41pub struct AddRegionFollowerRequest {
42 pub region_id: u64,
44 pub peer_id: u64,
46}
47
48#[derive(Debug, Clone)]
49pub struct AddTableFollowerRequest {
50 pub catalog_name: String,
51 pub schema_name: String,
52 pub table_name: String,
53 pub table_id: TableId,
54}
55
56#[derive(Debug, Clone)]
57pub struct RemoveTableFollowerRequest {
58 pub catalog_name: String,
59 pub schema_name: String,
60 pub table_name: String,
61 pub table_id: TableId,
62}
63
64#[derive(Debug, Clone)]
65pub enum ManageRegionFollowerRequest {
66 AddRegionFollower(AddRegionFollowerRequest),
67 RemoveRegionFollower(RemoveRegionFollowerRequest),
68 AddTableFollower(AddTableFollowerRequest),
69 RemoveTableFollower(RemoveTableFollowerRequest),
70}
71
72#[derive(Debug, Clone)]
74pub struct RemoveRegionFollowerRequest {
75 pub region_id: u64,
77 pub peer_id: u64,
79}
80
81#[derive(Debug, Clone)]
82pub struct GcRegionsRequest {
83 pub region_ids: Vec<u64>,
84 pub full_file_listing: bool,
85 pub timeout: Duration,
86}
87
88#[derive(Debug, Clone)]
89pub struct GcTableRequest {
90 pub catalog_name: String,
91 pub schema_name: String,
92 pub table_name: String,
93 pub full_file_listing: bool,
94 pub timeout: Duration,
95}
96
97#[derive(Debug, Clone, Default, PartialEq, Eq)]
98pub struct GcResponse {
99 pub processed_regions: u64,
100 pub need_retry_regions: Vec<u64>,
101 pub deleted_files: u64,
102 pub deleted_indexes: u64,
103}
104
105pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result<ProcedureId> {
107 ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| {
108 ParseProcedureIdSnafu {
109 key: hex::encode(&pid.key),
110 }
111 })
112}
113
114pub fn pid_to_pb_pid(pid: ProcedureId) -> PbProcedureId {
116 PbProcedureId {
117 key: pid.to_string().into(),
118 }
119}
120
121pub fn procedure_state_to_pb_state(state: &ProcedureState) -> (PbProcedureStatus, String) {
123 match state {
124 ProcedureState::Running => (PbProcedureStatus::Running, String::default()),
125 ProcedureState::Done { .. } => (PbProcedureStatus::Done, String::default()),
126 ProcedureState::Retrying { error } => (PbProcedureStatus::Retrying, error.output_msg()),
127 ProcedureState::Failed { error } => (PbProcedureStatus::Failed, error.output_msg()),
128 ProcedureState::PrepareRollback { error } => {
129 (PbProcedureStatus::PrepareRollback, error.output_msg())
130 }
131 ProcedureState::RollingBack { error } => {
132 (PbProcedureStatus::RollingBack, error.output_msg())
133 }
134 ProcedureState::Poisoned { error, .. } => (PbProcedureStatus::Poisoned, error.output_msg()),
135 }
136}
137
138pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStateResponse {
140 let (status, error) = procedure_state_to_pb_state(state);
141 PbProcedureStateResponse {
142 status: status.into(),
143 error,
144 ..Default::default()
145 }
146}
147
148pub fn procedure_details_to_pb_response(metas: Vec<ProcedureInfo>) -> PbProcedureDetailResponse {
149 let procedures = metas
150 .into_iter()
151 .map(|meta| {
152 let (status, error) = procedure_state_to_pb_state(&meta.state);
153 PbProcedureMeta {
154 id: Some(pid_to_pb_pid(meta.id)),
155 type_name: meta.type_name.clone(),
156 status: status.into(),
157 start_time_ms: meta.start_time_ms,
158 end_time_ms: meta.end_time_ms,
159 lock_keys: meta.lock_keys,
160 error,
161 }
162 })
163 .collect();
164 PbProcedureDetailResponse {
165 procedures,
166 ..Default::default()
167 }
168}
169
170#[cfg(test)]
171mod tests {
172 use std::sync::Arc;
173
174 use common_procedure::Error;
175 use snafu::Location;
176
177 use super::*;
178
179 #[test]
180 fn test_pid_pb_pid_conversion() {
181 let pid = ProcedureId::random();
182
183 let pb_pid = pid_to_pb_pid(pid);
184
185 assert_eq!(pid, pb_pid_to_pid(&pb_pid).unwrap());
186 }
187
188 #[test]
189 fn test_procedure_state_to_pb_response() {
190 let state = ProcedureState::Running;
191 let resp = procedure_state_to_pb_response(&state);
192 assert_eq!(PbProcedureStatus::Running as i32, resp.status);
193 assert!(resp.error.is_empty());
194
195 let state = ProcedureState::Done { output: None };
196 let resp = procedure_state_to_pb_response(&state);
197 assert_eq!(PbProcedureStatus::Done as i32, resp.status);
198 assert!(resp.error.is_empty());
199
200 let state = ProcedureState::Retrying {
201 error: Arc::new(Error::ManagerNotStart {
202 location: Location::default(),
203 }),
204 };
205 let resp = procedure_state_to_pb_response(&state);
206 assert_eq!(PbProcedureStatus::Retrying as i32, resp.status);
207 assert_eq!("Procedure Manager is stopped", resp.error);
208
209 let state = ProcedureState::Failed {
210 error: Arc::new(Error::ManagerNotStart {
211 location: Location::default(),
212 }),
213 };
214 let resp = procedure_state_to_pb_response(&state);
215 assert_eq!(PbProcedureStatus::Failed as i32, resp.status);
216 assert_eq!("Procedure Manager is stopped", resp.error);
217 }
218}