common_procedure/
local.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod runner;
16
17use std::collections::hash_map::Entry;
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
20use std::sync::{Arc, Mutex, RwLock};
21use std::time::{Duration, Instant};
22
23use async_trait::async_trait;
24use backon::ExponentialBuilder;
25use common_error::ext::BoxedError;
26use common_runtime::{RepeatedTask, TaskFunction};
27use common_telemetry::tracing_context::{FutureExt, TracingContext};
28use common_telemetry::{error, info, tracing};
29use snafu::{ensure, OptionExt, ResultExt};
30use tokio::sync::watch::{self, Receiver, Sender};
31use tokio::sync::{Mutex as TokioMutex, Notify};
32
33use crate::error::{
34    self, CheckStatusSnafu, DuplicateProcedureSnafu, Error, LoaderConflictSnafu,
35    ManagerNotStartSnafu, ManagerPasuedSnafu, PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu,
36    Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
37    TooManyRunningProceduresSnafu,
38};
39use crate::local::runner::Runner;
40use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
41use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
42use crate::store::poison_store::PoisonStoreRef;
43use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
44use crate::{
45    BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
46    ProcedureState, ProcedureWithId, StringKey, Watcher,
47};
48
/// Default time-to-live of a finished procedure's metadata: ten minutes.
const META_TTL: Duration = Duration::from_secs(600);
51
/// Shared metadata of a procedure.
///
/// The metadata carries the identity of the procedure, the keys it declared
/// (lock keys and poison keys), a watch channel publishing its current
/// [ProcedureState], the ids of its subprocedures and its start/end timestamps.
///
/// # Note
/// [Notify] is not a condition variable, we can't guarantee the waiters are notified
/// if they didn't call `notified()` before we signal the notify. So we
/// 1. use a dedicated notify for each condition, such as waiting for a lock, waiting
///    for children;
/// 2. always use `notify_one` and ensure there is only one waiter.
#[derive(Debug)]
pub(crate) struct ProcedureMeta {
    /// Id of this procedure.
    id: ProcedureId,
    /// Type name of this procedure.
    type_name: String,
    /// Parent procedure id, `None` for a root procedure.
    parent_id: Option<ProcedureId>,
    /// Notify to wait for subprocedures.
    child_notify: Notify,
    /// Lock required by this procedure.
    lock_key: LockKey,
    /// Poison keys that may cause this procedure to become poisoned during execution.
    poison_keys: PoisonKeys,
    /// Sender to notify the procedure state.
    state_sender: Sender<ProcedureState>,
    /// Receiver to watch the procedure state.
    ///
    /// Keeping a receiver here also keeps the channel open, so sending a new
    /// state through `state_sender` does not fail.
    state_receiver: Receiver<ProcedureState>,
    /// Ids of child procedures.
    children: Mutex<Vec<ProcedureId>>,
    /// Start execution time of this procedure in milliseconds, `0` until it is set.
    start_time_ms: AtomicI64,
    /// End execution time of this procedure in milliseconds, `0` until it is set.
    end_time_ms: AtomicI64,
}
85
86impl ProcedureMeta {
87    fn new(
88        id: ProcedureId,
89        procedure_state: ProcedureState,
90        parent_id: Option<ProcedureId>,
91        lock_key: LockKey,
92        poison_keys: PoisonKeys,
93        type_name: &str,
94    ) -> ProcedureMeta {
95        let (state_sender, state_receiver) = watch::channel(procedure_state);
96        ProcedureMeta {
97            id,
98            parent_id,
99            child_notify: Notify::new(),
100            lock_key,
101            poison_keys,
102            state_sender,
103            state_receiver,
104            children: Mutex::new(Vec::new()),
105            start_time_ms: AtomicI64::new(0),
106            end_time_ms: AtomicI64::new(0),
107            type_name: type_name.to_string(),
108        }
109    }
110
111    /// Returns current [ProcedureState].
112    fn state(&self) -> ProcedureState {
113        self.state_receiver.borrow().clone()
114    }
115
116    /// Update current [ProcedureState].
117    fn set_state(&self, state: ProcedureState) {
118        // Safety: ProcedureMeta also holds the receiver, so `send()` should never fail.
119        self.state_sender.send(state).unwrap();
120    }
121
122    /// Push `procedure_id` of the subprocedure to the metadata.
123    fn push_child(&self, procedure_id: ProcedureId) {
124        let mut children = self.children.lock().unwrap();
125        children.push(procedure_id);
126    }
127
128    /// Append subprocedures to given `buffer`.
129    fn list_children(&self, buffer: &mut Vec<ProcedureId>) {
130        let children = self.children.lock().unwrap();
131        buffer.extend_from_slice(&children);
132    }
133
134    /// Returns the number of subprocedures.
135    fn num_children(&self) -> usize {
136        self.children.lock().unwrap().len()
137    }
138
139    /// update the start time of the procedure.
140    fn set_start_time_ms(&self) {
141        self.start_time_ms
142            .store(common_time::util::current_time_millis(), Ordering::Relaxed);
143    }
144
145    /// update the end time of the procedure.
146    fn set_end_time_ms(&self) {
147        self.end_time_ms
148            .store(common_time::util::current_time_millis(), Ordering::Relaxed);
149    }
150}
151
/// Reference counted pointer to [ProcedureMeta].
///
/// The same metadata is shared between the manager's procedure table and the
/// runner that executes the procedure.
type ProcedureMetaRef = Arc<ProcedureMeta>;
154
/// Procedure loaded from store.
struct LoadedProcedure {
    /// The procedure rebuilt by the registered loader from the stored data.
    procedure: BoxedProcedure,
    /// The step recorded in the stored message the procedure was loaded from.
    step: u32,
}
160
/// The dynamic lock for procedure execution.
///
/// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
/// during execution. They are only held when the procedure specifically needs these keys
/// and are released as soon as the procedure no longer needs them.
/// This allows for more fine-grained concurrency control during procedure execution.
///
/// The lock is reference counted so that every [DynamicKeyLockGuard] can keep
/// a handle to it.
pub(crate) type DynamicKeyLock = Arc<KeyRwLock<String>>;
168
169/// Acquires a dynamic key lock for the given key.
170///
171/// This function takes a reference to the dynamic key lock and a pointer to the key.
172/// It then matches the key type and acquires the appropriate lock.
173pub async fn acquire_dynamic_key_lock(
174    lock: &DynamicKeyLock,
175    key: &StringKey,
176) -> DynamicKeyLockGuard {
177    match key {
178        StringKey::Share(key) => {
179            let guard = lock.read(key.to_string()).await;
180            DynamicKeyLockGuard {
181                guard: Some(OwnedKeyRwLockGuard::from(guard)),
182                key: key.to_string(),
183                lock: lock.clone(),
184            }
185        }
186        StringKey::Exclusive(key) => {
187            let guard = lock.write(key.to_string()).await;
188            DynamicKeyLockGuard {
189                guard: Some(OwnedKeyRwLockGuard::from(guard)),
190                key: key.to_string(),
191                lock: lock.clone(),
192            }
193        }
194    }
195}
/// A guard for the dynamic key lock.
///
/// This guard is used to release the lock when the procedure no longer needs it.
/// It also ensures that the lock is cleaned up when the guard is dropped.
pub struct DynamicKeyLockGuard {
    /// The held read or write guard; taken out and dropped first when this guard is dropped.
    guard: Option<OwnedKeyRwLockGuard>,
    /// The key this guard was acquired for.
    key: String,
    /// Handle to the lock the guard belongs to, used to clean up `key` on drop.
    lock: DynamicKeyLock,
}
205
206impl Drop for DynamicKeyLockGuard {
207    fn drop(&mut self) {
208        if let Some(guard) = self.guard.take() {
209            drop(guard);
210        }
211        self.lock.clean_keys(&[self.key.to_string()]);
212    }
213}
214
/// Shared context of the manager.
///
/// It owns the registered loaders, the key locks, the table of known
/// procedures together with the running/finished bookkeeping, the running
/// flag of the manager and the poison store.
pub(crate) struct ManagerContext {
    /// Procedure loaders. The key is the type name of the procedure which the loader returns.
    loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
    /// The key lock for the procedure.
    ///
    /// The lock keys are defined in `Procedure::lock_key()`.
    /// These locks are acquired before the procedure starts and released after the procedure finishes.
    /// They ensure exclusive access to resources throughout the entire procedure lifecycle.
    key_lock: KeyRwLock<String>,
    /// The dynamic lock for procedure execution.
    ///
    /// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
    /// during execution. They are only held when the procedure specifically needs these keys
    /// and are released as soon as the procedure no longer needs them.
    /// This allows for more fine-grained concurrency control during procedure execution.
    dynamic_key_lock: DynamicKeyLock,
    /// Procedures in the manager, including finished ones whose metadata has
    /// not been removed yet.
    procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
    /// Running procedures.
    running_procedures: Mutex<HashSet<ProcedureId>>,
    /// Ids and finished time of finished procedures, in the order they finished.
    finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
    /// Running flag.
    running: Arc<AtomicBool>,
    /// Poison manager.
    poison_manager: PoisonStoreRef,
}
243
244#[async_trait]
245impl ContextProvider for ManagerContext {
246    async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
247        Ok(self.state(procedure_id))
248    }
249
250    async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
251        {
252            // validate the procedure exists
253            let procedures = self.procedures.read().unwrap();
254            let procedure = procedures
255                .get(&procedure_id)
256                .context(ProcedureNotFoundSnafu { procedure_id })?;
257
258            // validate the poison key is defined
259            ensure!(
260                procedure.poison_keys.contains(key),
261                PoisonKeyNotDefinedSnafu {
262                    key: key.clone(),
263                    procedure_id
264                }
265            );
266        }
267        let key = key.to_string();
268        let procedure_id = procedure_id.to_string();
269        self.poison_manager.try_put_poison(key, procedure_id).await
270    }
271
272    async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
273        acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
274    }
275}
276
277impl ManagerContext {
278    /// Returns a new [ManagerContext].
279    fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
280        ManagerContext {
281            key_lock: KeyRwLock::new(),
282            dynamic_key_lock: Arc::new(KeyRwLock::new()),
283            loaders: Mutex::new(HashMap::new()),
284            procedures: RwLock::new(HashMap::new()),
285            running_procedures: Mutex::new(HashSet::new()),
286            finished_procedures: Mutex::new(VecDeque::new()),
287            running: Arc::new(AtomicBool::new(false)),
288            poison_manager,
289        }
290    }
291
292    #[cfg(test)]
293    pub(crate) fn set_running(&self) {
294        self.running.store(true, Ordering::Relaxed);
295    }
296
297    /// Set the running flag.
298    pub(crate) fn start(&self) {
299        self.running.store(true, Ordering::Relaxed);
300    }
301
302    pub(crate) fn stop(&self) {
303        self.running.store(false, Ordering::Relaxed);
304    }
305
306    /// Return `ProcedureManager` is running.
307    pub(crate) fn running(&self) -> bool {
308        self.running.load(Ordering::Relaxed)
309    }
310
311    /// Returns true if the procedure with specific `procedure_id` exists.
312    fn contains_procedure(&self, procedure_id: ProcedureId) -> bool {
313        let procedures = self.procedures.read().unwrap();
314        procedures.contains_key(&procedure_id)
315    }
316
317    /// Returns the number of running procedures.
318    fn num_running_procedures(&self) -> usize {
319        self.running_procedures.lock().unwrap().len()
320    }
321
322    /// Try to insert the `procedure` to the context if there is no procedure
323    /// with same [ProcedureId].
324    ///
325    /// Returns `false` if there is already a procedure using the same [ProcedureId].
326    fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
327        let procedure_id = meta.id;
328        let mut procedures = self.procedures.write().unwrap();
329        match procedures.entry(procedure_id) {
330            Entry::Occupied(_) => return false,
331            Entry::Vacant(vacant_entry) => {
332                vacant_entry.insert(meta);
333            }
334        }
335
336        let mut running_procedures = self.running_procedures.lock().unwrap();
337        running_procedures.insert(procedure_id);
338
339        true
340    }
341
342    /// Returns the [ProcedureState] of specific `procedure_id`.
343    fn state(&self, procedure_id: ProcedureId) -> Option<ProcedureState> {
344        let procedures = self.procedures.read().unwrap();
345        procedures.get(&procedure_id).map(|meta| meta.state())
346    }
347
348    /// Returns the [ProcedureMeta] of all procedures.
349    fn list_procedure(&self) -> Vec<ProcedureInfo> {
350        let procedures = self.procedures.read().unwrap();
351        procedures
352            .values()
353            .map(|meta| ProcedureInfo {
354                id: meta.id,
355                type_name: meta.type_name.clone(),
356                start_time_ms: meta.start_time_ms.load(Ordering::Relaxed),
357                end_time_ms: meta.end_time_ms.load(Ordering::Relaxed),
358                state: meta.state(),
359                lock_keys: meta.lock_key.get_keys(),
360            })
361            .collect()
362    }
363
364    /// Returns the [Watcher] of specific `procedure_id`.
365    fn watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
366        let procedures = self.procedures.read().unwrap();
367        procedures
368            .get(&procedure_id)
369            .map(|meta| meta.state_receiver.clone())
370    }
371
372    /// Notify a suspended parent procedure with specific `procedure_id` by its subprocedure.
373    fn notify_by_subprocedure(&self, procedure_id: ProcedureId) {
374        let procedures = self.procedures.read().unwrap();
375        if let Some(meta) = procedures.get(&procedure_id) {
376            meta.child_notify.notify_one();
377        }
378    }
379
380    /// Load procedure from specific [ProcedureMessage].
381    fn load_one_procedure_from_message(
382        &self,
383        procedure_id: ProcedureId,
384        message: &ProcedureMessage,
385    ) -> Option<LoadedProcedure> {
386        let loaders = self.loaders.lock().unwrap();
387        let loader = loaders.get(&message.type_name).or_else(|| {
388            error!(
389                "Loader not found, procedure_id: {}, type_name: {}",
390                procedure_id, message.type_name
391            );
392            None
393        })?;
394
395        let procedure = loader(&message.data)
396            .map_err(|e| {
397                error!(
398                    "Failed to load procedure data, key: {}, source: {:?}",
399                    procedure_id, e
400                );
401                e
402            })
403            .ok()?;
404
405        Some(LoadedProcedure {
406            procedure,
407            step: message.step,
408        })
409    }
410
411    /// Returns all procedures in the tree (including given `root` procedure).
412    ///
413    /// If callers need a consistent view of the tree, they must ensure no new
414    /// procedure is added to the tree during using this method.
415    fn procedures_in_tree(&self, root: &ProcedureMetaRef) -> Vec<ProcedureId> {
416        let sub_num = root.num_children();
417        // Reserve capacity for the root procedure and its children.
418        let mut procedures = Vec::with_capacity(1 + sub_num);
419
420        let mut queue = VecDeque::with_capacity(1 + sub_num);
421        // Push the root procedure to the queue.
422        queue.push_back(root.clone());
423
424        let mut children_ids = Vec::with_capacity(sub_num);
425        let mut children = Vec::with_capacity(sub_num);
426        while let Some(meta) = queue.pop_front() {
427            procedures.push(meta.id);
428
429            // Find metadatas of children.
430            children_ids.clear();
431            meta.list_children(&mut children_ids);
432            self.find_procedures(&children_ids, &mut children);
433
434            // Traverse children later.
435            for child in children.drain(..) {
436                queue.push_back(child);
437            }
438        }
439
440        procedures
441    }
442
443    /// Finds procedures by given `procedure_ids`.
444    ///
445    /// Ignores the id if corresponding procedure is not found.
446    fn find_procedures(&self, procedure_ids: &[ProcedureId], metas: &mut Vec<ProcedureMetaRef>) {
447        let procedures = self.procedures.read().unwrap();
448        for procedure_id in procedure_ids {
449            if let Some(meta) = procedures.get(procedure_id) {
450                metas.push(meta.clone());
451            }
452        }
453    }
454
455    /// Clean resources of finished procedures.
456    fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
457        // Since users need to query the procedure state, so we can't remove the
458        // meta of the procedure directly.
459        let now = Instant::now();
460        let mut finished_procedures = self.finished_procedures.lock().unwrap();
461        finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
462
463        // Remove the procedures from the running set.
464        let mut running_procedures = self.running_procedures.lock().unwrap();
465        for procedure_id in procedure_ids {
466            running_procedures.remove(procedure_id);
467        }
468    }
469
470    /// Remove metadata of outdated procedures.
471    fn remove_outdated_meta(&self, ttl: Duration) {
472        let ids = {
473            let mut finished_procedures = self.finished_procedures.lock().unwrap();
474            if finished_procedures.is_empty() {
475                return;
476            }
477
478            let mut ids_to_remove = Vec::new();
479            while let Some((id, finish_time)) = finished_procedures.front() {
480                if finish_time.elapsed() > ttl {
481                    ids_to_remove.push(*id);
482                    let _ = finished_procedures.pop_front();
483                } else {
484                    // The rest procedures are finished later, so we can break
485                    // the loop.
486                    break;
487                }
488            }
489            ids_to_remove
490        };
491
492        if ids.is_empty() {
493            return;
494        }
495
496        let mut procedures = self.procedures.write().unwrap();
497        for id in ids {
498            let _ = procedures.remove(&id);
499        }
500    }
501}
502
/// Config for [LocalManager].
#[derive(Debug)]
pub struct ManagerConfig {
    /// Parent path handed to the procedure store.
    pub parent_path: String,
    /// Maximum number of retries configured on the retry backoff of a runner.
    pub max_retry_times: usize,
    /// Minimum delay configured on the retry backoff of a runner.
    pub retry_delay: Duration,
    /// Interval of the task that removes outdated procedure metadata.
    pub remove_outdated_meta_task_interval: Duration,
    /// How long the metadata of a finished procedure is kept before that task removes it.
    pub remove_outdated_meta_ttl: Duration,
    /// Submitting a root procedure is rejected once this many procedures are running.
    pub max_running_procedures: usize,
}
513
514impl Default for ManagerConfig {
515    fn default() -> Self {
516        Self {
517            parent_path: String::default(),
518            max_retry_times: 3,
519            retry_delay: Duration::from_millis(500),
520            remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
521            remove_outdated_meta_ttl: META_TTL,
522            max_running_procedures: 128,
523        }
524    }
525}
526
/// Shared, dynamically dispatched [PauseAware] implementation.
type PauseAwareRef = Arc<dyn PauseAware>;
528
/// Tells the [LocalManager] whether it is currently paused.
///
/// When a `PauseAware` is configured, the manager consults it before accepting
/// a newly submitted procedure and rejects the submission while paused.
#[async_trait]
pub trait PauseAware: Send + Sync {
    /// Returns true if the procedure manager is paused.
    ///
    /// An error means the paused status could not be determined.
    async fn is_paused(&self) -> std::result::Result<bool, BoxedError>;
}
534
/// A [ProcedureManager] that maintains procedure states locally.
pub struct LocalManager {
    /// Context shared with the runners and the metadata cleanup task.
    manager_ctx: Arc<ManagerContext>,
    /// Store used to load and delete persisted procedures.
    procedure_store: Arc<ProcedureStore>,
    /// Copy of `config.max_retry_times`, applied to the retry backoff of each runner.
    max_retry_times: usize,
    /// Copy of `config.retry_delay`, applied to the retry backoff of each runner.
    retry_delay: Duration,
    /// GC task. `None` until the manager is started and again after it is stopped.
    remove_outdated_meta_task: TokioMutex<Option<RepeatedTask<Error>>>,
    /// The configuration this manager was created with.
    config: ManagerConfig,
    /// Optional hook consulted on submission to check whether the manager is paused.
    pause_aware: Option<PauseAwareRef>,
}
546
547impl LocalManager {
548    /// Create a new [LocalManager] with specific `config`.
549    pub fn new(
550        config: ManagerConfig,
551        state_store: StateStoreRef,
552        poison_store: PoisonStoreRef,
553        pause_aware: Option<PauseAwareRef>,
554    ) -> LocalManager {
555        let manager_ctx = Arc::new(ManagerContext::new(poison_store));
556
557        LocalManager {
558            manager_ctx,
559            procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)),
560            max_retry_times: config.max_retry_times,
561            retry_delay: config.retry_delay,
562            remove_outdated_meta_task: TokioMutex::new(None),
563            config,
564            pause_aware,
565        }
566    }
567
568    /// Build remove outedated meta task
569    pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask<Error> {
570        RepeatedTask::new(
571            self.config.remove_outdated_meta_task_interval,
572            Box::new(RemoveOutdatedMetaFunction {
573                manager_ctx: self.manager_ctx.clone(),
574                ttl: self.config.remove_outdated_meta_ttl,
575            }),
576        )
577    }
578
579    /// Submit a root procedure with given `procedure_id`.
580    fn submit_root(
581        &self,
582        procedure_id: ProcedureId,
583        procedure_state: ProcedureState,
584        step: u32,
585        procedure: BoxedProcedure,
586    ) -> Result<Watcher> {
587        ensure!(self.manager_ctx.running(), ManagerNotStartSnafu);
588
589        let meta = Arc::new(ProcedureMeta::new(
590            procedure_id,
591            procedure_state,
592            None,
593            procedure.lock_key(),
594            procedure.poison_keys(),
595            procedure.type_name(),
596        ));
597        let runner = Runner {
598            meta: meta.clone(),
599            procedure,
600            manager_ctx: self.manager_ctx.clone(),
601            step,
602            exponential_builder: ExponentialBuilder::default()
603                .with_min_delay(self.retry_delay)
604                .with_max_times(self.max_retry_times),
605            store: self.procedure_store.clone(),
606            rolling_back: false,
607        };
608
609        let watcher = meta.state_receiver.clone();
610
611        ensure!(
612            self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
613            TooManyRunningProceduresSnafu {
614                max_running_procedures: self.config.max_running_procedures,
615            }
616        );
617
618        // Inserts meta into the manager before actually spawnd the runner.
619        ensure!(
620            self.manager_ctx.try_insert_procedure(meta),
621            DuplicateProcedureSnafu { procedure_id },
622        );
623
624        let tracing_context = TracingContext::from_current_span();
625
626        let _handle = common_runtime::spawn_global(async move {
627            // Run the root procedure.
628            // The task was moved to another runtime for execution.
629            // In order not to interrupt tracing, a span needs to be created to continue tracing the current task.
630            runner
631                .run()
632                .trace(
633                    tracing_context
634                        .attach(tracing::info_span!("LocalManager::submit_root_procedure")),
635                )
636                .await;
637        });
638
639        Ok(watcher)
640    }
641
642    fn submit_recovered_messages(
643        &self,
644        messages: HashMap<ProcedureId, ProcedureMessage>,
645        init_state: InitProcedureState,
646    ) {
647        for (procedure_id, message) in &messages {
648            if message.parent_id.is_none() {
649                // This is the root procedure. We only submit the root procedure as it will
650                // submit sub-procedures to the manager.
651                let Some(mut loaded_procedure) = self
652                    .manager_ctx
653                    .load_one_procedure_from_message(*procedure_id, message)
654                else {
655                    // Try to load other procedures.
656                    continue;
657                };
658
659                info!(
660                    "Recover root procedure {}-{}, step: {}",
661                    loaded_procedure.procedure.type_name(),
662                    procedure_id,
663                    loaded_procedure.step
664                );
665
666                let procedure_state = match init_state {
667                    InitProcedureState::RollingBack => ProcedureState::RollingBack {
668                        error: Arc::new(
669                            error::RollbackProcedureRecoveredSnafu {
670                                error: message.error.clone().unwrap_or("Unknown error".to_string()),
671                            }
672                            .build(),
673                        ),
674                    },
675                    InitProcedureState::Running => ProcedureState::Running,
676                };
677
678                if let Err(e) = loaded_procedure.procedure.recover() {
679                    error!(e; "Failed to recover procedure {}", procedure_id);
680                }
681
682                if let Err(e) = self.submit_root(
683                    *procedure_id,
684                    procedure_state,
685                    loaded_procedure.step,
686                    loaded_procedure.procedure,
687                ) {
688                    error!(e; "Failed to recover procedure {}", procedure_id);
689                }
690            }
691        }
692    }
693
694    /// Recovers unfinished procedures and reruns them.
695    async fn recover(&self) -> Result<()> {
696        info!("LocalManager start to recover");
697        let recover_start = Instant::now();
698
699        let ProcedureMessages {
700            messages,
701            rollback_messages,
702            finished_ids,
703        } = self.procedure_store.load_messages().await?;
704        // Submits recovered messages first.
705        self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack);
706        self.submit_recovered_messages(messages, InitProcedureState::Running);
707
708        if !finished_ids.is_empty() {
709            info!(
710                "LocalManager try to clean finished procedures, num: {}",
711                finished_ids.len()
712            );
713
714            for procedure_id in finished_ids {
715                if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
716                    error!(e; "Failed to delete procedure {}", procedure_id);
717                }
718            }
719        }
720
721        info!(
722            "LocalManager finish recovery, cost: {}ms",
723            recover_start.elapsed().as_millis()
724        );
725
726        Ok(())
727    }
728
729    #[cfg(any(test, feature = "testing"))]
730    /// Returns true if contains a specified loader.
731    pub fn contains_loader(&self, name: &str) -> bool {
732        let loaders = self.manager_ctx.loaders.lock().unwrap();
733        loaders.contains_key(name)
734    }
735
736    async fn check_status(&self) -> Result<()> {
737        if let Some(pause_aware) = self.pause_aware.as_ref() {
738            ensure!(
739                !pause_aware.is_paused().await.context(CheckStatusSnafu)?,
740                ManagerPasuedSnafu
741            );
742        }
743
744        Ok(())
745    }
746}
747
748#[async_trait]
749impl ProcedureManager for LocalManager {
750    fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> {
751        let mut loaders = self.manager_ctx.loaders.lock().unwrap();
752        ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name });
753
754        let _ = loaders.insert(name.to_string(), loader);
755
756        Ok(())
757    }
758
759    async fn start(&self) -> Result<()> {
760        let mut task = self.remove_outdated_meta_task.lock().await;
761
762        if task.is_some() {
763            return Ok(());
764        }
765
766        let task_inner = self.build_remove_outdated_meta_task();
767
768        task_inner
769            .start(common_runtime::global_runtime())
770            .context(StartRemoveOutdatedMetaTaskSnafu)?;
771
772        *task = Some(task_inner);
773
774        self.manager_ctx.start();
775
776        info!("LocalManager is start.");
777
778        self.recover().await
779    }
780
781    async fn stop(&self) -> Result<()> {
782        let mut task = self.remove_outdated_meta_task.lock().await;
783
784        if let Some(task) = task.take() {
785            task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?;
786        }
787
788        self.manager_ctx.stop();
789
790        info!("LocalManager is stopped.");
791
792        Ok(())
793    }
794
795    async fn submit(&self, procedure: ProcedureWithId) -> Result<Watcher> {
796        let procedure_id = procedure.id;
797        ensure!(
798            !self.manager_ctx.contains_procedure(procedure_id),
799            DuplicateProcedureSnafu { procedure_id }
800        );
801        self.check_status().await?;
802
803        self.submit_root(
804            procedure.id,
805            ProcedureState::Running,
806            0,
807            procedure.procedure,
808        )
809    }
810
811    async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
812        Ok(self.manager_ctx.state(procedure_id))
813    }
814
815    fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
816        self.manager_ctx.watcher(procedure_id)
817    }
818
819    async fn list_procedures(&self) -> Result<Vec<ProcedureInfo>> {
820        Ok(self.manager_ctx.list_procedure())
821    }
822}
823
/// Body of the repeated task that removes outdated procedure metadata.
struct RemoveOutdatedMetaFunction {
    /// Context whose outdated metadata is removed.
    manager_ctx: Arc<ManagerContext>,
    /// Metadata of procedures that finished longer ago than this is removed.
    ttl: Duration,
}
828
829#[async_trait::async_trait]
830impl TaskFunction<Error> for RemoveOutdatedMetaFunction {
831    fn name(&self) -> &str {
832        "ProcedureManager-remove-outdated-meta-task"
833    }
834
835    async fn call(&mut self) -> Result<()> {
836        self.manager_ctx.remove_outdated_meta(self.ttl);
837        Ok(())
838    }
839}
840
/// Test-only helpers for building procedure metadata and object stores.
#[cfg(test)]
pub(crate) mod test_util {
    use common_test_util::temp_dir::TempDir;
    use object_store::services::Fs as Builder;
    use object_store::ObjectStore;

