common_procedure/
local.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod runner;
16
17use std::collections::hash_map::Entry;
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
20use std::sync::{Arc, Mutex, RwLock};
21use std::time::{Duration, Instant};
22
23use async_trait::async_trait;
24use backon::ExponentialBuilder;
25use common_error::ext::BoxedError;
26use common_event_recorder::EventRecorderRef;
27use common_runtime::{RepeatedTask, TaskFunction};
28use common_telemetry::tracing_context::{FutureExt, TracingContext};
29use common_telemetry::{error, info, tracing};
30use snafu::{ensure, OptionExt, ResultExt};
31use tokio::sync::watch::{self, Receiver, Sender};
32use tokio::sync::{Mutex as TokioMutex, Notify};
33
34use crate::error::{
35    self, CheckStatusSnafu, DuplicateProcedureSnafu, Error, LoaderConflictSnafu,
36    ManagerNotStartSnafu, ManagerPasuedSnafu, PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu,
37    Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
38    TooManyRunningProceduresSnafu,
39};
40use crate::event::ProcedureEvent;
41use crate::local::runner::Runner;
42use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
43use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
44use crate::store::poison_store::PoisonStoreRef;
45use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
46use crate::{
47    BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
48    ProcedureState, ProcedureWithId, StringKey, UserMetadata, Watcher,
49};
50
/// Default time-to-live (ten minutes) of a finished procedure's metadata before it
/// becomes eligible for removal.
const META_TTL: Duration = Duration::from_secs(600);
53
/// Shared metadata of a procedure.
///
/// # Note
/// [Notify] is not a condition variable: we can't guarantee that waiters are notified
/// if they didn't call `notified()` before we signal the notify. So we
/// 1. use a dedicated notify for each condition, such as waiting for a lock or waiting
///    for children;
/// 2. always use `notify_one` and ensure there is only one waiter.
#[derive(Debug)]
pub(crate) struct ProcedureMeta {
    /// Id of this procedure.
    id: ProcedureId,
    /// Type name of this procedure.
    type_name: String,
    /// Parent procedure id; `None` for a root procedure.
    parent_id: Option<ProcedureId>,
    /// Notify to wait for subprocedures; signalled via `ManagerContext::notify_by_subprocedure`.
    child_notify: Notify,
    /// Lock keys required by this procedure (taken from `Procedure::lock_key`).
    lock_key: LockKey,
    /// Poison keys that may cause this procedure to become poisoned during execution.
    /// `try_put_poison` rejects any key that is not listed here.
    poison_keys: PoisonKeys,
    /// Sender to publish the procedure state.
    state_sender: Sender<ProcedureState>,
    /// Receiver to watch the procedure state. Holding a receiver here keeps the channel
    /// open, which is why `set_state` can unwrap the result of `send()`.
    state_receiver: Receiver<ProcedureState>,
    /// Ids of child procedures.
    children: Mutex<Vec<ProcedureId>>,
    /// Start execution time of this procedure in milliseconds; `0` until it is set.
    start_time_ms: AtomicI64,
    /// End execution time of this procedure in milliseconds; `0` until it is set.
    end_time_ms: AtomicI64,
    /// Optional recorder that receives an event on each state change.
    event_recorder: Option<EventRecorderRef>,
    /// The user metadata of the procedure, generated by `Procedure::user_metadata`.
    /// State changes are recorded as events only when this converts into an event.
    user_metadata: Option<UserMetadata>,
}
91
92impl ProcedureMeta {
93    #[allow(clippy::too_many_arguments)]
94    fn new(
95        id: ProcedureId,
96        procedure_state: ProcedureState,
97        parent_id: Option<ProcedureId>,
98        lock_key: LockKey,
99        poison_keys: PoisonKeys,
100        type_name: &str,
101        event_recorder: Option<EventRecorderRef>,
102        user_metadata: Option<UserMetadata>,
103    ) -> ProcedureMeta {
104        let (state_sender, state_receiver) = watch::channel(procedure_state);
105        ProcedureMeta {
106            id,
107            parent_id,
108            child_notify: Notify::new(),
109            lock_key,
110            poison_keys,
111            state_sender,
112            state_receiver,
113            children: Mutex::new(Vec::new()),
114            start_time_ms: AtomicI64::new(0),
115            end_time_ms: AtomicI64::new(0),
116            type_name: type_name.to_string(),
117            event_recorder,
118            user_metadata,
119        }
120    }
121
122    /// Returns current [ProcedureState].
123    fn state(&self) -> ProcedureState {
124        self.state_receiver.borrow().clone()
125    }
126
127    /// Update current [ProcedureState].
128    fn set_state(&self, state: ProcedureState) {
129        // Emit the event to the event recorder if the user metadata contains the eventable object.
130        if let (Some(event_recorder), Some(user_metadata)) =
131            (&self.event_recorder, &self.user_metadata)
132        {
133            if let Some(event) = user_metadata.to_event() {
134                event_recorder.record(Box::new(ProcedureEvent::new(self.id, event, state.clone())));
135            }
136        }
137
138        // Safety: ProcedureMeta also holds the receiver, so `send()` should never fail.
139        self.state_sender.send(state).unwrap();
140    }
141
142    /// Push `procedure_id` of the subprocedure to the metadata.
143    fn push_child(&self, procedure_id: ProcedureId) {
144        let mut children = self.children.lock().unwrap();
145        children.push(procedure_id);
146    }
147
148    /// Append subprocedures to given `buffer`.
149    fn list_children(&self, buffer: &mut Vec<ProcedureId>) {
150        let children = self.children.lock().unwrap();
151        buffer.extend_from_slice(&children);
152    }
153
154    /// Returns the number of subprocedures.
155    fn num_children(&self) -> usize {
156        self.children.lock().unwrap().len()
157    }
158
159    /// update the start time of the procedure.
160    fn set_start_time_ms(&self) {
161        self.start_time_ms
162            .store(common_time::util::current_time_millis(), Ordering::Relaxed);
163    }
164
165    /// update the end time of the procedure.
166    fn set_end_time_ms(&self) {
167        self.end_time_ms
168            .store(common_time::util::current_time_millis(), Ordering::Relaxed);
169    }
170}
171
172/// Reference counted pointer to [ProcedureMeta].
173type ProcedureMetaRef = Arc<ProcedureMeta>;
174
175/// Procedure loaded from store.
176struct LoadedProcedure {
177    procedure: BoxedProcedure,
178    step: u32,
179}
180
181/// The dynamic lock for procedure execution.
182///
183/// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
184/// during execution. They are only held when the procedure specifically needs these keys
185/// and are released as soon as the procedure no longer needs them.
186/// This allows for more fine-grained concurrency control during procedure execution.
187pub(crate) type DynamicKeyLock = Arc<KeyRwLock<String>>;
188
189/// Acquires a dynamic key lock for the given key.
190///
191/// This function takes a reference to the dynamic key lock and a pointer to the key.
192/// It then matches the key type and acquires the appropriate lock.
193pub async fn acquire_dynamic_key_lock(
194    lock: &DynamicKeyLock,
195    key: &StringKey,
196) -> DynamicKeyLockGuard {
197    match key {
198        StringKey::Share(key) => {
199            let guard = lock.read(key.to_string()).await;
200            DynamicKeyLockGuard {
201                guard: Some(OwnedKeyRwLockGuard::from(guard)),
202                key: key.to_string(),
203                lock: lock.clone(),
204            }
205        }
206        StringKey::Exclusive(key) => {
207            let guard = lock.write(key.to_string()).await;
208            DynamicKeyLockGuard {
209                guard: Some(OwnedKeyRwLockGuard::from(guard)),
210                key: key.to_string(),
211                lock: lock.clone(),
212            }
213        }
214    }
215}
216/// A guard for the dynamic key lock.
217///
218/// This guard is used to release the lock when the procedure no longer needs it.
219/// It also ensures that the lock is cleaned up when the guard is dropped.
220pub struct DynamicKeyLockGuard {
221    guard: Option<OwnedKeyRwLockGuard>,
222    key: String,
223    lock: DynamicKeyLock,
224}
225
226impl Drop for DynamicKeyLockGuard {
227    fn drop(&mut self) {
228        if let Some(guard) = self.guard.take() {
229            drop(guard);
230        }
231        self.lock.clean_keys(&[self.key.to_string()]);
232    }
233}
234
/// Shared context of the manager.
///
/// Owned by the [LocalManager] behind an `Arc` and cloned into every runner it spawns.
pub(crate) struct ManagerContext {
    /// Procedure loaders. The key is the type name of the procedure which the loader returns.
    loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
    /// The key lock for the procedure.
    ///
    /// The lock keys are defined in `Procedure::lock_key()`.
    /// These locks are acquired before the procedure starts and released after the procedure finishes.
    /// They ensure exclusive access to resources throughout the entire procedure lifecycle.
    key_lock: KeyRwLock<String>,
    /// The dynamic lock for procedure execution.
    ///
    /// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
    /// during execution. They are only held when the procedure specifically needs these keys
    /// and are released as soon as the procedure no longer needs them.
    /// This allows for more fine-grained concurrency control during procedure execution.
    dynamic_key_lock: DynamicKeyLock,
    /// Metadata of all procedures in the manager. Finished procedures stay here so their
    /// state can still be queried, until `remove_outdated_meta` drops them after the TTL.
    procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
    /// Ids added by `try_insert_procedure` and not yet removed by `on_procedures_finish`.
    running_procedures: Mutex<HashSet<ProcedureId>>,
    /// Ids and finish instants of finished procedures, appended in finish order.
    finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
    /// Running flag; new root procedures are rejected while it is `false`.
    running: Arc<AtomicBool>,
    /// Store backing `try_put_poison`.
    poison_manager: PoisonStoreRef,
}
260    /// Poison manager.
261    poison_manager: PoisonStoreRef,
262}
263
264#[async_trait]
265impl ContextProvider for ManagerContext {
266    async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
267        Ok(self.state(procedure_id))
268    }
269
270    async fn procedure_state_receiver(
271        &self,
272        procedure_id: ProcedureId,
273    ) -> Result<Option<Receiver<ProcedureState>>> {
274        Ok(self.state_receiver(procedure_id))
275    }
276
277    async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
278        {
279            // validate the procedure exists
280            let procedures = self.procedures.read().unwrap();
281            let procedure = procedures
282                .get(&procedure_id)
283                .context(ProcedureNotFoundSnafu { procedure_id })?;
284
285            // validate the poison key is defined
286            ensure!(
287                procedure.poison_keys.contains(key),
288                PoisonKeyNotDefinedSnafu {
289                    key: key.clone(),
290                    procedure_id
291                }
292            );
293        }
294        let key = key.to_string();
295        let procedure_id = procedure_id.to_string();
296        self.poison_manager.try_put_poison(key, procedure_id).await
297    }
298
299    async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
300        acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
301    }
302}
303
304impl ManagerContext {
305    /// Returns a new [ManagerContext].
306    fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
307        ManagerContext {
308            key_lock: KeyRwLock::new(),
309            dynamic_key_lock: Arc::new(KeyRwLock::new()),
310            loaders: Mutex::new(HashMap::new()),
311            procedures: RwLock::new(HashMap::new()),
312            running_procedures: Mutex::new(HashSet::new()),
313            finished_procedures: Mutex::new(VecDeque::new()),
314            running: Arc::new(AtomicBool::new(false)),
315            poison_manager,
316        }
317    }
318
319    #[cfg(test)]
320    pub(crate) fn set_running(&self) {
321        self.running.store(true, Ordering::Relaxed);
322    }
323
324    /// Set the running flag.
325    pub(crate) fn start(&self) {
326        self.running.store(true, Ordering::Relaxed);
327    }
328
329    pub(crate) fn stop(&self) {
330        self.running.store(false, Ordering::Relaxed);
331    }
332
333    /// Return `ProcedureManager` is running.
334    pub(crate) fn running(&self) -> bool {
335        self.running.load(Ordering::Relaxed)
336    }
337
338    /// Returns true if the procedure with specific `procedure_id` exists.
339    fn contains_procedure(&self, procedure_id: ProcedureId) -> bool {
340        let procedures = self.procedures.read().unwrap();
341        procedures.contains_key(&procedure_id)
342    }
343
344    /// Returns the number of running procedures.
345    fn num_running_procedures(&self) -> usize {
346        self.running_procedures.lock().unwrap().len()
347    }
348
349    /// Try to insert the `procedure` to the context if there is no procedure
350    /// with same [ProcedureId].
351    ///
352    /// Returns `false` if there is already a procedure using the same [ProcedureId].
353    fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
354        let procedure_id = meta.id;
355        let mut procedures = self.procedures.write().unwrap();
356        match procedures.entry(procedure_id) {
357            Entry::Occupied(_) => return false,
358            Entry::Vacant(vacant_entry) => {
359                vacant_entry.insert(meta);
360            }
361        }
362
363        let mut running_procedures = self.running_procedures.lock().unwrap();
364        running_procedures.insert(procedure_id);
365
366        true
367    }
368
369    /// Returns the [ProcedureState] of specific `procedure_id`.
370    fn state(&self, procedure_id: ProcedureId) -> Option<ProcedureState> {
371        let procedures = self.procedures.read().unwrap();
372        procedures.get(&procedure_id).map(|meta| meta.state())
373    }
374
375    /// Returns the [Receiver<ProcedureState>] of specific `procedure_id`.
376    fn state_receiver(&self, procedure_id: ProcedureId) -> Option<Receiver<ProcedureState>> {
377        let procedures = self.procedures.read().unwrap();
378        procedures
379            .get(&procedure_id)
380            .map(|meta| meta.state_receiver.clone())
381    }
382
383    /// Returns the [ProcedureMeta] of all procedures.
384    fn list_procedure(&self) -> Vec<ProcedureInfo> {
385        let procedures = self.procedures.read().unwrap();
386        procedures
387            .values()
388            .map(|meta| ProcedureInfo {
389                id: meta.id,
390                type_name: meta.type_name.clone(),
391                start_time_ms: meta.start_time_ms.load(Ordering::Relaxed),
392                end_time_ms: meta.end_time_ms.load(Ordering::Relaxed),
393                state: meta.state(),
394                lock_keys: meta.lock_key.get_keys(),
395            })
396            .collect()
397    }
398
399    /// Returns the [Watcher] of specific `procedure_id`.
400    fn watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
401        let procedures = self.procedures.read().unwrap();
402        procedures
403            .get(&procedure_id)
404            .map(|meta| meta.state_receiver.clone())
405    }
406
407    /// Notify a suspended parent procedure with specific `procedure_id` by its subprocedure.
408    fn notify_by_subprocedure(&self, procedure_id: ProcedureId) {
409        let procedures = self.procedures.read().unwrap();
410        if let Some(meta) = procedures.get(&procedure_id) {
411            meta.child_notify.notify_one();
412        }
413    }
414
415    /// Load procedure from specific [ProcedureMessage].
416    fn load_one_procedure_from_message(
417        &self,
418        procedure_id: ProcedureId,
419        message: &ProcedureMessage,
420    ) -> Option<LoadedProcedure> {
421        let loaders = self.loaders.lock().unwrap();
422        let loader = loaders.get(&message.type_name).or_else(|| {
423            error!(
424                "Loader not found, procedure_id: {}, type_name: {}",
425                procedure_id, message.type_name
426            );
427            None
428        })?;
429
430        let procedure = loader(&message.data)
431            .map_err(|e| {
432                error!(
433                    "Failed to load procedure data, key: {}, source: {:?}",
434                    procedure_id, e
435                );
436                e
437            })
438            .ok()?;
439
440        Some(LoadedProcedure {
441            procedure,
442            step: message.step,
443        })
444    }
445
446    /// Returns all procedures in the tree (including given `root` procedure).
447    ///
448    /// If callers need a consistent view of the tree, they must ensure no new
449    /// procedure is added to the tree during using this method.
450    fn procedures_in_tree(&self, root: &ProcedureMetaRef) -> Vec<ProcedureId> {
451        let sub_num = root.num_children();
452        // Reserve capacity for the root procedure and its children.
453        let mut procedures = Vec::with_capacity(1 + sub_num);
454
455        let mut queue = VecDeque::with_capacity(1 + sub_num);
456        // Push the root procedure to the queue.
457        queue.push_back(root.clone());
458
459        let mut children_ids = Vec::with_capacity(sub_num);
460        let mut children = Vec::with_capacity(sub_num);
461        while let Some(meta) = queue.pop_front() {
462            procedures.push(meta.id);
463
464            // Find metadatas of children.
465            children_ids.clear();
466            meta.list_children(&mut children_ids);
467            self.find_procedures(&children_ids, &mut children);
468
469            // Traverse children later.
470            for child in children.drain(..) {
471                queue.push_back(child);
472            }
473        }
474
475        procedures
476    }
477
478    /// Finds procedures by given `procedure_ids`.
479    ///
480    /// Ignores the id if corresponding procedure is not found.
481    fn find_procedures(&self, procedure_ids: &[ProcedureId], metas: &mut Vec<ProcedureMetaRef>) {
482        let procedures = self.procedures.read().unwrap();
483        for procedure_id in procedure_ids {
484            if let Some(meta) = procedures.get(procedure_id) {
485                metas.push(meta.clone());
486            }
487        }
488    }
489
490    /// Clean resources of finished procedures.
491    fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
492        // Since users need to query the procedure state, so we can't remove the
493        // meta of the procedure directly.
494        let now = Instant::now();
495        let mut finished_procedures = self.finished_procedures.lock().unwrap();
496        finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
497
498        // Remove the procedures from the running set.
499        let mut running_procedures = self.running_procedures.lock().unwrap();
500        for procedure_id in procedure_ids {
501            running_procedures.remove(procedure_id);
502        }
503    }
504
505    /// Remove metadata of outdated procedures.
506    fn remove_outdated_meta(&self, ttl: Duration) {
507        let ids = {
508            let mut finished_procedures = self.finished_procedures.lock().unwrap();
509            if finished_procedures.is_empty() {
510                return;
511            }
512
513            let mut ids_to_remove = Vec::new();
514            while let Some((id, finish_time)) = finished_procedures.front() {
515                if finish_time.elapsed() > ttl {
516                    ids_to_remove.push(*id);
517                    let _ = finished_procedures.pop_front();
518                } else {
519                    // The rest procedures are finished later, so we can break
520                    // the loop.
521                    break;
522                }
523            }
524            ids_to_remove
525        };
526
527        if ids.is_empty() {
528            return;
529        }
530
531        let mut procedures = self.procedures.write().unwrap();
532        for id in ids {
533            let _ = procedures.remove(&id);
534        }
535    }
536}
537
538/// Config for [LocalManager].
539#[derive(Debug)]
540pub struct ManagerConfig {
541    pub parent_path: String,
542    pub max_retry_times: usize,
543    pub retry_delay: Duration,
544    pub remove_outdated_meta_task_interval: Duration,
545    pub remove_outdated_meta_ttl: Duration,
546    pub max_running_procedures: usize,
547}
548
549impl Default for ManagerConfig {
550    fn default() -> Self {
551        Self {
552            parent_path: String::default(),
553            max_retry_times: 3,
554            retry_delay: Duration::from_millis(500),
555            remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
556            remove_outdated_meta_ttl: META_TTL,
557            max_running_procedures: 128,
558        }
559    }
560}
561
562type PauseAwareRef = Arc<dyn PauseAware>;
563
564#[async_trait]
565pub trait PauseAware: Send + Sync {
566    /// Returns true if the procedure manager is paused.
567    async fn is_paused(&self) -> std::result::Result<bool, BoxedError>;
568}
569
570/// A [ProcedureManager] that maintains procedure states locally.
571pub struct LocalManager {
572    manager_ctx: Arc<ManagerContext>,
573    procedure_store: Arc<ProcedureStore>,
574    max_retry_times: usize,
575    retry_delay: Duration,
576    /// GC task.
577    remove_outdated_meta_task: TokioMutex<Option<RepeatedTask<Error>>>,
578    config: ManagerConfig,
579    pause_aware: Option<PauseAwareRef>,
580    event_recorder: Option<EventRecorderRef>,
581}
582
583impl LocalManager {
584    /// Create a new [LocalManager] with specific `config`.
585    pub fn new(
586        config: ManagerConfig,
587        state_store: StateStoreRef,
588        poison_store: PoisonStoreRef,
589        pause_aware: Option<PauseAwareRef>,
590        event_recorder: Option<EventRecorderRef>,
591    ) -> LocalManager {
592        let manager_ctx = Arc::new(ManagerContext::new(poison_store));
593
594        LocalManager {
595            manager_ctx,
596            procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)),
597            max_retry_times: config.max_retry_times,
598            retry_delay: config.retry_delay,
599            remove_outdated_meta_task: TokioMutex::new(None),
600            config,
601            pause_aware,
602            event_recorder,
603        }
604    }
605
606    /// Build remove outedated meta task
607    pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask<Error> {
608        RepeatedTask::new(
609            self.config.remove_outdated_meta_task_interval,
610            Box::new(RemoveOutdatedMetaFunction {
611                manager_ctx: self.manager_ctx.clone(),
612                ttl: self.config.remove_outdated_meta_ttl,
613            }),
614        )
615    }
616
617    /// Submit a root procedure with given `procedure_id`.
618    fn submit_root(
619        &self,
620        procedure_id: ProcedureId,
621        procedure_state: ProcedureState,
622        step: u32,
623        procedure: BoxedProcedure,
624    ) -> Result<Watcher> {
625        ensure!(self.manager_ctx.running(), ManagerNotStartSnafu);
626
627        let user_metadata = procedure.user_metadata();
628        let meta = Arc::new(ProcedureMeta::new(
629            procedure_id,
630            procedure_state,
631            None,
632            procedure.lock_key(),
633            procedure.poison_keys(),
634            procedure.type_name(),
635            self.event_recorder.clone(),
636            user_metadata.clone(),
637        ));
638        let runner = Runner {
639            meta: meta.clone(),
640            procedure,
641            manager_ctx: self.manager_ctx.clone(),
642            step,
643            exponential_builder: ExponentialBuilder::default()
644                .with_min_delay(self.retry_delay)
645                .with_max_times(self.max_retry_times),
646            store: self.procedure_store.clone(),
647            rolling_back: false,
648            event_recorder: self.event_recorder.clone(),
649        };
650
651        if let (Some(event_recorder), Some(event)) = (
652            self.event_recorder.as_ref(),
653            user_metadata.and_then(|m| m.to_event()),
654        ) {
655            event_recorder.record(Box::new(ProcedureEvent::new(
656                procedure_id,
657                event,
658                ProcedureState::Running,
659            )));
660        }
661
662        let watcher = meta.state_receiver.clone();
663
664        ensure!(
665            self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
666            TooManyRunningProceduresSnafu {
667                max_running_procedures: self.config.max_running_procedures,
668            }
669        );
670
671        // Inserts meta into the manager before actually spawnd the runner.
672        ensure!(
673            self.manager_ctx.try_insert_procedure(meta),
674            DuplicateProcedureSnafu { procedure_id },
675        );
676
677        let tracing_context = TracingContext::from_current_span();
678
679        let _handle = common_runtime::spawn_global(async move {
680            // Run the root procedure.
681            // The task was moved to another runtime for execution.
682            // In order not to interrupt tracing, a span needs to be created to continue tracing the current task.
683            runner
684                .run()
685                .trace(
686                    tracing_context
687                        .attach(tracing::info_span!("LocalManager::submit_root_procedure")),
688                )
689                .await;
690        });
691
692        Ok(watcher)
693    }
694
695    fn submit_recovered_messages(
696        &self,
697        messages: HashMap<ProcedureId, ProcedureMessage>,
698        init_state: InitProcedureState,
699    ) {
700        for (procedure_id, message) in &messages {
701            if message.parent_id.is_none() {
702                // This is the root procedure. We only submit the root procedure as it will
703                // submit sub-procedures to the manager.
704                let Some(mut loaded_procedure) = self
705                    .manager_ctx
706                    .load_one_procedure_from_message(*procedure_id, message)
707                else {
708                    // Try to load other procedures.
709                    continue;
710                };
711
712                info!(
713                    "Recover root procedure {}-{}, step: {}",
714                    loaded_procedure.procedure.type_name(),
715                    procedure_id,
716                    loaded_procedure.step
717                );
718
719                let procedure_state = match init_state {
720                    InitProcedureState::RollingBack => ProcedureState::RollingBack {
721                        error: Arc::new(
722                            error::RollbackProcedureRecoveredSnafu {
723                                error: message.error.clone().unwrap_or("Unknown error".to_string()),
724                            }
725                            .build(),
726                        ),
727                    },
728                    InitProcedureState::Running => ProcedureState::Running,
729                };
730
731                if let Err(e) = loaded_procedure.procedure.recover() {
732                    error!(e; "Failed to recover procedure {}", procedure_id);
733                }
734
735                if let Err(e) = self.submit_root(
736                    *procedure_id,
737                    procedure_state,
738                    loaded_procedure.step,
739                    loaded_procedure.procedure,
740                ) {
741                    error!(e; "Failed to recover procedure {}", procedure_id);
742                }
743            }
744        }
745    }
746
747    /// Recovers unfinished procedures and reruns them.
748    async fn recover(&self) -> Result<()> {
749        info!("LocalManager start to recover");
750        let recover_start = Instant::now();
751
752        let ProcedureMessages {
753            messages,
754            rollback_messages,
755            finished_ids,
756        } = self.procedure_store.load_messages().await?;
757        // Submits recovered messages first.
758        self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack);
759        self.submit_recovered_messages(messages, InitProcedureState::Running);
760
761        if !finished_ids.is_empty() {
762            info!(
763                "LocalManager try to clean finished procedures, num: {}",
764                finished_ids.len()
765            );
766
767            for procedure_id in finished_ids {
768                if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
769                    error!(e; "Failed to delete procedure {}", procedure_id);
770                }
771            }
772        }
773
774        info!(
775            "LocalManager finish recovery, cost: {}ms",
776            recover_start.elapsed().as_millis()
777        );
778
779        Ok(())
780    }
781
782    #[cfg(any(test, feature = "testing"))]
783    /// Returns true if contains a specified loader.
784    pub fn contains_loader(&self, name: &str) -> bool {
785        let loaders = self.manager_ctx.loaders.lock().unwrap();
786        loaders.contains_key(name)
787    }
788
789    async fn check_status(&self) -> Result<()> {
790        if let Some(pause_aware) = self.pause_aware.as_ref() {
791            ensure!(
792                !pause_aware.is_paused().await.context(CheckStatusSnafu)?,
793                ManagerPasuedSnafu
794            );
795        }
796
797        Ok(())
798    }
799}
800
801#[async_trait]
802impl ProcedureManager for LocalManager {
803    fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> {
804        let mut loaders = self.manager_ctx.loaders.lock().unwrap();
805        ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name });
806
807        let _ = loaders.insert(name.to_string(), loader);
808
809        Ok(())
810    }
811
812    async fn start(&self) -> Result<()> {
813        let mut task = self.remove_outdated_meta_task.lock().await;
814
815        if task.is_some() {
816            return Ok(());
817        }
818
819        let task_inner = self.build_remove_outdated_meta_task();
820
821        task_inner
822            .start(common_runtime::global_runtime())
823            .context(StartRemoveOutdatedMetaTaskSnafu)?;
824
825        *task = Some(task_inner);
826
827        self.manager_ctx.start();
828
829        info!("LocalManager is start.");
830
831        self.recover().await
832    }
833
834    async fn stop(&self) -> Result<()> {
835        let mut task = self.remove_outdated_meta_task.lock().await;
836
837        if let Some(task) = task.take() {
838            task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?;
839        }
840
841        self.manager_ctx.stop();
842
843        info!("LocalManager is stopped.");
844
845        Ok(())
846    }
847
848    async fn submit(&self, procedure: ProcedureWithId) -> Result<Watcher> {
849        let procedure_id = procedure.id;
850        ensure!(
851            !self.manager_ctx.contains_procedure(procedure_id),
852            DuplicateProcedureSnafu { procedure_id }
853        );
854        self.check_status().await?;
855
856        self.submit_root(
857            procedure.id,
858            ProcedureState::Running,
859            0,
860            procedure.procedure,
861        )
862    }
863
864    async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
865        Ok(self.manager_ctx.state(procedure_id))
866    }
867
868    fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
869        self.manager_ctx.watcher(procedure_id)
870    }
871
872    async fn list_procedures(&self) -> Result<Vec<ProcedureInfo>> {
873        Ok(self.manager_ctx.list_procedure())
874    }
875}
876
877struct RemoveOutdatedMetaFunction {
878    manager_ctx: Arc<ManagerContext>,
879    ttl: Duration,
880}
881
882#[async_trait::async_trait]
883impl TaskFunction<Error> for RemoveOutdatedMetaFunction {
884    fn name(&self) -> &str {
885        "ProcedureManager-remove-outdated-meta-task"
886    }
887
888    async fn call(&mut self) -> Result<()> {
889        self.manager_ctx.remove_outdated_meta(self.ttl);
890        Ok(())
891    }
892}
893
/// Fixture helpers shared by the procedure tests.
#[cfg(test)]
pub(crate) mod test_util {
    use common_test_util::temp_dir::TempDir;
    use object_store::services::Fs as Builder;
    use object_store::ObjectStore;

