common_procedure_test/
lib.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Test utilities for procedures.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use async_trait::async_trait;
21use common_procedure::local::{acquire_dynamic_key_lock, DynamicKeyLockGuard};
22use common_procedure::rwlock::KeyRwLock;
23use common_procedure::store::poison_store::PoisonStore;
24use common_procedure::test_util::InMemoryPoisonStore;
25use common_procedure::{
26    Context, ContextProvider, Output, PoisonKey, Procedure, ProcedureId, ProcedureState,
27    ProcedureWithId, Result, Status, StringKey,
28};
29
/// A Mock [ContextProvider].
///
/// Backs the [ContextProvider] implementation in this file with in-memory
/// state so procedures can be executed in tests.
#[derive(Default)]
pub struct MockContextProvider {
    /// Procedure states returned by `procedure_state`, keyed by procedure id.
    states: HashMap<ProcedureId, ProcedureState>,
    /// Poison store that `try_put_poison` writes to.
    poison_manager: InMemoryPoisonStore,
    /// Key lock handed to `acquire_dynamic_key_lock` by `acquire_lock`.
    dynamic_key_lock: Arc<KeyRwLock<String>>,
}
37
38impl MockContextProvider {
39    /// Returns a new provider.
40    pub fn new(states: HashMap<ProcedureId, ProcedureState>) -> MockContextProvider {
41        MockContextProvider {
42            states,
43            poison_manager: InMemoryPoisonStore::default(),
44            dynamic_key_lock: Arc::new(KeyRwLock::new()),
45        }
46    }
47
48    /// Returns a reference to the poison manager.
49    pub fn poison_manager(&self) -> &InMemoryPoisonStore {
50        &self.poison_manager
51    }
52}
53
54#[async_trait]
55impl ContextProvider for MockContextProvider {
56    async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
57        Ok(self.states.get(&procedure_id).cloned())
58    }
59
60    async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
61        self.poison_manager
62            .try_put_poison(key.to_string(), procedure_id.to_string())
63            .await
64    }
65
66    async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
67        acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
68    }
69}
70
71/// Executes a procedure until it returns [Status::Done].
72///
73/// # Panics
74/// Panics if the `procedure` has subprocedure to execute.
75pub async fn execute_procedure_until_done(procedure: &mut dyn Procedure) -> Option<Output> {
76    let ctx = Context {
77        procedure_id: ProcedureId::random(),
78        provider: Arc::new(MockContextProvider::default()),
79    };
80
81    loop {
82        match procedure.execute(&ctx).await.unwrap() {
83            Status::Executing { .. } => (),
84            Status::Suspended { subprocedures, .. } => assert!(
85                subprocedures.is_empty(),
86                "Executing subprocedure is unsupported"
87            ),
88            Status::Done { output } => return output,
89            Status::Poisoned { .. } => return None,
90        }
91    }
92}
93
94/// Executes a procedure once.
95///
96/// Returns whether the procedure is done.
97pub async fn execute_procedure_once(
98    procedure_id: ProcedureId,
99    provider: MockContextProvider,
100    procedure: &mut dyn Procedure,
101) -> bool {
102    let ctx = Context {
103        procedure_id,
104        provider: Arc::new(provider),
105    };
106
107    match procedure.execute(&ctx).await.unwrap() {
108        Status::Executing { .. } => false,
109        Status::Suspended { subprocedures, .. } => {
110            assert!(
111                subprocedures.is_empty(),
112                "Executing subprocedure is unsupported"
113            );
114            false
115        }
116        Status::Done { .. } => true,
117        Status::Poisoned { .. } => false,
118    }
119}
120
121/// Executes a procedure until it returns [Status::Suspended] or [Status::Done].
122///
123/// Returns `Some` if it returns [Status::Suspended] or `None` if it returns [Status::Done].
124pub async fn execute_until_suspended_or_done(
125    procedure_id: ProcedureId,
126    provider: MockContextProvider,
127    procedure: &mut dyn Procedure,
128) -> Option<Vec<ProcedureWithId>> {
129    let ctx = Context {
130        procedure_id,
131        provider: Arc::new(provider),
132    };
133
134    loop {
135        match procedure.execute(&ctx).await.unwrap() {
136            Status::Executing { .. } => (),
137            Status::Suspended { subprocedures, .. } => return Some(subprocedures),
138            Status::Done { .. } => break,
139            Status::Poisoned { .. } => unreachable!(),
140        }
141    }
142
143    None
144}
145
146pub fn new_test_procedure_context() -> Context {
147    Context {
148        procedure_id: ProcedureId::random(),
149        provider: Arc::new(MockContextProvider::default()),
150    }
151}
152
153pub async fn execute_procedure_until<P: Procedure>(procedure: &mut P, until: impl Fn(&P) -> bool) {
154    let mut reached = false;
155    let context = new_test_procedure_context();
156    while !matches!(
157        procedure.execute(&context).await.unwrap(),
158        Status::Done { .. }
159    ) {
160        if until(procedure) {
161            reached = true;
162            break;
163        }
164    }
165    assert!(
166        reached,
167        "procedure '{}' did not reach the expected state",
168        procedure.type_name()
169    );
170}