    use super::*;

    /// Creates a new [ProcedureMeta] for test purpose: a running root
    /// procedure with a random id and default lock/poison keys.
    pub(crate) fn procedure_meta_for_test() -> ProcedureMeta {
        let id = ProcedureId::random();
        let no_parent = None;

        ProcedureMeta::new(
            id,
            ProcedureState::Running,
            no_parent,
            LockKey::default(),
            PoisonKeys::default(),
            "ProcedureAdapter",
        )
    }

    /// Creates a file system backed [ObjectStore] rooted at `dir`.
    pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
        let root = dir.path().to_str().unwrap();
        let fs_builder = Builder::default().root(root);
        ObjectStore::new(fs_builder).unwrap().finish()
    }
}
867
#[cfg(test)]
mod tests {
    // Unit tests for `ManagerContext` and `LocalManager`.

    use std::assert_matches::assert_matches;

    use common_error::mock::MockError;
    use common_error::status_code::StatusCode;
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use tokio::time::timeout;

    use super::*;
    use crate::error::{self, Error};
    use crate::store::state_store::ObjectStateStore;
    use crate::test_util::InMemoryPoisonStore;
    use crate::{Context, Procedure, Status};

    /// Creates a [ManagerContext] backed by an in-memory poison store.
    fn new_test_manager_context() -> ManagerContext {
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        ManagerContext::new(poison_manager)
    }

