common_procedure/
local.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod runner;
16
17use std::collections::hash_map::Entry;
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
20use std::sync::{Arc, Mutex, RwLock};
21use std::time::{Duration, Instant};
22
23use async_trait::async_trait;
24use backon::ExponentialBuilder;
25use common_runtime::{RepeatedTask, TaskFunction};
26use common_telemetry::tracing_context::{FutureExt, TracingContext};
27use common_telemetry::{error, info, tracing};
28use snafu::{ensure, OptionExt, ResultExt};
29use tokio::sync::watch::{self, Receiver, Sender};
30use tokio::sync::{Mutex as TokioMutex, Notify};
31
32use crate::error::{
33    self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu,
34    PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, Result, StartRemoveOutdatedMetaTaskSnafu,
35    StopRemoveOutdatedMetaTaskSnafu, TooManyRunningProceduresSnafu,
36};
37use crate::local::runner::Runner;
38use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
39use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
40use crate::store::poison_store::PoisonStoreRef;
41use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
42use crate::{
43    BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
44    ProcedureState, ProcedureWithId, StringKey, Watcher,
45};
46
/// Default time-to-live (ten minutes) for the metadata of a finished procedure.
const META_TTL: Duration = Duration::from_secs(10 * 60);
49
50/// Shared metadata of a procedure.
51///
52/// # Note
53/// [Notify] is not a condition variable, we can't guarantee the waiters are notified
54/// if they didn't call `notified()` before we signal the notify. So we
55/// 1. use dedicated notify for each condition, such as waiting for a lock, waiting
56///    for children;
57/// 2. always use `notify_one` and ensure there are only one waiter.
58#[derive(Debug)]
59pub(crate) struct ProcedureMeta {
60    /// Id of this procedure.
61    id: ProcedureId,
62    /// Type name of this procedure.
63    type_name: String,
64    /// Parent procedure id.
65    parent_id: Option<ProcedureId>,
66    /// Notify to wait for subprocedures.
67    child_notify: Notify,
68    /// Lock required by this procedure.
69    lock_key: LockKey,
70    /// Poison keys that may cause this procedure to become poisoned during execution.
71    poison_keys: PoisonKeys,
72    /// Sender to notify the procedure state.
73    state_sender: Sender<ProcedureState>,
74    /// Receiver to watch the procedure state.
75    state_receiver: Receiver<ProcedureState>,
76    /// Id of child procedures.
77    children: Mutex<Vec<ProcedureId>>,
78    /// Start execution time of this procedure.
79    start_time_ms: AtomicI64,
80    /// End execution time of this procedure.
81    end_time_ms: AtomicI64,
82}
83
84impl ProcedureMeta {
85    fn new(
86        id: ProcedureId,
87        procedure_state: ProcedureState,
88        parent_id: Option<ProcedureId>,
89        lock_key: LockKey,
90        poison_keys: PoisonKeys,
91        type_name: &str,
92    ) -> ProcedureMeta {
93        let (state_sender, state_receiver) = watch::channel(procedure_state);
94        ProcedureMeta {
95            id,
96            parent_id,
97            child_notify: Notify::new(),
98            lock_key,
99            poison_keys,
100            state_sender,
101            state_receiver,
102            children: Mutex::new(Vec::new()),
103            start_time_ms: AtomicI64::new(0),
104            end_time_ms: AtomicI64::new(0),
105            type_name: type_name.to_string(),
106        }
107    }
108
109    /// Returns current [ProcedureState].
110    fn state(&self) -> ProcedureState {
111        self.state_receiver.borrow().clone()
112    }
113
114    /// Update current [ProcedureState].
115    fn set_state(&self, state: ProcedureState) {
116        // Safety: ProcedureMeta also holds the receiver, so `send()` should never fail.
117        self.state_sender.send(state).unwrap();
118    }
119
120    /// Push `procedure_id` of the subprocedure to the metadata.
121    fn push_child(&self, procedure_id: ProcedureId) {
122        let mut children = self.children.lock().unwrap();
123        children.push(procedure_id);
124    }
125
126    /// Append subprocedures to given `buffer`.
127    fn list_children(&self, buffer: &mut Vec<ProcedureId>) {
128        let children = self.children.lock().unwrap();
129        buffer.extend_from_slice(&children);
130    }
131
132    /// Returns the number of subprocedures.
133    fn num_children(&self) -> usize {
134        self.children.lock().unwrap().len()
135    }
136
137    /// update the start time of the procedure.
138    fn set_start_time_ms(&self) {
139        self.start_time_ms
140            .store(common_time::util::current_time_millis(), Ordering::Relaxed);
141    }
142
143    /// update the end time of the procedure.
144    fn set_end_time_ms(&self) {
145        self.end_time_ms
146            .store(common_time::util::current_time_millis(), Ordering::Relaxed);
147    }
148}
149
150/// Reference counted pointer to [ProcedureMeta].
151type ProcedureMetaRef = Arc<ProcedureMeta>;
152
153/// Procedure loaded from store.
154struct LoadedProcedure {
155    procedure: BoxedProcedure,
156    step: u32,
157}
158
159/// The dynamic lock for procedure execution.
160///
161/// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
162/// during execution. They are only held when the procedure specifically needs these keys
163/// and are released as soon as the procedure no longer needs them.
164/// This allows for more fine-grained concurrency control during procedure execution.
165pub(crate) type DynamicKeyLock = Arc<KeyRwLock<String>>;
166
167/// Acquires a dynamic key lock for the given key.
168///
169/// This function takes a reference to the dynamic key lock and a pointer to the key.
170/// It then matches the key type and acquires the appropriate lock.
171pub async fn acquire_dynamic_key_lock(
172    lock: &DynamicKeyLock,
173    key: &StringKey,
174) -> DynamicKeyLockGuard {
175    match key {
176        StringKey::Share(key) => {
177            let guard = lock.read(key.to_string()).await;
178            DynamicKeyLockGuard {
179                guard: Some(OwnedKeyRwLockGuard::from(guard)),
180                key: key.to_string(),
181                lock: lock.clone(),
182            }
183        }
184        StringKey::Exclusive(key) => {
185            let guard = lock.write(key.to_string()).await;
186            DynamicKeyLockGuard {
187                guard: Some(OwnedKeyRwLockGuard::from(guard)),
188                key: key.to_string(),
189                lock: lock.clone(),
190            }
191        }
192    }
193}
194/// A guard for the dynamic key lock.
195///
196/// This guard is used to release the lock when the procedure no longer needs it.
197/// It also ensures that the lock is cleaned up when the guard is dropped.
198pub struct DynamicKeyLockGuard {
199    guard: Option<OwnedKeyRwLockGuard>,
200    key: String,
201    lock: DynamicKeyLock,
202}
203
204impl Drop for DynamicKeyLockGuard {
205    fn drop(&mut self) {
206        if let Some(guard) = self.guard.take() {
207            drop(guard);
208        }
209        self.lock.clean_keys(&[self.key.to_string()]);
210    }
211}
212
213/// Shared context of the manager.
214pub(crate) struct ManagerContext {
215    /// Procedure loaders. The key is the type name of the procedure which the loader returns.
216    loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
217    /// The key lock for the procedure.
218    ///
219    /// The lock keys are defined in `Procedure::lock_key()`.
220    /// These locks are acquired before the procedure starts and released after the procedure finishes.
221    /// They ensure exclusive access to resources throughout the entire procedure lifecycle.
222    key_lock: KeyRwLock<String>,
223    /// The dynamic lock for procedure execution.
224    ///
225    /// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
226    /// during execution. They are only held when the procedure specifically needs these keys
227    /// and are released as soon as the procedure no longer needs them.
228    /// This allows for more fine-grained concurrency control during procedure execution.
229    dynamic_key_lock: DynamicKeyLock,
230    /// Procedures in the manager.
231    procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
232    /// Running procedures.
233    running_procedures: Mutex<HashSet<ProcedureId>>,
234    /// Ids and finished time of finished procedures.
235    finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
236    /// Running flag.
237    running: Arc<AtomicBool>,
238    /// Poison manager.
239    poison_manager: PoisonStoreRef,
240}
241
242#[async_trait]
243impl ContextProvider for ManagerContext {
244    async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
245        Ok(self.state(procedure_id))
246    }
247
248    async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
249        {
250            // validate the procedure exists
251            let procedures = self.procedures.read().unwrap();
252            let procedure = procedures
253                .get(&procedure_id)
254                .context(ProcedureNotFoundSnafu { procedure_id })?;
255
256            // validate the poison key is defined
257            ensure!(
258                procedure.poison_keys.contains(key),
259                PoisonKeyNotDefinedSnafu {
260                    key: key.clone(),
261                    procedure_id
262                }
263            );
264        }
265        let key = key.to_string();
266        let procedure_id = procedure_id.to_string();
267        self.poison_manager.try_put_poison(key, procedure_id).await
268    }
269
270    async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
271        acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
272    }
273}
274
275impl ManagerContext {
276    /// Returns a new [ManagerContext].
277    fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
278        ManagerContext {
279            key_lock: KeyRwLock::new(),
280            dynamic_key_lock: Arc::new(KeyRwLock::new()),
281            loaders: Mutex::new(HashMap::new()),
282            procedures: RwLock::new(HashMap::new()),
283            running_procedures: Mutex::new(HashSet::new()),
284            finished_procedures: Mutex::new(VecDeque::new()),
285            running: Arc::new(AtomicBool::new(false)),
286            poison_manager,
287        }
288    }
289
290    #[cfg(test)]
291    pub(crate) fn set_running(&self) {
292        self.running.store(true, Ordering::Relaxed);
293    }
294
295    /// Set the running flag.
296    pub(crate) fn start(&self) {
297        self.running.store(true, Ordering::Relaxed);
298    }
299
300    pub(crate) fn stop(&self) {
301        self.running.store(false, Ordering::Relaxed);
302    }
303
304    /// Return `ProcedureManager` is running.
305    pub(crate) fn running(&self) -> bool {
306        self.running.load(Ordering::Relaxed)
307    }
308
309    /// Returns true if the procedure with specific `procedure_id` exists.
310    fn contains_procedure(&self, procedure_id: ProcedureId) -> bool {
311        let procedures = self.procedures.read().unwrap();
312        procedures.contains_key(&procedure_id)
313    }
314
315    /// Returns the number of running procedures.
316    fn num_running_procedures(&self) -> usize {
317        self.running_procedures.lock().unwrap().len()
318    }
319
320    /// Try to insert the `procedure` to the context if there is no procedure
321    /// with same [ProcedureId].
322    ///
323    /// Returns `false` if there is already a procedure using the same [ProcedureId].
324    fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
325        let procedure_id = meta.id;
326        let mut procedures = self.procedures.write().unwrap();
327        match procedures.entry(procedure_id) {
328            Entry::Occupied(_) => return false,
329            Entry::Vacant(vacant_entry) => {
330                vacant_entry.insert(meta);
331            }
332        }
333
334        let mut running_procedures = self.running_procedures.lock().unwrap();
335        running_procedures.insert(procedure_id);
336
337        true
338    }
339
340    /// Returns the [ProcedureState] of specific `procedure_id`.
341    fn state(&self, procedure_id: ProcedureId) -> Option<ProcedureState> {
342        let procedures = self.procedures.read().unwrap();
343        procedures.get(&procedure_id).map(|meta| meta.state())
344    }
345
346    /// Returns the [ProcedureMeta] of all procedures.
347    fn list_procedure(&self) -> Vec<ProcedureInfo> {
348        let procedures = self.procedures.read().unwrap();
349        procedures
350            .values()
351            .map(|meta| ProcedureInfo {
352                id: meta.id,
353                type_name: meta.type_name.clone(),
354                start_time_ms: meta.start_time_ms.load(Ordering::Relaxed),
355                end_time_ms: meta.end_time_ms.load(Ordering::Relaxed),
356                state: meta.state(),
357                lock_keys: meta.lock_key.get_keys(),
358            })
359            .collect()
360    }
361
362    /// Returns the [Watcher] of specific `procedure_id`.
363    fn watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
364        let procedures = self.procedures.read().unwrap();
365        procedures
366            .get(&procedure_id)
367            .map(|meta| meta.state_receiver.clone())
368    }
369
370    /// Notify a suspended parent procedure with specific `procedure_id` by its subprocedure.
371    fn notify_by_subprocedure(&self, procedure_id: ProcedureId) {
372        let procedures = self.procedures.read().unwrap();
373        if let Some(meta) = procedures.get(&procedure_id) {
374            meta.child_notify.notify_one();
375        }
376    }
377
378    /// Load procedure from specific [ProcedureMessage].
379    fn load_one_procedure_from_message(
380        &self,
381        procedure_id: ProcedureId,
382        message: &ProcedureMessage,
383    ) -> Option<LoadedProcedure> {
384        let loaders = self.loaders.lock().unwrap();
385        let loader = loaders.get(&message.type_name).or_else(|| {
386            error!(
387                "Loader not found, procedure_id: {}, type_name: {}",
388                procedure_id, message.type_name
389            );
390            None
391        })?;
392
393        let procedure = loader(&message.data)
394            .map_err(|e| {
395                error!(
396                    "Failed to load procedure data, key: {}, source: {:?}",
397                    procedure_id, e
398                );
399                e
400            })
401            .ok()?;
402
403        Some(LoadedProcedure {
404            procedure,
405            step: message.step,
406        })
407    }
408
409    /// Returns all procedures in the tree (including given `root` procedure).
410    ///
411    /// If callers need a consistent view of the tree, they must ensure no new
412    /// procedure is added to the tree during using this method.
413    fn procedures_in_tree(&self, root: &ProcedureMetaRef) -> Vec<ProcedureId> {
414        let sub_num = root.num_children();
415        // Reserve capacity for the root procedure and its children.
416        let mut procedures = Vec::with_capacity(1 + sub_num);
417
418        let mut queue = VecDeque::with_capacity(1 + sub_num);
419        // Push the root procedure to the queue.
420        queue.push_back(root.clone());
421
422        let mut children_ids = Vec::with_capacity(sub_num);
423        let mut children = Vec::with_capacity(sub_num);
424        while let Some(meta) = queue.pop_front() {
425            procedures.push(meta.id);
426
427            // Find metadatas of children.
428            children_ids.clear();
429            meta.list_children(&mut children_ids);
430            self.find_procedures(&children_ids, &mut children);
431
432            // Traverse children later.
433            for child in children.drain(..) {
434                queue.push_back(child);
435            }
436        }
437
438        procedures
439    }
440
441    /// Finds procedures by given `procedure_ids`.
442    ///
443    /// Ignores the id if corresponding procedure is not found.
444    fn find_procedures(&self, procedure_ids: &[ProcedureId], metas: &mut Vec<ProcedureMetaRef>) {
445        let procedures = self.procedures.read().unwrap();
446        for procedure_id in procedure_ids {
447            if let Some(meta) = procedures.get(procedure_id) {
448                metas.push(meta.clone());
449            }
450        }
451    }
452
453    /// Clean resources of finished procedures.
454    fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
455        // Since users need to query the procedure state, so we can't remove the
456        // meta of the procedure directly.
457        let now = Instant::now();
458        let mut finished_procedures = self.finished_procedures.lock().unwrap();
459        finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
460
461        // Remove the procedures from the running set.
462        let mut running_procedures = self.running_procedures.lock().unwrap();
463        for procedure_id in procedure_ids {
464            running_procedures.remove(procedure_id);
465        }
466    }
467
468    /// Remove metadata of outdated procedures.
469    fn remove_outdated_meta(&self, ttl: Duration) {
470        let ids = {
471            let mut finished_procedures = self.finished_procedures.lock().unwrap();
472            if finished_procedures.is_empty() {
473                return;
474            }
475
476            let mut ids_to_remove = Vec::new();
477            while let Some((id, finish_time)) = finished_procedures.front() {
478                if finish_time.elapsed() > ttl {
479                    ids_to_remove.push(*id);
480                    let _ = finished_procedures.pop_front();
481                } else {
482                    // The rest procedures are finished later, so we can break
483                    // the loop.
484                    break;
485                }
486            }
487            ids_to_remove
488        };
489
490        if ids.is_empty() {
491            return;
492        }
493
494        let mut procedures = self.procedures.write().unwrap();
495        for id in ids {
496            let _ = procedures.remove(&id);
497        }
498    }
499}
500
/// Configuration of a [LocalManager].
#[derive(Debug)]
pub struct ManagerConfig {
    /// Path handed to the procedure store as its parent path.
    pub parent_path: String,
    /// Maximum number of retries configured on the runner's backoff.
    pub max_retry_times: usize,
    /// Minimum delay configured on the runner's backoff.
    pub retry_delay: Duration,
    /// Interval at which the task that removes outdated metadata runs.
    pub remove_outdated_meta_task_interval: Duration,
    /// How long the metadata of a finished procedure is kept before removal.
    pub remove_outdated_meta_ttl: Duration,
    /// Upper bound on the number of running procedures; submissions are
    /// rejected once this many procedures are running.
    pub max_running_procedures: usize,
}
511
512impl Default for ManagerConfig {
513    fn default() -> Self {
514        Self {
515            parent_path: String::default(),
516            max_retry_times: 3,
517            retry_delay: Duration::from_millis(500),
518            remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
519            remove_outdated_meta_ttl: META_TTL,
520            max_running_procedures: 128,
521        }
522    }
523}
524
525/// A [ProcedureManager] that maintains procedure states locally.
526pub struct LocalManager {
527    manager_ctx: Arc<ManagerContext>,
528    procedure_store: Arc<ProcedureStore>,
529    max_retry_times: usize,
530    retry_delay: Duration,
531    /// GC task.
532    remove_outdated_meta_task: TokioMutex<Option<RepeatedTask<Error>>>,
533    config: ManagerConfig,
534}
535
536impl LocalManager {
537    /// Create a new [LocalManager] with specific `config`.
538    pub fn new(
539        config: ManagerConfig,
540        state_store: StateStoreRef,
541        poison_store: PoisonStoreRef,
542    ) -> LocalManager {
543        let manager_ctx = Arc::new(ManagerContext::new(poison_store));
544
545        LocalManager {
546            manager_ctx,
547            procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)),
548            max_retry_times: config.max_retry_times,
549            retry_delay: config.retry_delay,
550            remove_outdated_meta_task: TokioMutex::new(None),
551            config,
552        }
553    }
554
555    /// Build remove outedated meta task
556    pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask<Error> {
557        RepeatedTask::new(
558            self.config.remove_outdated_meta_task_interval,
559            Box::new(RemoveOutdatedMetaFunction {
560                manager_ctx: self.manager_ctx.clone(),
561                ttl: self.config.remove_outdated_meta_ttl,
562            }),
563        )
564    }
565
566    /// Submit a root procedure with given `procedure_id`.
567    fn submit_root(
568        &self,
569        procedure_id: ProcedureId,
570        procedure_state: ProcedureState,
571        step: u32,
572        procedure: BoxedProcedure,
573    ) -> Result<Watcher> {
574        ensure!(self.manager_ctx.running(), ManagerNotStartSnafu);
575
576        let meta = Arc::new(ProcedureMeta::new(
577            procedure_id,
578            procedure_state,
579            None,
580            procedure.lock_key(),
581            procedure.poison_keys(),
582            procedure.type_name(),
583        ));
584        let runner = Runner {
585            meta: meta.clone(),
586            procedure,
587            manager_ctx: self.manager_ctx.clone(),
588            step,
589            exponential_builder: ExponentialBuilder::default()
590                .with_min_delay(self.retry_delay)
591                .with_max_times(self.max_retry_times),
592            store: self.procedure_store.clone(),
593            rolling_back: false,
594        };
595
596        let watcher = meta.state_receiver.clone();
597
598        ensure!(
599            self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
600            TooManyRunningProceduresSnafu {
601                max_running_procedures: self.config.max_running_procedures,
602            }
603        );
604
605        // Inserts meta into the manager before actually spawnd the runner.
606        ensure!(
607            self.manager_ctx.try_insert_procedure(meta),
608            DuplicateProcedureSnafu { procedure_id },
609        );
610
611        let tracing_context = TracingContext::from_current_span();
612
613        let _handle = common_runtime::spawn_global(async move {
614            // Run the root procedure.
615            // The task was moved to another runtime for execution.
616            // In order not to interrupt tracing, a span needs to be created to continue tracing the current task.
617            runner
618                .run()
619                .trace(
620                    tracing_context
621                        .attach(tracing::info_span!("LocalManager::submit_root_procedure")),
622                )
623                .await;
624        });
625
626        Ok(watcher)
627    }
628
629    fn submit_recovered_messages(
630        &self,
631        messages: HashMap<ProcedureId, ProcedureMessage>,
632        init_state: InitProcedureState,
633    ) {
634        for (procedure_id, message) in &messages {
635            if message.parent_id.is_none() {
636                // This is the root procedure. We only submit the root procedure as it will
637                // submit sub-procedures to the manager.
638                let Some(mut loaded_procedure) = self
639                    .manager_ctx
640                    .load_one_procedure_from_message(*procedure_id, message)
641                else {
642                    // Try to load other procedures.
643                    continue;
644                };
645
646                info!(
647                    "Recover root procedure {}-{}, step: {}",
648                    loaded_procedure.procedure.type_name(),
649                    procedure_id,
650                    loaded_procedure.step
651                );
652
653                let procedure_state = match init_state {
654                    InitProcedureState::RollingBack => ProcedureState::RollingBack {
655                        error: Arc::new(
656                            error::RollbackProcedureRecoveredSnafu {
657                                error: message.error.clone().unwrap_or("Unknown error".to_string()),
658                            }
659                            .build(),
660                        ),
661                    },
662                    InitProcedureState::Running => ProcedureState::Running,
663                };
664
665                if let Err(e) = loaded_procedure.procedure.recover() {
666                    error!(e; "Failed to recover procedure {}", procedure_id);
667                }
668
669                if let Err(e) = self.submit_root(
670                    *procedure_id,
671                    procedure_state,
672                    loaded_procedure.step,
673                    loaded_procedure.procedure,
674                ) {
675                    error!(e; "Failed to recover procedure {}", procedure_id);
676                }
677            }
678        }
679    }
680
681    /// Recovers unfinished procedures and reruns them.
682    async fn recover(&self) -> Result<()> {
683        info!("LocalManager start to recover");
684        let recover_start = Instant::now();
685
686        let ProcedureMessages {
687            messages,
688            rollback_messages,
689            finished_ids,
690        } = self.procedure_store.load_messages().await?;
691        // Submits recovered messages first.
692        self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack);
693        self.submit_recovered_messages(messages, InitProcedureState::Running);
694
695        if !finished_ids.is_empty() {
696            info!(
697                "LocalManager try to clean finished procedures, num: {}",
698                finished_ids.len()
699            );
700
701            for procedure_id in finished_ids {
702                if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
703                    error!(e; "Failed to delete procedure {}", procedure_id);
704                }
705            }
706        }
707
708        info!(
709            "LocalManager finish recovery, cost: {}ms",
710            recover_start.elapsed().as_millis()
711        );
712
713        Ok(())
714    }
715
716    #[cfg(any(test, feature = "testing"))]
717    /// Returns true if contains a specified loader.
718    pub fn contains_loader(&self, name: &str) -> bool {
719        let loaders = self.manager_ctx.loaders.lock().unwrap();
720        loaders.contains_key(name)
721    }
722}
723
724#[async_trait]
725impl ProcedureManager for LocalManager {
726    fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> {
727        let mut loaders = self.manager_ctx.loaders.lock().unwrap();
728        ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name });
729
730        let _ = loaders.insert(name.to_string(), loader);
731
732        Ok(())
733    }
734
735    async fn start(&self) -> Result<()> {
736        let mut task = self.remove_outdated_meta_task.lock().await;
737
738        if task.is_some() {
739            return Ok(());
740        }
741
742        let task_inner = self.build_remove_outdated_meta_task();
743
744        task_inner
745            .start(common_runtime::global_runtime())
746            .context(StartRemoveOutdatedMetaTaskSnafu)?;
747
748        *task = Some(task_inner);
749
750        self.manager_ctx.start();
751
752        info!("LocalManager is start.");
753
754        self.recover().await
755    }
756
757    async fn stop(&self) -> Result<()> {
758        let mut task = self.remove_outdated_meta_task.lock().await;
759
760        if let Some(task) = task.take() {
761            task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?;
762        }
763
764        self.manager_ctx.stop();
765
766        info!("LocalManager is stopped.");
767
768        Ok(())
769    }
770
771    async fn submit(&self, procedure: ProcedureWithId) -> Result<Watcher> {
772        let procedure_id = procedure.id;
773        ensure!(
774            !self.manager_ctx.contains_procedure(procedure_id),
775            DuplicateProcedureSnafu { procedure_id }
776        );
777
778        self.submit_root(
779            procedure.id,
780            ProcedureState::Running,
781            0,
782            procedure.procedure,
783        )
784    }
785
786    async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
787        Ok(self.manager_ctx.state(procedure_id))
788    }
789
790    fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
791        self.manager_ctx.watcher(procedure_id)
792    }
793
794    async fn list_procedures(&self) -> Result<Vec<ProcedureInfo>> {
795        Ok(self.manager_ctx.list_procedure())
796    }
797}
798
799struct RemoveOutdatedMetaFunction {
800    manager_ctx: Arc<ManagerContext>,
801    ttl: Duration,
802}
803
804#[async_trait::async_trait]
805impl TaskFunction<Error> for RemoveOutdatedMetaFunction {
806    fn name(&self) -> &str {
807        "ProcedureManager-remove-outdated-meta-task"
808    }
809
810    async fn call(&mut self) -> Result<()> {
811        self.manager_ctx.remove_outdated_meta(self.ttl);
812        Ok(())
813    }
814}
815
/// Helpers shared by unit tests.
#[cfg(test)]
pub(crate) mod test_util {
    use common_test_util::temp_dir::TempDir;
    use object_store::services::Fs as Builder;
    use object_store::ObjectStore;

