common_procedure_test/
lib.rs1use std::collections::HashMap;
18use std::sync::Arc;
19
20use async_trait::async_trait;
21use common_procedure::local::{acquire_dynamic_key_lock, DynamicKeyLockGuard};
22use common_procedure::rwlock::KeyRwLock;
23use common_procedure::store::poison_store::PoisonStore;
24use common_procedure::test_util::InMemoryPoisonStore;
25use common_procedure::{
26 Context, ContextProvider, Output, PoisonKey, Procedure, ProcedureId, ProcedureState,
27 ProcedureWithId, Result, Status, StringKey,
28};
29
30#[derive(Default)]
32pub struct MockContextProvider {
33 states: HashMap<ProcedureId, ProcedureState>,
34 poison_manager: InMemoryPoisonStore,
35 dynamic_key_lock: Arc<KeyRwLock<String>>,
36}
37
38impl MockContextProvider {
39 pub fn new(states: HashMap<ProcedureId, ProcedureState>) -> MockContextProvider {
41 MockContextProvider {
42 states,
43 poison_manager: InMemoryPoisonStore::default(),
44 dynamic_key_lock: Arc::new(KeyRwLock::new()),
45 }
46 }
47
48 pub fn poison_manager(&self) -> &InMemoryPoisonStore {
50 &self.poison_manager
51 }
52}
53
54#[async_trait]
55impl ContextProvider for MockContextProvider {
56 async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
57 Ok(self.states.get(&procedure_id).cloned())
58 }
59
60 async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
61 self.poison_manager
62 .try_put_poison(key.to_string(), procedure_id.to_string())
63 .await
64 }
65
66 async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
67 acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
68 }
69}
70
71pub async fn execute_procedure_until_done(procedure: &mut dyn Procedure) -> Option<Output> {
76 let ctx = Context {
77 procedure_id: ProcedureId::random(),
78 provider: Arc::new(MockContextProvider::default()),
79 };
80
81 loop {
82 match procedure.execute(&ctx).await.unwrap() {
83 Status::Executing { .. } => (),
84 Status::Suspended { subprocedures, .. } => assert!(
85 subprocedures.is_empty(),
86 "Executing subprocedure is unsupported"
87 ),
88 Status::Done { output } => return output,
89 Status::Poisoned { .. } => return None,
90 }
91 }
92}
93
94pub async fn execute_procedure_once(
98 procedure_id: ProcedureId,
99 provider: MockContextProvider,
100 procedure: &mut dyn Procedure,
101) -> bool {
102 let ctx = Context {
103 procedure_id,
104 provider: Arc::new(provider),
105 };
106
107 match procedure.execute(&ctx).await.unwrap() {
108 Status::Executing { .. } => false,
109 Status::Suspended { subprocedures, .. } => {
110 assert!(
111 subprocedures.is_empty(),
112 "Executing subprocedure is unsupported"
113 );
114 false
115 }
116 Status::Done { .. } => true,
117 Status::Poisoned { .. } => false,
118 }
119}
120
121pub async fn execute_until_suspended_or_done(
125 procedure_id: ProcedureId,
126 provider: MockContextProvider,
127 procedure: &mut dyn Procedure,
128) -> Option<Vec<ProcedureWithId>> {
129 let ctx = Context {
130 procedure_id,
131 provider: Arc::new(provider),
132 };
133
134 loop {
135 match procedure.execute(&ctx).await.unwrap() {
136 Status::Executing { .. } => (),
137 Status::Suspended { subprocedures, .. } => return Some(subprocedures),
138 Status::Done { .. } => break,
139 Status::Poisoned { .. } => unreachable!(),
140 }
141 }
142
143 None
144}
145
146pub fn new_test_procedure_context() -> Context {
147 Context {
148 procedure_id: ProcedureId::random(),
149 provider: Arc::new(MockContextProvider::default()),
150 }
151}
152
153pub async fn execute_procedure_until<P: Procedure>(procedure: &mut P, until: impl Fn(&P) -> bool) {
154 let mut reached = false;
155 let context = new_test_procedure_context();
156 while !matches!(
157 procedure.execute(&context).await.unwrap(),
158 Status::Done { .. }
159 ) {
160 if until(procedure) {
161 reached = true;
162 break;
163 }
164 }
165 assert!(
166 reached,
167 "procedure '{}' did not reach the expected state",
168 procedure.type_name()
169 );
170}