common_procedure/
watcher.rs1use common_telemetry::debug;
16use snafu::ResultExt;
17use tokio::sync::watch::Receiver;
18
19use crate::error::{ProcedureExecSnafu, Result, WaitWatcherSnafu};
20use crate::procedure::{Output, ProcedureState};
21
22pub type Watcher = Receiver<ProcedureState>;
24
25pub async fn wait(watcher: &mut Watcher) -> Result<Option<Output>> {
27 loop {
28 watcher.changed().await.context(WaitWatcherSnafu)?;
29 match &*watcher.borrow() {
30 ProcedureState::Running => (),
31 ProcedureState::Done { output } => {
32 return Ok(output.clone());
33 }
34 ProcedureState::Failed { error } => {
35 return Err(error.clone()).context(ProcedureExecSnafu);
36 }
37 ProcedureState::Retrying { error } => {
38 debug!("retrying, source: {}", error)
39 }
40 ProcedureState::RollingBack { error } => {
41 debug!("rolling back, source: {:?}", error)
42 }
43 ProcedureState::PrepareRollback { error } => {
44 debug!("commit rollback, source: {}", error)
45 }
46 ProcedureState::Poisoned { error, .. } => {
47 debug!("poisoned, source: {}", error);
48 return Err(error.clone()).context(ProcedureExecSnafu);
49 }
50 }
51 }
52}
53
54#[cfg(test)]
55mod tests {
56
57 use std::sync::Arc;
58 use std::time::Duration;
59
60 use async_trait::async_trait;
61 use common_error::mock::MockError;
62 use common_error::status_code::StatusCode;
63 use common_test_util::temp_dir::create_temp_dir;
64
65 use super::*;
66 use crate::error::Error;
67 use crate::local::{test_util, LocalManager, ManagerConfig};
68 use crate::procedure::PoisonKeys;
69 use crate::store::state_store::ObjectStateStore;
70 use crate::test_util::InMemoryPoisonStore;
71 use crate::{
72 Context, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureWithId, Status,
73 };
74
75 #[tokio::test]
76 async fn test_success_after_retry() {
77 let dir = create_temp_dir("after_retry");
78 let config = ManagerConfig {
79 parent_path: "data/".to_string(),
80 max_retry_times: 3,
81 retry_delay: Duration::from_millis(500),
82 ..Default::default()
83 };
84 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
85 let poison_manager = Arc::new(InMemoryPoisonStore::default());
86 let manager = LocalManager::new(config, state_store, poison_manager);
87 manager.start().await.unwrap();
88
89 #[derive(Debug)]
90 struct MockProcedure {
91 error: bool,
92 }
93
94 #[async_trait]
95 impl Procedure for MockProcedure {
96 fn type_name(&self) -> &str {
97 "MockProcedure"
98 }
99
100 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
101 if self.error {
102 self.error = !self.error;
103 Err(Error::retry_later(MockError::new(StatusCode::Internal)))
104 } else {
105 Ok(Status::done_with_output("hello"))
106 }
107 }
108
109 fn dump(&self) -> Result<String> {
110 Ok(String::new())
111 }
112
113 fn lock_key(&self) -> LockKey {
114 LockKey::single_exclusive("test.submit")
115 }
116
117 fn poison_keys(&self) -> PoisonKeys {
118 PoisonKeys::default()
119 }
120 }
121
122 let procedure_id = ProcedureId::random();
123 let mut watcher = manager
124 .submit(ProcedureWithId {
125 id: procedure_id,
126 procedure: Box::new(MockProcedure { error: true }),
127 })
128 .await
129 .unwrap();
130
131 let output = wait(&mut watcher).await.unwrap().unwrap();
132 let output = output.downcast::<&str>().unwrap();
133 assert_eq!(output.as_ref(), &"hello");
134 }
135}