    use super::*;

    /// Creates a new [ProcedureMeta] for test purpose: a running root procedure
    /// with a random id and default lock and poison keys.
    pub(crate) fn procedure_meta_for_test() -> ProcedureMeta {
        let id = ProcedureId::random();
        ProcedureMeta::new(
            id,
            ProcedureState::Running,
            None,
            LockKey::default(),
            PoisonKeys::default(),
            "ProcedureAdapter",
        )
    }

    /// Creates a file-system [ObjectStore] rooted at `dir`.
    pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
        let root = dir.path().to_str().unwrap();
        ObjectStore::new(Builder::default().root(root))
            .unwrap()
            .finish()
    }
}
842
843#[cfg(test)]
844mod tests {
845    use std::assert_matches::assert_matches;
846
847    use common_error::mock::MockError;
848    use common_error::status_code::StatusCode;
849    use common_test_util::temp_dir::create_temp_dir;
850    use tokio::time::timeout;
851
852    use super::*;
853    use crate::error::{self, Error};
854    use crate::store::state_store::ObjectStateStore;
855    use crate::test_util::InMemoryPoisonStore;
856    use crate::{Context, Procedure, Status};
857
858    fn new_test_manager_context() -> ManagerContext {
859        let poison_manager = Arc::new(InMemoryPoisonStore::default());
860        ManagerContext::new(poison_manager)
861    }
862
863    #[test]
864    fn test_manager_context() {
865        let ctx = new_test_manager_context();
866        let meta = Arc::new(test_util::procedure_meta_for_test());
867
868        assert!(!ctx.contains_procedure(meta.id));
869        assert!(ctx.state(meta.id).is_none());
870
871        assert!(ctx.try_insert_procedure(meta.clone()));
872        assert!(ctx.contains_procedure(meta.id));
873
874        assert!(ctx.state(meta.id).unwrap().is_running());
875        meta.set_state(ProcedureState::Done { output: None });
876        assert!(ctx.state(meta.id).unwrap().is_done());
877    }
878
879    #[test]
880    fn test_manager_context_insert_duplicate() {
881        let ctx = new_test_manager_context();
882        let meta = Arc::new(test_util::procedure_meta_for_test());
883
884        assert!(ctx.try_insert_procedure(meta.clone()));
885        assert!(!ctx.try_insert_procedure(meta));
886    }
887
888    fn new_child(parent_id: ProcedureId, ctx: &ManagerContext) -> ProcedureMetaRef {
889        let mut child = test_util::procedure_meta_for_test();
890        child.parent_id = Some(parent_id);
891        let child = Arc::new(child);
892        assert!(ctx.try_insert_procedure(child.clone()));
893
894        let mut parent = Vec::new();
895        ctx.find_procedures(&[parent_id], &mut parent);
896        parent[0].push_child(child.id);
897
898        child
899    }
900
901    #[test]
902    fn test_procedures_in_tree() {
903        let ctx = new_test_manager_context();
904        let root = Arc::new(test_util::procedure_meta_for_test());
905        assert!(ctx.try_insert_procedure(root.clone()));
906
907        assert_eq!(1, ctx.procedures_in_tree(&root).len());
908
909        let child1 = new_child(root.id, &ctx);
910        let child2 = new_child(root.id, &ctx);
911
912        let child3 = new_child(child1.id, &ctx);
913        let child4 = new_child(child1.id, &ctx);
914
915        let child5 = new_child(child2.id, &ctx);
916
917        let expect = vec![
918            root.id, child1.id, child2.id, child3.id, child4.id, child5.id,
919        ];
920        assert_eq!(expect, ctx.procedures_in_tree(&root));
921    }
922
923    #[derive(Debug)]
924    struct ProcedureToLoad {
925        content: String,
926        lock_key: LockKey,
927        poison_keys: PoisonKeys,
928    }
929
930    #[async_trait]
931    impl Procedure for ProcedureToLoad {
932        fn type_name(&self) -> &str {
933            "ProcedureToLoad"
934        }
935
936        async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
937            Ok(Status::done())
938        }
939
940        fn dump(&self) -> Result<String> {
941            Ok(self.content.clone())
942        }
943
944        fn lock_key(&self) -> LockKey {
945            self.lock_key.clone()
946        }
947
948        fn poison_keys(&self) -> PoisonKeys {
949            self.poison_keys.clone()
950        }
951    }
952
953    impl ProcedureToLoad {
954        fn new(content: &str) -> ProcedureToLoad {
955            ProcedureToLoad {
956                content: content.to_string(),
957                lock_key: LockKey::default(),
958                poison_keys: PoisonKeys::default(),
959            }
960        }
961
962        fn loader() -> BoxedProcedureLoader {
963            let f = |json: &str| {
964                let procedure = ProcedureToLoad::new(json);
965                Ok(Box::new(procedure) as _)
966            };
967            Box::new(f)
968        }
969    }
970
971    #[test]
972    fn test_register_loader() {
973        let dir = create_temp_dir("register");
974        let config = ManagerConfig {
975            parent_path: "data/".to_string(),
976            max_retry_times: 3,
977            retry_delay: Duration::from_millis(500),
978            ..Default::default()
979        };
980        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
981        let poison_manager = Arc::new(InMemoryPoisonStore::new());
982        let manager = LocalManager::new(config, state_store, poison_manager);
983        manager.manager_ctx.start();
984
985        manager
986            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
987            .unwrap();
988        // Register duplicate loader.
989        let err = manager
990            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
991            .unwrap_err();
992        assert!(matches!(err, Error::LoaderConflict { .. }), "{err}");
993    }
994
995    #[tokio::test]
996    async fn test_recover() {
997        let dir = create_temp_dir("recover");
998        let object_store = test_util::new_object_store(&dir);
999        let config = ManagerConfig {
1000            parent_path: "data/".to_string(),
1001            max_retry_times: 3,
1002            retry_delay: Duration::from_millis(500),
1003            ..Default::default()
1004        };
1005        let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1006        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1007        let manager = LocalManager::new(config, state_store, poison_manager);
1008        manager.manager_ctx.start();
1009
1010        manager
1011            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1012            .unwrap();
1013
1014        // Prepare data
1015        let procedure_store = ProcedureStore::from_object_store(object_store.clone());
1016        let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
1017        let root_id = ProcedureId::random();
1018        // Prepare data for the root procedure.
1019        for step in 0..3 {
1020            let type_name = root.type_name().to_string();
1021            let data = root.dump().unwrap();
1022            procedure_store
1023                .store_procedure(root_id, step, type_name, data, None)
1024                .await
1025                .unwrap();
1026        }
1027
1028        let child: BoxedProcedure = Box::new(ProcedureToLoad::new("a child procedure"));
1029        let child_id = ProcedureId::random();
1030        // Prepare data for the child procedure
1031        for step in 0..2 {
1032            let type_name = child.type_name().to_string();
1033            let data = child.dump().unwrap();
1034            procedure_store
1035                .store_procedure(child_id, step, type_name, data, Some(root_id))
1036                .await
1037                .unwrap();
1038        }
1039
1040        // Recover the manager
1041        manager.recover().await.unwrap();
1042
1043        // The manager should submit the root procedure.
1044        let _ = manager.procedure_state(root_id).await.unwrap().unwrap();
1045        // Since the mocked root procedure actually doesn't submit subprocedures, so there is no
1046        // related state.
1047        assert!(manager.procedure_state(child_id).await.unwrap().is_none());
1048    }
1049
1050    #[tokio::test]
1051    async fn test_submit_procedure() {
1052        let dir = create_temp_dir("submit");
1053        let config = ManagerConfig {
1054            parent_path: "data/".to_string(),
1055            max_retry_times: 3,
1056            retry_delay: Duration::from_millis(500),
1057            ..Default::default()
1058        };
1059        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1060        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1061        let manager = LocalManager::new(config, state_store, poison_manager);
1062        manager.manager_ctx.start();
1063
1064        let procedure_id = ProcedureId::random();
1065        assert!(manager
1066            .procedure_state(procedure_id)
1067            .await
1068            .unwrap()
1069            .is_none());
1070        assert!(manager.procedure_watcher(procedure_id).is_none());
1071
1072        let mut procedure = ProcedureToLoad::new("submit");
1073        procedure.lock_key = LockKey::single_exclusive("test.submit");
1074        assert!(manager
1075            .submit(ProcedureWithId {
1076                id: procedure_id,
1077                procedure: Box::new(procedure),
1078            })
1079            .await
1080            .is_ok());
1081        assert!(manager
1082            .procedure_state(procedure_id)
1083            .await
1084            .unwrap()
1085            .is_some());
1086        // Wait for the procedure done.
1087        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1088        watcher.changed().await.unwrap();
1089        assert!(watcher.borrow().is_done());
1090
1091        // Try to submit procedure with same id again.
1092        let err = manager
1093            .submit(ProcedureWithId {
1094                id: procedure_id,
1095                procedure: Box::new(ProcedureToLoad::new("submit")),
1096            })
1097            .await
1098            .unwrap_err();
1099        assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err}");
1100    }
1101
1102    #[tokio::test]
1103    async fn test_state_changed_on_err() {
1104        let dir = create_temp_dir("on_err");
1105        let config = ManagerConfig {
1106            parent_path: "data/".to_string(),
1107            max_retry_times: 3,
1108            retry_delay: Duration::from_millis(500),
1109            ..Default::default()
1110        };
1111        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1112        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1113        let manager = LocalManager::new(config, state_store, poison_manager);
1114        manager.manager_ctx.start();
1115
1116        #[derive(Debug)]
1117        struct MockProcedure {
1118            panic: bool,
1119        }
1120
1121        #[async_trait]
1122        impl Procedure for MockProcedure {
1123            fn type_name(&self) -> &str {
1124                "MockProcedure"
1125            }
1126
1127            async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1128                if self.panic {
1129                    // Test the runner can set the state to failed even the procedure
1130                    // panics.
1131                    panic!();
1132                } else {
1133                    Err(Error::external(MockError::new(StatusCode::Unexpected)))
1134                }
1135            }
1136
1137            async fn rollback(&mut self, _: &Context) -> Result<()> {
1138                Ok(())
1139            }
1140
1141            fn rollback_supported(&self) -> bool {
1142                true
1143            }
1144
1145            fn dump(&self) -> Result<String> {
1146                Ok(String::new())
1147            }
1148
1149            fn lock_key(&self) -> LockKey {
1150                LockKey::single_exclusive("test.submit")
1151            }
1152
1153            fn poison_keys(&self) -> PoisonKeys {
1154                PoisonKeys::default()
1155            }
1156        }
1157
1158        let check_procedure = |procedure| async {
1159            let procedure_id = ProcedureId::random();
1160            manager
1161                .submit(ProcedureWithId {
1162                    id: procedure_id,
1163                    procedure: Box::new(procedure),
1164                })
1165                .await
1166                .unwrap()
1167        };
1168
1169        let mut watcher = check_procedure(MockProcedure { panic: false }).await;
1170        // Wait for the notification.
1171        watcher.changed().await.unwrap();
1172        assert!(watcher.borrow().is_prepare_rollback());
1173        watcher.changed().await.unwrap();
1174        assert!(watcher.borrow().is_rolling_back());
1175        watcher.changed().await.unwrap();
1176        assert!(watcher.borrow().is_failed());
1177        // The runner won't rollback a panicked procedure.
1178        let mut watcher = check_procedure(MockProcedure { panic: true }).await;
1179        watcher.changed().await.unwrap();
1180        assert!(watcher.borrow().is_failed());
1181    }
1182
1183    #[tokio::test]
1184    async fn test_procedure_manager_stopped() {
1185        let dir = create_temp_dir("procedure_manager_stopped");
1186        let config = ManagerConfig {
1187            parent_path: "data/".to_string(),
1188            max_retry_times: 3,
1189            retry_delay: Duration::from_millis(500),
1190            ..Default::default()
1191        };
1192        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1193        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1194        let manager = LocalManager::new(config, state_store, poison_manager);
1195
1196        let mut procedure = ProcedureToLoad::new("submit");
1197        procedure.lock_key = LockKey::single_exclusive("test.submit");
1198        let procedure_id = ProcedureId::random();
1199        assert_matches!(
1200            manager
1201                .submit(ProcedureWithId {
1202                    id: procedure_id,
1203                    procedure: Box::new(procedure),
1204                })
1205                .await
1206                .unwrap_err(),
1207            error::Error::ManagerNotStart { .. }
1208        );
1209    }
1210
1211    #[tokio::test]
1212    async fn test_procedure_manager_restart() {
1213        let dir = create_temp_dir("procedure_manager_restart");
1214        let config = ManagerConfig {
1215            parent_path: "data/".to_string(),
1216            max_retry_times: 3,
1217            retry_delay: Duration::from_millis(500),
1218            ..Default::default()
1219        };
1220        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1221        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1222        let manager = LocalManager::new(config, state_store, poison_manager);
1223
1224        manager.start().await.unwrap();
1225        manager.stop().await.unwrap();
1226        manager.start().await.unwrap();
1227
1228        let mut procedure = ProcedureToLoad::new("submit");
1229        procedure.lock_key = LockKey::single_exclusive("test.submit");
1230        let procedure_id = ProcedureId::random();
1231        assert!(manager
1232            .submit(ProcedureWithId {
1233                id: procedure_id,
1234                procedure: Box::new(procedure),
1235            })
1236            .await
1237            .is_ok());
1238        assert!(manager
1239            .procedure_state(procedure_id)
1240            .await
1241            .unwrap()
1242            .is_some());
1243    }
1244
1245    #[tokio::test(flavor = "multi_thread")]
1246    async fn test_remove_outdated_meta_task() {
1247        let dir = create_temp_dir("remove_outdated_meta_task");
1248        let object_store = test_util::new_object_store(&dir);
1249        let config = ManagerConfig {
1250            parent_path: "data/".to_string(),
1251            max_retry_times: 3,
1252            retry_delay: Duration::from_millis(500),
1253            remove_outdated_meta_task_interval: Duration::from_millis(1),
1254            remove_outdated_meta_ttl: Duration::from_millis(1),
1255            max_running_procedures: 128,
1256        };
1257        let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1258        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1259        let manager = LocalManager::new(config, state_store, poison_manager);
1260        manager.manager_ctx.set_running();
1261
1262        let mut procedure = ProcedureToLoad::new("submit");
1263        procedure.lock_key = LockKey::single_exclusive("test.submit");
1264        let procedure_id = ProcedureId::random();
1265        assert!(manager
1266            .submit(ProcedureWithId {
1267                id: procedure_id,
1268                procedure: Box::new(procedure),
1269            })
1270            .await
1271            .is_ok());
1272        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1273        watcher.changed().await.unwrap();
1274
1275        manager.start().await.unwrap();
1276        tokio::time::sleep(Duration::from_millis(300)).await;
1277        assert!(manager
1278            .procedure_state(procedure_id)
1279            .await
1280            .unwrap()
1281            .is_none());
1282
1283        // The remove_outdated_meta method has been stopped, so any procedure meta-data will not be automatically removed.
1284        manager.stop().await.unwrap();
1285        let mut procedure = ProcedureToLoad::new("submit");
1286        procedure.lock_key = LockKey::single_exclusive("test.submit");
1287        let procedure_id = ProcedureId::random();
1288
1289        manager.manager_ctx.set_running();
1290        assert!(manager
1291            .submit(ProcedureWithId {
1292                id: procedure_id,
1293                procedure: Box::new(procedure),
1294            })
1295            .await
1296            .is_ok());
1297        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1298        watcher.changed().await.unwrap();
1299        tokio::time::sleep(Duration::from_millis(300)).await;
1300        assert!(manager
1301            .procedure_state(procedure_id)
1302            .await
1303            .unwrap()
1304            .is_some());
1305
1306        // After restart
1307        let mut procedure = ProcedureToLoad::new("submit");
1308        procedure.lock_key = LockKey::single_exclusive("test.submit");
1309        let procedure_id = ProcedureId::random();
1310        assert!(manager
1311            .submit(ProcedureWithId {
1312                id: procedure_id,
1313                procedure: Box::new(procedure),
1314            })
1315            .await
1316            .is_ok());
1317        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1318        watcher.changed().await.unwrap();
1319
1320        manager.start().await.unwrap();
1321        tokio::time::sleep(Duration::from_millis(300)).await;
1322        assert!(manager
1323            .procedure_state(procedure_id)
1324            .await
1325            .unwrap()
1326            .is_none());
1327    }
1328
1329    #[tokio::test]
1330    async fn test_too_many_running_procedures() {
1331        let dir = create_temp_dir("too_many_running_procedures");
1332        let config = ManagerConfig {
1333            parent_path: "data/".to_string(),
1334            max_retry_times: 3,
1335            retry_delay: Duration::from_millis(500),
1336            max_running_procedures: 1,
1337            ..Default::default()
1338        };
1339        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1340        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1341        let manager = LocalManager::new(config, state_store, poison_manager);
1342        manager.manager_ctx.set_running();
1343
1344        manager
1345            .manager_ctx
1346            .running_procedures
1347            .lock()
1348            .unwrap()
1349            .insert(ProcedureId::random());
1350        manager.start().await.unwrap();
1351
1352        // Submit a new procedure should fail.
1353        let mut procedure = ProcedureToLoad::new("submit");
1354        procedure.lock_key = LockKey::single_exclusive("test.submit");
1355        let procedure_id = ProcedureId::random();
1356        let err = manager
1357            .submit(ProcedureWithId {
1358                id: procedure_id,
1359                procedure: Box::new(procedure),
1360            })
1361            .await
1362            .unwrap_err();
1363        assert!(matches!(err, Error::TooManyRunningProcedures { .. }));
1364
1365        manager
1366            .manager_ctx
1367            .running_procedures
1368            .lock()
1369            .unwrap()
1370            .clear();
1371
1372        // Submit a new procedure should succeed.
1373        let mut procedure = ProcedureToLoad::new("submit");
1374        procedure.lock_key = LockKey::single_exclusive("test.submit");
1375        assert!(manager
1376            .submit(ProcedureWithId {
1377                id: procedure_id,
1378                procedure: Box::new(procedure),
1379            })
1380            .await
1381            .is_ok());
1382        assert!(manager
1383            .procedure_state(procedure_id)
1384            .await
1385            .unwrap()
1386            .is_some());
1387        // Wait for the procedure done.
1388        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1389        watcher.changed().await.unwrap();
1390        assert!(watcher.borrow().is_done());
1391    }
1392
1393    #[derive(Debug)]
1394    struct ProcedureToRecover {
1395        content: String,
1396        lock_key: LockKey,
1397        notify: Option<Arc<Notify>>,
1398        poison_keys: PoisonKeys,
1399    }
1400
1401    #[async_trait]
1402    impl Procedure for ProcedureToRecover {
1403        fn type_name(&self) -> &str {
1404            "ProcedureToRecover"
1405        }
1406
1407        async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1408            Ok(Status::done())
1409        }
1410
1411        fn dump(&self) -> Result<String> {
1412            Ok(self.content.clone())
1413        }
1414
1415        fn lock_key(&self) -> LockKey {
1416            self.lock_key.clone()
1417        }
1418
1419        fn recover(&mut self) -> Result<()> {
1420            self.notify.as_ref().unwrap().notify_one();
1421            Ok(())
1422        }
1423
1424        fn poison_keys(&self) -> PoisonKeys {
1425            self.poison_keys.clone()
1426        }
1427    }
1428
1429    impl ProcedureToRecover {
1430        fn new(content: &str) -> ProcedureToRecover {
1431            ProcedureToRecover {
1432                content: content.to_string(),
1433                lock_key: LockKey::default(),
1434                poison_keys: PoisonKeys::default(),
1435                notify: None,
1436            }
1437        }
1438
1439        fn loader(notify: Arc<Notify>) -> BoxedProcedureLoader {
1440            let f = move |json: &str| {
1441                let procedure = ProcedureToRecover {
1442                    content: json.to_string(),
1443                    lock_key: LockKey::default(),
1444                    poison_keys: PoisonKeys::default(),
1445                    notify: Some(notify.clone()),
1446                };
1447                Ok(Box::new(procedure) as _)
1448            };
1449            Box::new(f)
1450        }
1451    }
1452
1453    #[tokio::test]
1454    async fn test_procedure_recover() {
1455        common_telemetry::init_default_ut_logging();
1456        let dir = create_temp_dir("procedure_recover");
1457        let object_store = test_util::new_object_store(&dir);
1458        let config = ManagerConfig {
1459            parent_path: "data/".to_string(),
1460            max_retry_times: 3,
1461            retry_delay: Duration::from_millis(500),
1462            ..Default::default()
1463        };
1464        let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1465        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1466        let manager = LocalManager::new(config, state_store, poison_manager);
1467        manager.manager_ctx.start();
1468
1469        let notify = Arc::new(Notify::new());
1470        manager
1471            .register_loader(
1472                "ProcedureToRecover",
1473                ProcedureToRecover::loader(notify.clone()),
1474            )
1475            .unwrap();
1476
1477        // Prepare data
1478        let procedure_store = ProcedureStore::from_object_store(object_store.clone());
1479        let root: BoxedProcedure = Box::new(ProcedureToRecover::new("test procedure recovery"));
1480        let root_id = ProcedureId::random();
1481        // Prepare data for the root procedure.
1482        for step in 0..3 {
1483            let type_name = root.type_name().to_string();
1484            let data = root.dump().unwrap();
1485            procedure_store
1486                .store_procedure(root_id, step, type_name, data, None)
1487                .await
1488                .unwrap();
1489        }
1490
1491        // Recover the manager
1492        manager.recover().await.unwrap();
1493        timeout(Duration::from_secs(10), notify.notified())
1494            .await
1495            .unwrap();
1496    }
1497}