common_meta/rpc/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17pub use api::v1::meta::{MigrateRegionResponse, ProcedureStateResponse};
18use api::v1::meta::{
19    ProcedureDetailResponse as PbProcedureDetailResponse, ProcedureId as PbProcedureId,
20    ProcedureMeta as PbProcedureMeta, ProcedureStateResponse as PbProcedureStateResponse,
21    ProcedureStatus as PbProcedureStatus,
22};
23use common_error::ext::ErrorExt;
24use common_procedure::{ProcedureId, ProcedureInfo, ProcedureState};
25use snafu::ResultExt;
26
27use crate::error::{ParseProcedureIdSnafu, Result};
28
/// A request to migrate region.
///
/// Derives `Debug` like the other request types in this module so the request can be
/// logged or included in diagnostics.
#[derive(Debug, Clone)]
pub struct MigrateRegionRequest {
    /// The id of the region to migrate.
    pub region_id: u64,
    /// The id of the peer the region is migrated from (presumably the current owner).
    pub from_peer: u64,
    /// The id of the peer the region is migrated to.
    pub to_peer: u64,
    /// The timeout attached to the request; how it is enforced is up to the consumer.
    pub timeout: Duration,
}
37
/// Request asking for a follower to be added to a region.
#[derive(Clone, Debug)]
pub struct AddRegionFollowerRequest {
    /// Id of the region that gains the follower.
    pub region_id: u64,
    /// Id of the peer the follower is added on.
    pub peer_id: u64,
}
46
/// Request asking for a follower to be removed from a region.
#[derive(Clone, Debug)]
pub struct RemoveRegionFollowerRequest {
    /// Id of the region that loses the follower.
    pub region_id: u64,
    /// Id of the peer the follower is removed from.
    pub peer_id: u64,
}
55
56/// Cast the protobuf [`ProcedureId`] to common [`ProcedureId`].
57pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result<ProcedureId> {
58    ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| {
59        ParseProcedureIdSnafu {
60            key: hex::encode(&pid.key),
61        }
62    })
63}
64
65/// Cast the common [`ProcedureId`] to protobuf [`ProcedureId`].
66pub fn pid_to_pb_pid(pid: ProcedureId) -> PbProcedureId {
67    PbProcedureId {
68        key: pid.to_string().into(),
69    }
70}
71
72/// Cast the [`ProcedureState`] to protobuf [`PbProcedureStatus`].
73pub fn procedure_state_to_pb_state(state: &ProcedureState) -> (PbProcedureStatus, String) {
74    match state {
75        ProcedureState::Running => (PbProcedureStatus::Running, String::default()),
76        ProcedureState::Done { .. } => (PbProcedureStatus::Done, String::default()),
77        ProcedureState::Retrying { error } => (PbProcedureStatus::Retrying, error.output_msg()),
78        ProcedureState::Failed { error } => (PbProcedureStatus::Failed, error.output_msg()),
79        ProcedureState::PrepareRollback { error } => {
80            (PbProcedureStatus::PrepareRollback, error.output_msg())
81        }
82        ProcedureState::RollingBack { error } => {
83            (PbProcedureStatus::RollingBack, error.output_msg())
84        }
85        ProcedureState::Poisoned { error, .. } => (PbProcedureStatus::Poisoned, error.output_msg()),
86    }
87}
88
89/// Cast the common [`ProcedureState`] to pb [`ProcedureStateResponse`].
90pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStateResponse {
91    let (status, error) = procedure_state_to_pb_state(state);
92    PbProcedureStateResponse {
93        status: status.into(),
94        error,
95        ..Default::default()
96    }
97}
98
99pub fn procedure_details_to_pb_response(metas: Vec<ProcedureInfo>) -> PbProcedureDetailResponse {
100    let procedures = metas
101        .into_iter()
102        .map(|meta| {
103            let (status, error) = procedure_state_to_pb_state(&meta.state);
104            PbProcedureMeta {
105                id: Some(pid_to_pb_pid(meta.id)),
106                type_name: meta.type_name.to_string(),
107                status: status.into(),
108                start_time_ms: meta.start_time_ms,
109                end_time_ms: meta.end_time_ms,
110                lock_keys: meta.lock_keys,
111                error,
112            }
113        })
114        .collect();
115    PbProcedureDetailResponse {
116        procedures,
117        ..Default::default()
118    }
119}
120
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_procedure::Error;
    use snafu::Location;

    use super::*;

    /// Builds the error shared by all failure-state cases below.
    fn manager_not_start() -> Arc<Error> {
        Arc::new(Error::ManagerNotStart {
            location: Location::default(),
        })
    }

    /// A procedure id survives a round trip through its protobuf form.
    #[test]
    fn test_pid_pb_pid_conversion() {
        let pid = ProcedureId::random();

        let pb_pid = pid_to_pb_pid(pid);

        assert_eq!(pid, pb_pid_to_pid(&pb_pid).unwrap());
    }

    /// Keys that are not a valid procedure id are rejected instead of panicking.
    #[test]
    fn test_pb_pid_to_pid_invalid_key() {
        let pb_pid = PbProcedureId {
            key: "not-a-procedure-id".to_string().into(),
        };
        assert!(pb_pid_to_pid(&pb_pid).is_err());

        // An empty key cannot be parsed either.
        let pb_pid = PbProcedureId {
            key: String::new().into(),
        };
        assert!(pb_pid_to_pid(&pb_pid).is_err());
    }

    /// Each state maps to the matching protobuf status and error message.
    #[test]
    fn test_procedure_state_to_pb_response() {
        let state = ProcedureState::Running;
        let resp = procedure_state_to_pb_response(&state);
        assert_eq!(PbProcedureStatus::Running as i32, resp.status);
        assert!(resp.error.is_empty());

        let state = ProcedureState::Done { output: None };
        let resp = procedure_state_to_pb_response(&state);
        assert_eq!(PbProcedureStatus::Done as i32, resp.status);
        assert!(resp.error.is_empty());

        // Every error-carrying state built here must surface the error's message.
        let cases = [
            (
                ProcedureState::Retrying {
                    error: manager_not_start(),
                },
                PbProcedureStatus::Retrying,
            ),
            (
                ProcedureState::Failed {
                    error: manager_not_start(),
                },
                PbProcedureStatus::Failed,
            ),
            (
                ProcedureState::PrepareRollback {
                    error: manager_not_start(),
                },
                PbProcedureStatus::PrepareRollback,
            ),
            (
                ProcedureState::RollingBack {
                    error: manager_not_start(),
                },
                PbProcedureStatus::RollingBack,
            ),
        ];
        for (state, expected) in cases {
            let resp = procedure_state_to_pb_response(&state);
            assert_eq!(expected as i32, resp.status);
            assert_eq!("Procedure Manager is stopped", resp.error);
        }
    }
}