    use super::*;

    /// Creates a new [ProcedureMeta] for test purposes.
    ///
    /// The meta has a random id, is in the `Running` state, uses default lock and poison
    /// keys, and has the type name `"ProcedureAdapter"`.
    pub(crate) fn procedure_meta_for_test() -> ProcedureMeta {
        let id = ProcedureId::random();
        let lock_key = LockKey::default();
        let poison_keys = PoisonKeys::default();
        ProcedureMeta::new(
            id,
            ProcedureState::Running,
            None,
            lock_key,
            poison_keys,
            "ProcedureAdapter",
            None,
            None,
        )
    }

    /// Builds a filesystem-backed [ObjectStore] rooted at `dir`.
    ///
    /// # Panics
    ///
    /// Panics if the path of `dir` is not valid UTF-8 or the store cannot be built.
    pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
        let root = dir.path().to_str().unwrap();
        let store = ObjectStore::new(Builder::default().root(root)).unwrap();
        store.finish()
    }
}
922
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_error::mock::MockError;
    use common_error::status_code::StatusCode;
    use common_test_util::temp_dir::create_temp_dir;
    use tokio::time::timeout;

    use super::*;
    use crate::error::{self, Error};
    use crate::store::state_store::ObjectStateStore;
    use crate::test_util::InMemoryPoisonStore;
    use crate::{Context, Procedure, Status};

    /// Creates a [ManagerContext] backed by an in-memory poison store.
    fn new_test_manager_context() -> ManagerContext {
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        ManagerContext::new(poison_manager)
    }

    /// An inserted procedure becomes visible in the context, and state changes made on its
    /// meta are observed through `ManagerContext::state`.
    #[test]
    fn test_manager_context() {
        let ctx = new_test_manager_context();
        let meta = Arc::new(test_util::procedure_meta_for_test());

        assert!(!ctx.contains_procedure(meta.id));
        assert!(ctx.state(meta.id).is_none());

        assert!(ctx.try_insert_procedure(meta.clone()));
        assert!(ctx.contains_procedure(meta.id));

        assert!(ctx.state(meta.id).unwrap().is_running());
        meta.set_state(ProcedureState::Done { output: None });
        assert!(ctx.state(meta.id).unwrap().is_done());
    }

    /// Inserting the same procedure meta twice is rejected the second time.
    #[test]
    fn test_manager_context_insert_duplicate() {
        let ctx = new_test_manager_context();
        let meta = Arc::new(test_util::procedure_meta_for_test());

        assert!(ctx.try_insert_procedure(meta.clone()));
        assert!(!ctx.try_insert_procedure(meta));
    }

    /// Inserts a new child procedure of `parent_id` into `ctx` and links it to its parent.
    fn new_child(parent_id: ProcedureId, ctx: &ManagerContext) -> ProcedureMetaRef {
        let mut child = test_util::procedure_meta_for_test();
        child.parent_id = Some(parent_id);
        let child = Arc::new(child);
        assert!(ctx.try_insert_procedure(child.clone()));

        let mut parent = Vec::new();
        ctx.find_procedures(&[parent_id], &mut parent);
        parent[0].push_child(child.id);

        child
    }

    /// `procedures_in_tree` returns the root, then its children, then the grandchildren.
    #[test]
    fn test_procedures_in_tree() {
        let ctx = new_test_manager_context();
        let root = Arc::new(test_util::procedure_meta_for_test());
        assert!(ctx.try_insert_procedure(root.clone()));

        assert_eq!(1, ctx.procedures_in_tree(&root).len());

        let child1 = new_child(root.id, &ctx);
        let child2 = new_child(root.id, &ctx);

        let child3 = new_child(child1.id, &ctx);
        let child4 = new_child(child1.id, &ctx);

        let child5 = new_child(child2.id, &ctx);

        let expect = vec![
            root.id, child1.id, child2.id, child3.id, child4.id, child5.id,
        ];
        assert_eq!(expect, ctx.procedures_in_tree(&root));
    }

    /// A procedure that finishes immediately and dumps its `content` verbatim.
    #[derive(Debug)]
    struct ProcedureToLoad {
        content: String,
        lock_key: LockKey,
        poison_keys: PoisonKeys,
    }

    #[async_trait]
    impl Procedure for ProcedureToLoad {
        fn type_name(&self) -> &str {
            "ProcedureToLoad"
        }

        async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
            Ok(Status::done())
        }

        fn dump(&self) -> Result<String> {
            Ok(self.content.clone())
        }

        fn lock_key(&self) -> LockKey {
            self.lock_key.clone()
        }

        fn poison_keys(&self) -> PoisonKeys {
            self.poison_keys.clone()
        }
    }

    impl ProcedureToLoad {
        fn new(content: &str) -> ProcedureToLoad {
            ProcedureToLoad {
                content: content.to_string(),
                lock_key: LockKey::default(),
                poison_keys: PoisonKeys::default(),
            }
        }

        /// Returns a loader that rebuilds a [ProcedureToLoad] from its dumped content.
        fn loader() -> BoxedProcedureLoader {
            let f = |json: &str| {
                let procedure = ProcedureToLoad::new(json);
                Ok(Box::new(procedure) as _)
            };
            Box::new(f)
        }
    }

    /// Registering two loaders under the same name fails with `LoaderConflict`.
    #[test]
    fn test_register_loader() {
        let dir = create_temp_dir("register");
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        manager.manager_ctx.start();

        manager
            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
            .unwrap();
        // Register duplicate loader.
        let err = manager
            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
            .unwrap_err();
        assert!(matches!(err, Error::LoaderConflict { .. }), "{err}");
    }

    /// Recovery resubmits the persisted root procedure.
    #[tokio::test]
    async fn test_recover() {
        let dir = create_temp_dir("recover");
        let object_store = test_util::new_object_store(&dir);
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        manager.manager_ctx.start();

        manager
            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
            .unwrap();

        // Prepare data
        let procedure_store = ProcedureStore::from_object_store(object_store.clone());
        let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
        let root_id = ProcedureId::random();
        // Prepare data for the root procedure.
        for step in 0..3 {
            let type_name = root.type_name().to_string();
            let data = root.dump().unwrap();
            procedure_store
                .store_procedure(root_id, step, type_name, data, None)
                .await
                .unwrap();
        }

        let child: BoxedProcedure = Box::new(ProcedureToLoad::new("a child procedure"));
        let child_id = ProcedureId::random();
        // Prepare data for the child procedure
        for step in 0..2 {
            let type_name = child.type_name().to_string();
            let data = child.dump().unwrap();
            procedure_store
                .store_procedure(child_id, step, type_name, data, Some(root_id))
                .await
                .unwrap();
        }

        // Recover the manager
        manager.recover().await.unwrap();

        // The manager should submit the root procedure.
        let _ = manager.procedure_state(root_id).await.unwrap().unwrap();
        // The mocked root procedure doesn't actually submit subprocedures, so there is no
        // related state for the child.
        assert!(manager.procedure_state(child_id).await.unwrap().is_none());
    }

    /// A submitted procedure runs to completion, and resubmitting its id fails with
    /// `DuplicateProcedure`.
    #[tokio::test]
    async fn test_submit_procedure() {
        let dir = create_temp_dir("submit");
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        manager.manager_ctx.start();

        let procedure_id = ProcedureId::random();
        assert!(manager
            .procedure_state(procedure_id)
            .await
            .unwrap()
            .is_none());
        assert!(manager.procedure_watcher(procedure_id).is_none());

        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        assert!(manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(procedure),
            })
            .await
            .is_ok());
        assert!(manager
            .procedure_state(procedure_id)
            .await
            .unwrap()
            .is_some());
        // Wait for the procedure done.
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_done());

        // Try to submit procedure with same id again.
        let err = manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(ProcedureToLoad::new("submit")),
            })
            .await
            .unwrap_err();
        assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err}");
    }

    /// A failing procedure goes through prepare-rollback and rolling-back to failed; a
    /// panicking one goes straight to failed.
    #[tokio::test]
    async fn test_state_changed_on_err() {
        let dir = create_temp_dir("on_err");
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        manager.manager_ctx.start();

        /// A procedure that either panics or returns an error from `execute`.
        #[derive(Debug)]
        struct MockProcedure {
            panic: bool,
        }

        #[async_trait]
        impl Procedure for MockProcedure {
            fn type_name(&self) -> &str {
                "MockProcedure"
            }

            async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
                if self.panic {
                    // Check that the runner still sets the state to failed even if the
                    // procedure panics.
                    panic!();
                } else {
                    Err(Error::external(MockError::new(StatusCode::Unexpected)))
                }
            }

            async fn rollback(&mut self, _: &Context) -> Result<()> {
                Ok(())
            }

            fn rollback_supported(&self) -> bool {
                true
            }

            fn dump(&self) -> Result<String> {
                Ok(String::new())
            }

            fn lock_key(&self) -> LockKey {
                LockKey::single_exclusive("test.submit")
            }

            fn poison_keys(&self) -> PoisonKeys {
                PoisonKeys::default()
            }
        }

        let check_procedure = |procedure| async {
            let procedure_id = ProcedureId::random();
            manager
                .submit(ProcedureWithId {
                    id: procedure_id,
                    procedure: Box::new(procedure),
                })
                .await
                .unwrap()
        };

        let mut watcher = check_procedure(MockProcedure { panic: false }).await;
        // Wait for the notification.
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_prepare_rollback());
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_rolling_back());
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_failed());
        // The runner won't rollback a panicked procedure.
        let mut watcher = check_procedure(MockProcedure { panic: true }).await;
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_failed());
    }

    /// Submitting to a manager that was never started fails with `ManagerNotStart`.
    #[tokio::test]
    async fn test_procedure_manager_stopped() {
        let dir = create_temp_dir("procedure_manager_stopped");
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);

        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        let procedure_id = ProcedureId::random();
        assert_matches!(
            manager
                .submit(ProcedureWithId {
                    id: procedure_id,
                    procedure: Box::new(procedure),
                })
                .await
                .unwrap_err(),
            error::Error::ManagerNotStart { .. }
        );
    }

    /// The manager accepts submissions again after a start/stop/start cycle.
    #[tokio::test]
    async fn test_procedure_manager_restart() {
        let dir = create_temp_dir("procedure_manager_restart");
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);

        manager.start().await.unwrap();
        manager.stop().await.unwrap();
        manager.start().await.unwrap();

        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        let procedure_id = ProcedureId::random();
        assert!(manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(procedure),
            })
            .await
            .is_ok());
        assert!(manager
            .procedure_state(procedure_id)
            .await
            .unwrap()
            .is_some());
    }

    /// Finished procedure metadata is removed only while the cleanup task is running.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_remove_outdated_meta_task() {
        let dir = create_temp_dir("remove_outdated_meta_task");
        let object_store = test_util::new_object_store(&dir);
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            remove_outdated_meta_task_interval: Duration::from_millis(1),
            remove_outdated_meta_ttl: Duration::from_millis(1),
            max_running_procedures: 128,
        };
        let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        manager.manager_ctx.set_running();

        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        let procedure_id = ProcedureId::random();
        assert!(manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(procedure),
            })
            .await
            .is_ok());
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();

        manager.start().await.unwrap();
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(manager
            .procedure_state(procedure_id)
            .await
            .unwrap()
            .is_none());

        // Stopping the manager stops the remove-outdated-meta task, so procedure metadata is
        // no longer removed automatically.
        manager.stop().await.unwrap();
        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        let procedure_id = ProcedureId::random();

        manager.manager_ctx.set_running();
        assert!(manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(procedure),
            })
            .await
            .is_ok());
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(manager
            .procedure_state(procedure_id)
            .await
            .unwrap()
            .is_some());

        // Once the manager is started again, the cleanup task removes outdated metadata.
        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        let procedure_id = ProcedureId::random();
        assert!(manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(procedure),
            })
            .await
            .is_ok());
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();

        manager.start().await.unwrap();
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(manager
            .procedure_state(procedure_id)
            .await
            .unwrap()
            .is_none());
    }

    /// Submission fails with `TooManyRunningProcedures` once `max_running_procedures` is
    /// reached, and succeeds again after the running set is cleared.
    #[tokio::test]
    async fn test_too_many_running_procedures() {
        let dir = create_temp_dir("too_many_running_procedures");
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            max_running_procedures: 1,
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        manager.manager_ctx.set_running();

        manager
            .manager_ctx
            .running_procedures
            .lock()
            .unwrap()
            .insert(ProcedureId::random());
        manager.start().await.unwrap();

        // Submit a new procedure should fail.
        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        let procedure_id = ProcedureId::random();
        let err = manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(procedure),
            })
            .await
            .unwrap_err();
        assert!(
            matches!(err, Error::TooManyRunningProcedures { .. }),
            "{err}"
        );

        manager
            .manager_ctx
            .running_procedures
            .lock()
            .unwrap()
            .clear();

        // Submit a new procedure should succeed.
        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        assert!(manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(procedure),
            })
            .await
            .is_ok());
        assert!(manager
            .procedure_state(procedure_id)
            .await
            .unwrap()
            .is_some());
        // Wait for the procedure done.
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_done());
    }

    /// A procedure whose `recover` hook signals `notify`, so tests can observe recovery.
    #[derive(Debug)]
    struct ProcedureToRecover {
        content: String,
        lock_key: LockKey,
        notify: Option<Arc<Notify>>,
        poison_keys: PoisonKeys,
    }

    #[async_trait]
    impl Procedure for ProcedureToRecover {
        fn type_name(&self) -> &str {
            "ProcedureToRecover"
        }

        async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
            Ok(Status::done())
        }

        fn dump(&self) -> Result<String> {
            Ok(self.content.clone())
        }

        fn lock_key(&self) -> LockKey {
            self.lock_key.clone()
        }

        fn recover(&mut self) -> Result<()> {
            // Panics if `notify` is unset; only loader-built instances set it.
            self.notify.as_ref().unwrap().notify_one();
            Ok(())
        }

        fn poison_keys(&self) -> PoisonKeys {
            self.poison_keys.clone()
        }
    }

    impl ProcedureToRecover {
        fn new(content: &str) -> ProcedureToRecover {
            ProcedureToRecover {
                content: content.to_string(),
                lock_key: LockKey::default(),
                poison_keys: PoisonKeys::default(),
                notify: None,
            }
        }

        /// Returns a loader whose procedures signal `notify` when recovered.
        fn loader(notify: Arc<Notify>) -> BoxedProcedureLoader {
            let f = move |json: &str| {
                let procedure = ProcedureToRecover {
                    content: json.to_string(),
                    lock_key: LockKey::default(),
                    poison_keys: PoisonKeys::default(),
                    notify: Some(notify.clone()),
                };
                Ok(Box::new(procedure) as _)
            };
            Box::new(f)
        }
    }

    /// Recovering the manager invokes `Procedure::recover` on the loaded root procedure.
    #[tokio::test]
    async fn test_procedure_recover() {
        common_telemetry::init_default_ut_logging();
        let dir = create_temp_dir("procedure_recover");
        let object_store = test_util::new_object_store(&dir);
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        manager.manager_ctx.start();

        let notify = Arc::new(Notify::new());
        manager
            .register_loader(
                "ProcedureToRecover",
                ProcedureToRecover::loader(notify.clone()),
            )
            .unwrap();

        // Prepare data
        let procedure_store = ProcedureStore::from_object_store(object_store.clone());
        let root: BoxedProcedure = Box::new(ProcedureToRecover::new("test procedure recovery"));
        let root_id = ProcedureId::random();
        // Prepare data for the root procedure.
        for step in 0..3 {
            let type_name = root.type_name().to_string();
            let data = root.dump().unwrap();
            procedure_store
                .store_procedure(root_id, step, type_name, data, None)
                .await
                .unwrap();
        }

        // Recover the manager
        manager.recover().await.unwrap();
        timeout(Duration::from_secs(10), notify.notified())
            .await
            .unwrap();
    }
}