common_procedure/
watcher.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::debug;
16use snafu::ResultExt;
17use tokio::sync::watch::Receiver;
18
19use crate::error::{ProcedureExecSnafu, Result, WaitWatcherSnafu};
20use crate::procedure::{Output, ProcedureState};
21
22/// Watcher to watch procedure state.
23pub type Watcher = Receiver<ProcedureState>;
24
25/// Wait the [Watcher] until the [ProcedureState] is done.
26pub async fn wait(watcher: &mut Watcher) -> Result<Option<Output>> {
27    loop {
28        watcher.changed().await.context(WaitWatcherSnafu)?;
29        match &*watcher.borrow() {
30            ProcedureState::Running => (),
31            ProcedureState::Done { output } => {
32                return Ok(output.clone());
33            }
34            ProcedureState::Failed { error } => {
35                return Err(error.clone()).context(ProcedureExecSnafu);
36            }
37            ProcedureState::Retrying { error } => {
38                debug!("retrying, source: {}", error)
39            }
40            ProcedureState::RollingBack { error } => {
41                debug!("rolling back, source: {:?}", error)
42            }
43            ProcedureState::PrepareRollback { error } => {
44                debug!("commit rollback, source: {}", error)
45            }
46            ProcedureState::Poisoned { error, .. } => {
47                debug!("poisoned, source: {}", error);
48                return Err(error.clone()).context(ProcedureExecSnafu);
49            }
50        }
51    }
52}
53
54#[cfg(test)]
55mod tests {
56
57    use std::sync::Arc;
58    use std::time::Duration;
59
60    use async_trait::async_trait;
61    use common_error::mock::MockError;
62    use common_error::status_code::StatusCode;
63    use common_test_util::temp_dir::create_temp_dir;
64
65    use super::*;
66    use crate::error::Error;
67    use crate::local::{test_util, LocalManager, ManagerConfig};
68    use crate::procedure::PoisonKeys;
69    use crate::store::state_store::ObjectStateStore;
70    use crate::test_util::InMemoryPoisonStore;
71    use crate::{
72        Context, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureWithId, Status,
73    };
74
75    #[tokio::test]
76    async fn test_success_after_retry() {
77        let dir = create_temp_dir("after_retry");
78        let config = ManagerConfig {
79            parent_path: "data/".to_string(),
80            max_retry_times: 3,
81            retry_delay: Duration::from_millis(500),
82            ..Default::default()
83        };
84        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
85        let poison_manager = Arc::new(InMemoryPoisonStore::default());
86        let manager = LocalManager::new(config, state_store, poison_manager);
87        manager.start().await.unwrap();
88
89        #[derive(Debug)]
90        struct MockProcedure {
91            error: bool,
92        }
93
94        #[async_trait]
95        impl Procedure for MockProcedure {
96            fn type_name(&self) -> &str {
97                "MockProcedure"
98            }
99
100            async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
101                if self.error {
102                    self.error = !self.error;
103                    Err(Error::retry_later(MockError::new(StatusCode::Internal)))
104                } else {
105                    Ok(Status::done_with_output("hello"))
106                }
107            }
108
109            fn dump(&self) -> Result<String> {
110                Ok(String::new())
111            }
112
113            fn lock_key(&self) -> LockKey {
114                LockKey::single_exclusive("test.submit")
115            }
116
117            fn poison_keys(&self) -> PoisonKeys {
118                PoisonKeys::default()
119            }
120        }
121
122        let procedure_id = ProcedureId::random();
123        let mut watcher = manager
124            .submit(ProcedureWithId {
125                id: procedure_id,
126                procedure: Box::new(MockProcedure { error: true }),
127            })
128            .await
129            .unwrap();
130
131        let output = wait(&mut watcher).await.unwrap().unwrap();
132        let output = output.downcast::<&str>().unwrap();
133        assert_eq!(output.as_ref(), &"hello");
134    }
135}