    /// Returns the [ManagerConfig] shared by most tests; every field that is not
    /// listed here keeps its default value.
    fn test_config() -> ManagerConfig {
        ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        }
    }

    /// Creates a [LocalManager] that keeps procedure state in `object_store` and
    /// uses an in-memory poison store.
    ///
    /// The manager is returned as constructed; each test decides how to start it.
    fn new_test_manager(config: ManagerConfig, object_store: ObjectStore) -> LocalManager {
        let state_store = Arc::new(ObjectStateStore::new(object_store));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        LocalManager::new(config, state_store, poison_manager, None)
    }

    #[test]
    fn test_manager_context() {
        let ctx = new_test_manager_context();
        let meta = Arc::new(test_util::procedure_meta_for_test());

        // Nothing is known about the procedure before it is inserted.
        assert!(!ctx.contains_procedure(meta.id));
        assert!(ctx.state(meta.id).is_none());

        assert!(ctx.try_insert_procedure(meta.clone()));
        assert!(ctx.contains_procedure(meta.id));

        // State changes made through the meta are visible through the context.
        assert!(ctx.state(meta.id).unwrap().is_running());
        meta.set_state(ProcedureState::Done { output: None });
        assert!(ctx.state(meta.id).unwrap().is_done());
    }

    #[test]
    fn test_manager_context_insert_duplicate() {
        let ctx = new_test_manager_context();
        let meta = Arc::new(test_util::procedure_meta_for_test());

        assert!(ctx.try_insert_procedure(meta.clone()));
        // Inserting the same meta a second time must be rejected.
        assert!(!ctx.try_insert_procedure(meta));
    }

    /// Inserts a new procedure meta into `ctx` and registers it as a child of
    /// `parent_id`, which must already be present in `ctx`.
    fn new_child(parent_id: ProcedureId, ctx: &ManagerContext) -> ProcedureMetaRef {
        let mut child = test_util::procedure_meta_for_test();
        child.parent_id = Some(parent_id);
        let child = Arc::new(child);
        assert!(ctx.try_insert_procedure(child.clone()));

        let mut parent = Vec::new();
        ctx.find_procedures(&[parent_id], &mut parent);
        parent[0].push_child(child.id);

        child
    }

    #[test]
    fn test_procedures_in_tree() {
        let ctx = new_test_manager_context();
        let root = Arc::new(test_util::procedure_meta_for_test());
        assert!(ctx.try_insert_procedure(root.clone()));

        assert_eq!(1, ctx.procedures_in_tree(&root).len());

        let child1 = new_child(root.id, &ctx);
        let child2 = new_child(root.id, &ctx);

        let child3 = new_child(child1.id, &ctx);
        let child4 = new_child(child1.id, &ctx);

        let child5 = new_child(child2.id, &ctx);

        let expect = vec![
            root.id, child1.id, child2.id, child3.id, child4.id, child5.id,
        ];
        assert_eq!(expect, ctx.procedures_in_tree(&root));
    }

    /// A procedure that finishes on its first `execute` call and dumps its
    /// `content` verbatim.
    #[derive(Debug)]
    struct ProcedureToLoad {
        content: String,
        lock_key: LockKey,
        poison_keys: PoisonKeys,
    }

    #[async_trait]
    impl Procedure for ProcedureToLoad {
        fn type_name(&self) -> &str {
            "ProcedureToLoad"
        }

        async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
            Ok(Status::done())
        }

        fn dump(&self) -> Result<String> {
            Ok(self.content.clone())
        }

        fn lock_key(&self) -> LockKey {
            self.lock_key.clone()
        }

        fn poison_keys(&self) -> PoisonKeys {
            self.poison_keys.clone()
        }
    }

    impl ProcedureToLoad {
        /// Creates a procedure with the given content and default lock/poison keys.
        fn new(content: &str) -> ProcedureToLoad {
            ProcedureToLoad {
                content: content.to_string(),
                lock_key: LockKey::default(),
                poison_keys: PoisonKeys::default(),
            }
        }

        /// Returns a loader that rebuilds a [ProcedureToLoad] from dumped content.
        fn loader() -> BoxedProcedureLoader {
            let f = |json: &str| {
                let procedure = ProcedureToLoad::new(json);
                Ok(Box::new(procedure) as _)
            };
            Box::new(f)
        }
    }

    /// Creates the procedure used by the submission tests: content `"submit"`
    /// and an exclusive lock on `"test.submit"`.
    fn submit_procedure() -> ProcedureToLoad {
        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        procedure
    }

    #[test]
    fn test_register_loader() {
        let dir = create_temp_dir("register");
        let manager = new_test_manager(test_config(), test_util::new_object_store(&dir));
        manager.manager_ctx.start();

        manager
            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
            .unwrap();
        // Register duplicate loader.
        let err = manager
            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
            .unwrap_err();
        assert!(matches!(err, Error::LoaderConflict { .. }), "{err}");
    }

    #[tokio::test]
    async fn test_recover() {
        let dir = create_temp_dir("recover");
        let object_store = test_util::new_object_store(&dir);
        let manager = new_test_manager(test_config(), object_store.clone());
        manager.manager_ctx.start();

        manager
            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
            .unwrap();

        // Prepare data
        let procedure_store = ProcedureStore::from_object_store(object_store);
        let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
        let root_id = ProcedureId::random();
        // Prepare data for the root procedure.
        for step in 0..3 {
            let type_name = root.type_name().to_string();
            let data = root.dump().unwrap();
            procedure_store
                .store_procedure(root_id, step, type_name, data, None)
                .await
                .unwrap();
        }

        let child: BoxedProcedure = Box::new(ProcedureToLoad::new("a child procedure"));
        let child_id = ProcedureId::random();
        // Prepare data for the child procedure
        for step in 0..2 {
            let type_name = child.type_name().to_string();
            let data = child.dump().unwrap();
            procedure_store
                .store_procedure(child_id, step, type_name, data, Some(root_id))
                .await
                .unwrap();
        }

        // Recover the manager
        manager.recover().await.unwrap();

        // The manager should submit the root procedure.
        let _ = manager.procedure_state(root_id).await.unwrap().unwrap();
        // Since the mocked root procedure actually doesn't submit subprocedures, so there is no
        // related state.
        assert!(manager.procedure_state(child_id).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_submit_procedure() {
        let dir = create_temp_dir("submit");
        let manager = new_test_manager(test_config(), test_util::new_object_store(&dir));
        manager.manager_ctx.start();

        let procedure_id = ProcedureId::random();
        assert!(manager
            .procedure_state(procedure_id)
            .await
            .unwrap()
            .is_none());
        assert!(manager.procedure_watcher(procedure_id).is_none());

        // `unwrap` (instead of asserting `is_ok`) reports the submit error on failure.
        manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(submit_procedure()),
            })
            .await
            .unwrap();
        assert!(manager
            .procedure_state(procedure_id)
            .await
            .unwrap()
            .is_some());
        // Wait for the procedure done.
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_done());

        // Try to submit procedure with same id again.
        let err = manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(ProcedureToLoad::new("submit")),
            })
            .await
            .unwrap_err();
        assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err}");
    }

    #[tokio::test]
    async fn test_state_changed_on_err() {
        let dir = create_temp_dir("on_err");
        let manager = new_test_manager(test_config(), test_util::new_object_store(&dir));
        manager.manager_ctx.start();

        /// A procedure whose `execute` either returns an error or panics.
        #[derive(Debug)]
        struct MockProcedure {
            panic: bool,
        }

        #[async_trait]
        impl Procedure for MockProcedure {
            fn type_name(&self) -> &str {
                "MockProcedure"
            }

            async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
                if self.panic {
                    // Test the runner can set the state to failed even the procedure
                    // panics.
                    panic!();
                } else {
                    Err(Error::external(MockError::new(StatusCode::Unexpected)))
                }
            }

            async fn rollback(&mut self, _: &Context) -> Result<()> {
                Ok(())
            }

            fn rollback_supported(&self) -> bool {
                true
            }

            fn dump(&self) -> Result<String> {
                Ok(String::new())
            }

            fn lock_key(&self) -> LockKey {
                LockKey::single_exclusive("test.submit")
            }

            fn poison_keys(&self) -> PoisonKeys {
                PoisonKeys::default()
            }
        }

        // Submits the procedure under a fresh id and returns its watcher.
        let submit = |procedure: MockProcedure| async {
            let procedure_id = ProcedureId::random();
            manager
                .submit(ProcedureWithId {
                    id: procedure_id,
                    procedure: Box::new(procedure),
                })
                .await
                .unwrap()
        };

        let mut watcher = submit(MockProcedure { panic: false }).await;
        // Wait for the notification.
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_prepare_rollback());
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_rolling_back());
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_failed());
        // The runner won't rollback a panicked procedure.
        let mut watcher = submit(MockProcedure { panic: true }).await;
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_failed());
    }

    #[tokio::test]
    async fn test_procedure_manager_stopped() {
        let dir = create_temp_dir("procedure_manager_stopped");
        // The manager is deliberately never started.
        let manager = new_test_manager(test_config(), test_util::new_object_store(&dir));

        let procedure_id = ProcedureId::random();
        assert_matches!(
            manager
                .submit(ProcedureWithId {
                    id: procedure_id,
                    procedure: Box::new(submit_procedure()),
                })
                .await
                .unwrap_err(),
            error::Error::ManagerNotStart { .. }
        );
    }

    #[tokio::test]
    async fn test_procedure_manager_restart() {
        let dir = create_temp_dir("procedure_manager_restart");
        let manager = new_test_manager(test_config(), test_util::new_object_store(&dir));

        manager.start().await.unwrap();
        manager.stop().await.unwrap();
        manager.start().await.unwrap();

        let procedure_id = ProcedureId::random();
        manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(submit_procedure()),
            })
            .await
            .unwrap();
        assert!(manager
            .procedure_state(procedure_id)
            .await
            .unwrap()
            .is_some());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_remove_outdated_meta_task() {
        let dir = create_temp_dir("remove_outdated_meta_task");
        let object_store = test_util::new_object_store(&dir);
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            remove_outdated_meta_task_interval: Duration::from_millis(1),
            remove_outdated_meta_ttl: Duration::from_millis(1),
            max_running_procedures: 128,
        };
        let manager = new_test_manager(config, object_store);
        manager.manager_ctx.set_running();

        let procedure_id = ProcedureId::random();
        manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(submit_procedure()),
            })
            .await
            .unwrap();
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();

        // Once the manager is started, the metadata of the procedure is removed.
        manager.start().await.unwrap();
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(manager
            .procedure_state(procedure_id)
            .await
            .unwrap()
            .is_none());

        // After the manager is stopped, procedure metadata is no longer removed
        // automatically.
        manager.stop().await.unwrap();
        let procedure_id = ProcedureId::random();

        manager.manager_ctx.set_running();
        manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(submit_procedure()),
            })
            .await
            .unwrap();
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(manager
            .procedure_state(procedure_id)
            .await
            .unwrap()
            .is_some());

        // Submit another procedure, then start the manager again: its metadata
        // is removed once more.
        let procedure_id = ProcedureId::random();
        manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(submit_procedure()),
            })
            .await
            .unwrap();
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();

        manager.start().await.unwrap();
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(manager
            .procedure_state(procedure_id)
            .await
            .unwrap()
            .is_none());
    }

    #[tokio::test]
    async fn test_too_many_running_procedures() {
        let dir = create_temp_dir("too_many_running_procedures");
        let config = ManagerConfig {
            max_running_procedures: 1,
            ..test_config()
        };
        let manager = new_test_manager(config, test_util::new_object_store(&dir));
        manager.manager_ctx.set_running();

        // Occupy the single slot allowed by `max_running_procedures`.
        manager
            .manager_ctx
            .running_procedures
            .lock()
            .unwrap()
            .insert(ProcedureId::random());
        manager.start().await.unwrap();

        // Submit a new procedure should fail.
        let procedure_id = ProcedureId::random();
        let err = manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(submit_procedure()),
            })
            .await
            .unwrap_err();
        assert!(
            matches!(err, Error::TooManyRunningProcedures { .. }),
            "{err}"
        );

        manager
            .manager_ctx
            .running_procedures
            .lock()
            .unwrap()
            .clear();

        // Submit a new procedure should succeed.
        manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(submit_procedure()),
            })
            .await
            .unwrap();
        assert!(manager
            .procedure_state(procedure_id)
            .await
            .unwrap()
            .is_some());
        // Wait for the procedure done.
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_done());
    }

    /// A procedure that signals `notify` (when one is set) from `recover`.
    #[derive(Debug)]
    struct ProcedureToRecover {
        content: String,
        lock_key: LockKey,
        notify: Option<Arc<Notify>>,
        poison_keys: PoisonKeys,
    }

    #[async_trait]
    impl Procedure for ProcedureToRecover {
        fn type_name(&self) -> &str {
            "ProcedureToRecover"
        }

        async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
            Ok(Status::done())
        }

        fn dump(&self) -> Result<String> {
            Ok(self.content.clone())
        }

        fn lock_key(&self) -> LockKey {
            self.lock_key.clone()
        }

        fn recover(&mut self) -> Result<()> {
            // Instances built by `new` carry no notifier; there is nobody to wake
            // up in that case, so do nothing instead of panicking.
            if let Some(notify) = &self.notify {
                notify.notify_one();
            }
            Ok(())
        }

        fn poison_keys(&self) -> PoisonKeys {
            self.poison_keys.clone()
        }
    }

    impl ProcedureToRecover {
        /// Creates a procedure with the given content and no notifier.
        fn new(content: &str) -> ProcedureToRecover {
            ProcedureToRecover {
                content: content.to_string(),
                lock_key: LockKey::default(),
                poison_keys: PoisonKeys::default(),
                notify: None,
            }
        }

        /// Returns a loader whose procedures signal `notify` from `recover`.
        fn loader(notify: Arc<Notify>) -> BoxedProcedureLoader {
            let f = move |json: &str| {
                let procedure = ProcedureToRecover {
                    content: json.to_string(),
                    lock_key: LockKey::default(),
                    poison_keys: PoisonKeys::default(),
                    notify: Some(notify.clone()),
                };
                Ok(Box::new(procedure) as _)
            };
            Box::new(f)
        }
    }

    #[tokio::test]
    async fn test_procedure_recover() {
        common_telemetry::init_default_ut_logging();
        let dir = create_temp_dir("procedure_recover");
        let object_store = test_util::new_object_store(&dir);
        let manager = new_test_manager(test_config(), object_store.clone());
        manager.manager_ctx.start();

        let notify = Arc::new(Notify::new());
        manager
            .register_loader(
                "ProcedureToRecover",
                ProcedureToRecover::loader(notify.clone()),
            )
            .unwrap();

        // Prepare data
        let procedure_store = ProcedureStore::from_object_store(object_store);
        let root: BoxedProcedure = Box::new(ProcedureToRecover::new("test procedure recovery"));
        let root_id = ProcedureId::random();
        // Prepare data for the root procedure.
        for step in 0..3 {
            let type_name = root.type_name().to_string();
            let data = root.dump().unwrap();
            procedure_store
                .store_procedure(root_id, step, type_name, data, None)
                .await
                .unwrap();
        }

        // Recover the manager
        manager.recover().await.unwrap();
        // The recovered procedure must signal `notify` from its `recover` hook.
        timeout(Duration::from_secs(10), notify.notified())
            .await
            .unwrap();
    }
}