common_meta/rpc/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17pub use api::v1::meta::{MigrateRegionResponse, ProcedureStateResponse};
18use api::v1::meta::{
19    ProcedureDetailResponse as PbProcedureDetailResponse, ProcedureId as PbProcedureId,
20    ProcedureMeta as PbProcedureMeta, ProcedureStateResponse as PbProcedureStateResponse,
21    ProcedureStatus as PbProcedureStatus,
22};
23use common_error::ext::ErrorExt;
24use common_procedure::{ProcedureId, ProcedureInfo, ProcedureState};
25use snafu::ResultExt;
26use table::metadata::TableId;
27
28use crate::error::{ParseProcedureIdSnafu, Result};
29
30/// A request to migrate region.
31#[derive(Clone)]
32pub struct MigrateRegionRequest {
33    pub region_id: u64,
34    pub from_peer: u64,
35    pub to_peer: u64,
36    pub timeout: Duration,
37}
38
39/// A request to add region follower.
40#[derive(Debug, Clone)]
41pub struct AddRegionFollowerRequest {
42    /// The region id to add follower.
43    pub region_id: u64,
44    /// The peer id to add follower.
45    pub peer_id: u64,
46}
47
48#[derive(Debug, Clone)]
49pub struct AddTableFollowerRequest {
50    pub catalog_name: String,
51    pub schema_name: String,
52    pub table_name: String,
53    pub table_id: TableId,
54}
55
56#[derive(Debug, Clone)]
57pub struct RemoveTableFollowerRequest {
58    pub catalog_name: String,
59    pub schema_name: String,
60    pub table_name: String,
61    pub table_id: TableId,
62}
63
64#[derive(Debug, Clone)]
65pub enum ManageRegionFollowerRequest {
66    AddRegionFollower(AddRegionFollowerRequest),
67    RemoveRegionFollower(RemoveRegionFollowerRequest),
68    AddTableFollower(AddTableFollowerRequest),
69    RemoveTableFollower(RemoveTableFollowerRequest),
70}
71
72/// A request to remove region follower.
73#[derive(Debug, Clone)]
74pub struct RemoveRegionFollowerRequest {
75    /// The region id to remove follower.
76    pub region_id: u64,
77    /// The peer id to remove follower.
78    pub peer_id: u64,
79}
80
81#[derive(Debug, Clone)]
82pub struct GcRegionsRequest {
83    pub region_ids: Vec<u64>,
84    pub full_file_listing: bool,
85    pub timeout: Duration,
86}
87
88#[derive(Debug, Clone)]
89pub struct GcTableRequest {
90    pub catalog_name: String,
91    pub schema_name: String,
92    pub table_name: String,
93    pub full_file_listing: bool,
94    pub timeout: Duration,
95}
96
97#[derive(Debug, Clone, Default, PartialEq, Eq)]
98pub struct GcResponse {
99    pub processed_regions: u64,
100    pub need_retry_regions: Vec<u64>,
101    pub deleted_files: u64,
102    pub deleted_indexes: u64,
103}
104
105/// Cast the protobuf [`ProcedureId`] to common [`ProcedureId`].
106pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result<ProcedureId> {
107    ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| {
108        ParseProcedureIdSnafu {
109            key: hex::encode(&pid.key),
110        }
111    })
112}
113
114/// Cast the common [`ProcedureId`] to protobuf [`ProcedureId`].
115pub fn pid_to_pb_pid(pid: ProcedureId) -> PbProcedureId {
116    PbProcedureId {
117        key: pid.to_string().into(),
118    }
119}
120
121/// Cast the [`ProcedureState`] to protobuf [`PbProcedureStatus`].
122pub fn procedure_state_to_pb_state(state: &ProcedureState) -> (PbProcedureStatus, String) {
123    match state {
124        ProcedureState::Running => (PbProcedureStatus::Running, String::default()),
125        ProcedureState::Done { .. } => (PbProcedureStatus::Done, String::default()),
126        ProcedureState::Retrying { error } => (PbProcedureStatus::Retrying, error.output_msg()),
127        ProcedureState::Failed { error } => (PbProcedureStatus::Failed, error.output_msg()),
128        ProcedureState::PrepareRollback { error } => {
129            (PbProcedureStatus::PrepareRollback, error.output_msg())
130        }
131        ProcedureState::RollingBack { error } => {
132            (PbProcedureStatus::RollingBack, error.output_msg())
133        }
134        ProcedureState::Poisoned { error, .. } => (PbProcedureStatus::Poisoned, error.output_msg()),
135    }
136}
137
138/// Cast the common [`ProcedureState`] to pb [`ProcedureStateResponse`].
139pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStateResponse {
140    let (status, error) = procedure_state_to_pb_state(state);
141    PbProcedureStateResponse {
142        status: status.into(),
143        error,
144        ..Default::default()
145    }
146}
147
148pub fn procedure_details_to_pb_response(metas: Vec<ProcedureInfo>) -> PbProcedureDetailResponse {
149    let procedures = metas
150        .into_iter()
151        .map(|meta| {
152            let (status, error) = procedure_state_to_pb_state(&meta.state);
153            PbProcedureMeta {
154                id: Some(pid_to_pb_pid(meta.id)),
155                type_name: meta.type_name.clone(),
156                status: status.into(),
157                start_time_ms: meta.start_time_ms,
158                end_time_ms: meta.end_time_ms,
159                lock_keys: meta.lock_keys,
160                error,
161            }
162        })
163        .collect();
164    PbProcedureDetailResponse {
165        procedures,
166        ..Default::default()
167    }
168}
169
170#[cfg(test)]
171mod tests {
172    use std::sync::Arc;
173
174    use common_procedure::Error;
175    use snafu::Location;
176
177    use super::*;
178
179    #[test]
180    fn test_pid_pb_pid_conversion() {
181        let pid = ProcedureId::random();
182
183        let pb_pid = pid_to_pb_pid(pid);
184
185        assert_eq!(pid, pb_pid_to_pid(&pb_pid).unwrap());
186    }
187
188    #[test]
189    fn test_procedure_state_to_pb_response() {
190        let state = ProcedureState::Running;
191        let resp = procedure_state_to_pb_response(&state);
192        assert_eq!(PbProcedureStatus::Running as i32, resp.status);
193        assert!(resp.error.is_empty());
194
195        let state = ProcedureState::Done { output: None };
196        let resp = procedure_state_to_pb_response(&state);
197        assert_eq!(PbProcedureStatus::Done as i32, resp.status);
198        assert!(resp.error.is_empty());
199
200        let state = ProcedureState::Retrying {
201            error: Arc::new(Error::ManagerNotStart {
202                location: Location::default(),
203            }),
204        };
205        let resp = procedure_state_to_pb_response(&state);
206        assert_eq!(PbProcedureStatus::Retrying as i32, resp.status);
207        assert_eq!("Procedure Manager is stopped", resp.error);
208
209        let state = ProcedureState::Failed {
210            error: Arc::new(Error::ManagerNotStart {
211                location: Location::default(),
212            }),
213        };
214        let resp = procedure_state_to_pb_response(&state);
215        assert_eq!(PbProcedureStatus::Failed as i32, resp.status);
216        assert_eq!("Procedure Manager is stopped", resp.error);
217    }
218}