common_procedure/
local.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod runner;
16
17use std::collections::hash_map::Entry;
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
20use std::sync::{Arc, Mutex, RwLock};
21use std::time::{Duration, Instant};
22
23use async_trait::async_trait;
24use backon::ExponentialBuilder;
25use common_error::ext::BoxedError;
26use common_event_recorder::EventRecorderRef;
27use common_runtime::{RepeatedTask, TaskFunction};
28use common_telemetry::tracing_context::{FutureExt, TracingContext};
29use common_telemetry::{error, info, tracing};
30use snafu::{OptionExt, ResultExt, ensure};
31use tokio::sync::watch::{self, Receiver, Sender};
32use tokio::sync::{Mutex as TokioMutex, Notify};
33
34use crate::error::{
35    self, CheckStatusSnafu, DuplicateProcedureSnafu, Error, LoaderConflictSnafu,
36    ManagerNotStartSnafu, ManagerPasuedSnafu, PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu,
37    Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
38    TooManyRunningProceduresSnafu,
39};
40use crate::event::ProcedureEvent;
41use crate::local::runner::Runner;
42use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
43use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
44use crate::store::poison_store::PoisonStoreRef;
45use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
46use crate::{
47    BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
48    ProcedureState, ProcedureWithId, StringKey, UserMetadata, Watcher,
49};
50
/// Time-to-live (10 minutes) of a procedure's metadata.
///
/// Used as the default value of [ManagerConfig::remove_outdated_meta_ttl].
const META_TTL: Duration = Duration::from_secs(60 * 10);
53
/// Shared metadata of a procedure.
///
/// # Note
/// [Notify] is not a condition variable, so we can't guarantee that waiters are notified
/// if they didn't call `notified()` before we signal the notify. Therefore we
/// 1. use a dedicated notify for each condition, such as waiting for a lock or waiting
///    for children;
/// 2. always use `notify_one` and ensure there is only one waiter.
#[derive(Debug)]
pub(crate) struct ProcedureMeta {
    /// Id of this procedure.
    id: ProcedureId,
    /// Type name of this procedure.
    type_name: String,
    /// Id of the parent procedure, `None` if there is no parent.
    parent_id: Option<ProcedureId>,
    /// Notify used to wait for subprocedures.
    child_notify: Notify,
    /// Lock required by this procedure.
    lock_key: LockKey,
    /// Poison keys that may cause this procedure to become poisoned during execution.
    poison_keys: PoisonKeys,
    /// Sender used to publish the procedure state.
    state_sender: Sender<ProcedureState>,
    /// Receiver used to watch the procedure state. Keeping it here also keeps the
    /// channel open, so sending a new state never fails.
    state_receiver: Receiver<ProcedureState>,
    /// Ids of child procedures.
    children: Mutex<Vec<ProcedureId>>,
    /// Start execution time of this procedure in milliseconds; `0` until it is set.
    start_time_ms: AtomicI64,
    /// End execution time of this procedure in milliseconds; `0` until it is set.
    end_time_ms: AtomicI64,
    /// Event recorder that receives an event on each state change, if configured.
    event_recorder: Option<EventRecorderRef>,
    /// The user metadata of the procedure. It's generated by [Procedure::user_metadata].
    user_metadata: Option<UserMetadata>,
}
91
92impl ProcedureMeta {
93    #[allow(clippy::too_many_arguments)]
94    fn new(
95        id: ProcedureId,
96        procedure_state: ProcedureState,
97        parent_id: Option<ProcedureId>,
98        lock_key: LockKey,
99        poison_keys: PoisonKeys,
100        type_name: &str,
101        event_recorder: Option<EventRecorderRef>,
102        user_metadata: Option<UserMetadata>,
103    ) -> ProcedureMeta {
104        let (state_sender, state_receiver) = watch::channel(procedure_state);
105        ProcedureMeta {
106            id,
107            parent_id,
108            child_notify: Notify::new(),
109            lock_key,
110            poison_keys,
111            state_sender,
112            state_receiver,
113            children: Mutex::new(Vec::new()),
114            start_time_ms: AtomicI64::new(0),
115            end_time_ms: AtomicI64::new(0),
116            type_name: type_name.to_string(),
117            event_recorder,
118            user_metadata,
119        }
120    }
121
122    /// Returns current [ProcedureState].
123    fn state(&self) -> ProcedureState {
124        self.state_receiver.borrow().clone()
125    }
126
127    /// Update current [ProcedureState].
128    fn set_state(&self, state: ProcedureState) {
129        // Emit the event to the event recorder if the user metadata contains the eventable object.
130        if let (Some(event_recorder), Some(user_metadata)) =
131            (&self.event_recorder, &self.user_metadata)
132            && let Some(event) = user_metadata.to_event()
133        {
134            event_recorder.record(Box::new(ProcedureEvent::new(self.id, event, state.clone())));
135        }
136
137        // Safety: ProcedureMeta also holds the receiver, so `send()` should never fail.
138        self.state_sender.send(state).unwrap();
139    }
140
141    /// Push `procedure_id` of the subprocedure to the metadata.
142    fn push_child(&self, procedure_id: ProcedureId) {
143        let mut children = self.children.lock().unwrap();
144        children.push(procedure_id);
145    }
146
147    /// Append subprocedures to given `buffer`.
148    fn list_children(&self, buffer: &mut Vec<ProcedureId>) {
149        let children = self.children.lock().unwrap();
150        buffer.extend_from_slice(&children);
151    }
152
153    /// Returns the number of subprocedures.
154    fn num_children(&self) -> usize {
155        self.children.lock().unwrap().len()
156    }
157
158    /// update the start time of the procedure.
159    fn set_start_time_ms(&self) {
160        self.start_time_ms
161            .store(common_time::util::current_time_millis(), Ordering::Relaxed);
162    }
163
164    /// update the end time of the procedure.
165    fn set_end_time_ms(&self) {
166        self.end_time_ms
167            .store(common_time::util::current_time_millis(), Ordering::Relaxed);
168    }
169}
170
/// Reference counted pointer to [ProcedureMeta].
type ProcedureMetaRef = Arc<ProcedureMeta>;

/// Procedure loaded from store.
struct LoadedProcedure {
    /// The procedure rebuilt by its registered loader.
    procedure: BoxedProcedure,
    /// The step recorded in the stored message this procedure was loaded from.
    step: u32,
}
179
/// The dynamic lock for procedure execution.
///
/// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
/// during execution. They are only held when the procedure specifically needs these keys
/// and are released as soon as the procedure no longer needs them.
/// This allows for more fine-grained concurrency control during procedure execution.
///
/// The lock is keyed by `String` and shared through an [Arc].
pub(crate) type DynamicKeyLock = Arc<KeyRwLock<String>>;
187
188/// Acquires a dynamic key lock for the given key.
189///
190/// This function takes a reference to the dynamic key lock and a pointer to the key.
191/// It then matches the key type and acquires the appropriate lock.
192pub async fn acquire_dynamic_key_lock(
193    lock: &DynamicKeyLock,
194    key: &StringKey,
195) -> DynamicKeyLockGuard {
196    match key {
197        StringKey::Share(key) => {
198            let guard = lock.read(key.to_string()).await;
199            DynamicKeyLockGuard {
200                guard: Some(OwnedKeyRwLockGuard::from(guard)),
201                key: key.to_string(),
202                lock: lock.clone(),
203            }
204        }
205        StringKey::Exclusive(key) => {
206            let guard = lock.write(key.to_string()).await;
207            DynamicKeyLockGuard {
208                guard: Some(OwnedKeyRwLockGuard::from(guard)),
209                key: key.to_string(),
210                lock: lock.clone(),
211            }
212        }
213    }
214}
/// A guard for the dynamic key lock.
///
/// This guard is used to release the lock when the procedure no longer needs it.
/// It also ensures that the lock is cleaned up when the guard is dropped.
pub struct DynamicKeyLockGuard {
    /// The held lock guard; taken (and thereby released) when this guard is dropped.
    guard: Option<OwnedKeyRwLockGuard>,
    /// The key this guard locks.
    key: String,
    /// The lock the guard was acquired from; used to clean up `key` on drop.
    lock: DynamicKeyLock,
}
224
225impl Drop for DynamicKeyLockGuard {
226    fn drop(&mut self) {
227        if let Some(guard) = self.guard.take() {
228            drop(guard);
229        }
230        self.lock.clean_keys(&[self.key.to_string()]);
231    }
232}
233
/// Shared context of the manager.
pub(crate) struct ManagerContext {
    /// Procedure loaders. The key is the type name of the procedure which the loader returns.
    loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
    /// The key lock for the procedure.
    ///
    /// The lock keys are defined in `Procedure::lock_key()`.
    /// These locks are acquired before the procedure starts and released after the procedure finishes.
    /// They ensure exclusive access to resources throughout the entire procedure lifecycle.
    key_lock: KeyRwLock<String>,
    /// The dynamic lock for procedure execution.
    ///
    /// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
    /// during execution. They are only held when the procedure specifically needs these keys
    /// and are released as soon as the procedure no longer needs them.
    /// This allows for more fine-grained concurrency control during procedure execution.
    dynamic_key_lock: DynamicKeyLock,
    /// Metadata of all procedures known to the manager, including finished procedures
    /// whose metadata has not been removed yet.
    procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
    /// Ids of running procedures.
    running_procedures: Mutex<HashSet<ProcedureId>>,
    /// Ids and finished time of finished procedures, in the order they finished.
    finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
    /// Running flag of the manager.
    running: Arc<AtomicBool>,
    /// Poison store used to put poison keys for procedures.
    poison_manager: PoisonStoreRef,
}
262
263#[async_trait]
264impl ContextProvider for ManagerContext {
265    async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
266        Ok(self.state(procedure_id))
267    }
268
269    async fn procedure_state_receiver(
270        &self,
271        procedure_id: ProcedureId,
272    ) -> Result<Option<Receiver<ProcedureState>>> {
273        Ok(self.state_receiver(procedure_id))
274    }
275
276    async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
277        {
278            // validate the procedure exists
279            let procedures = self.procedures.read().unwrap();
280            let procedure = procedures
281                .get(&procedure_id)
282                .context(ProcedureNotFoundSnafu { procedure_id })?;
283
284            // validate the poison key is defined
285            ensure!(
286                procedure.poison_keys.contains(key),
287                PoisonKeyNotDefinedSnafu {
288                    key: key.clone(),
289                    procedure_id
290                }
291            );
292        }
293        let key = key.to_string();
294        let procedure_id = procedure_id.to_string();
295        self.poison_manager.try_put_poison(key, procedure_id).await
296    }
297
298    async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
299        acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
300    }
301}
302
303impl ManagerContext {
304    /// Returns a new [ManagerContext].
305    fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
306        ManagerContext {
307            key_lock: KeyRwLock::new(),
308            dynamic_key_lock: Arc::new(KeyRwLock::new()),
309            loaders: Mutex::new(HashMap::new()),
310            procedures: RwLock::new(HashMap::new()),
311            running_procedures: Mutex::new(HashSet::new()),
312            finished_procedures: Mutex::new(VecDeque::new()),
313            running: Arc::new(AtomicBool::new(false)),
314            poison_manager,
315        }
316    }
317
318    #[cfg(test)]
319    pub(crate) fn set_running(&self) {
320        self.running.store(true, Ordering::Relaxed);
321    }
322
323    /// Set the running flag.
324    pub(crate) fn start(&self) {
325        self.running.store(true, Ordering::Relaxed);
326    }
327
328    pub(crate) fn stop(&self) {
329        self.running.store(false, Ordering::Relaxed);
330    }
331
332    /// Return `ProcedureManager` is running.
333    pub(crate) fn running(&self) -> bool {
334        self.running.load(Ordering::Relaxed)
335    }
336
337    /// Returns true if the procedure with specific `procedure_id` exists.
338    fn contains_procedure(&self, procedure_id: ProcedureId) -> bool {
339        let procedures = self.procedures.read().unwrap();
340        procedures.contains_key(&procedure_id)
341    }
342
343    /// Returns the number of running procedures.
344    fn num_running_procedures(&self) -> usize {
345        self.running_procedures.lock().unwrap().len()
346    }
347
348    /// Try to insert the `procedure` to the context if there is no procedure
349    /// with same [ProcedureId].
350    ///
351    /// Returns `false` if there is already a procedure using the same [ProcedureId].
352    fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
353        let procedure_id = meta.id;
354        let mut procedures = self.procedures.write().unwrap();
355        match procedures.entry(procedure_id) {
356            Entry::Occupied(_) => return false,
357            Entry::Vacant(vacant_entry) => {
358                vacant_entry.insert(meta);
359            }
360        }
361
362        let mut running_procedures = self.running_procedures.lock().unwrap();
363        running_procedures.insert(procedure_id);
364
365        true
366    }
367
368    /// Returns the [ProcedureState] of specific `procedure_id`.
369    fn state(&self, procedure_id: ProcedureId) -> Option<ProcedureState> {
370        let procedures = self.procedures.read().unwrap();
371        procedures.get(&procedure_id).map(|meta| meta.state())
372    }
373
374    /// Returns the [Receiver<ProcedureState>] of specific `procedure_id`.
375    fn state_receiver(&self, procedure_id: ProcedureId) -> Option<Receiver<ProcedureState>> {
376        let procedures = self.procedures.read().unwrap();
377        procedures
378            .get(&procedure_id)
379            .map(|meta| meta.state_receiver.clone())
380    }
381
382    /// Returns the [ProcedureMeta] of all procedures.
383    fn list_procedure(&self) -> Vec<ProcedureInfo> {
384        let procedures = self.procedures.read().unwrap();
385        procedures
386            .values()
387            .map(|meta| ProcedureInfo {
388                id: meta.id,
389                type_name: meta.type_name.clone(),
390                start_time_ms: meta.start_time_ms.load(Ordering::Relaxed),
391                end_time_ms: meta.end_time_ms.load(Ordering::Relaxed),
392                state: meta.state(),
393                lock_keys: meta.lock_key.get_keys(),
394            })
395            .collect()
396    }
397
398    /// Returns the [Watcher] of specific `procedure_id`.
399    fn watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
400        let procedures = self.procedures.read().unwrap();
401        procedures
402            .get(&procedure_id)
403            .map(|meta| meta.state_receiver.clone())
404    }
405
406    /// Notify a suspended parent procedure with specific `procedure_id` by its subprocedure.
407    fn notify_by_subprocedure(&self, procedure_id: ProcedureId) {
408        let procedures = self.procedures.read().unwrap();
409        if let Some(meta) = procedures.get(&procedure_id) {
410            meta.child_notify.notify_one();
411        }
412    }
413
414    /// Load procedure from specific [ProcedureMessage].
415    fn load_one_procedure_from_message(
416        &self,
417        procedure_id: ProcedureId,
418        message: &ProcedureMessage,
419    ) -> Option<LoadedProcedure> {
420        let loaders = self.loaders.lock().unwrap();
421        let loader = loaders.get(&message.type_name).or_else(|| {
422            error!(
423                "Loader not found, procedure_id: {}, type_name: {}",
424                procedure_id, message.type_name
425            );
426            None
427        })?;
428
429        let procedure = loader(&message.data)
430            .map_err(|e| {
431                error!(
432                    "Failed to load procedure data, key: {}, source: {:?}",
433                    procedure_id, e
434                );
435                e
436            })
437            .ok()?;
438
439        Some(LoadedProcedure {
440            procedure,
441            step: message.step,
442        })
443    }
444
445    /// Returns all procedures in the tree (including given `root` procedure).
446    ///
447    /// If callers need a consistent view of the tree, they must ensure no new
448    /// procedure is added to the tree during using this method.
449    fn procedures_in_tree(&self, root: &ProcedureMetaRef) -> Vec<ProcedureId> {
450        let sub_num = root.num_children();
451        // Reserve capacity for the root procedure and its children.
452        let mut procedures = Vec::with_capacity(1 + sub_num);
453
454        let mut queue = VecDeque::with_capacity(1 + sub_num);
455        // Push the root procedure to the queue.
456        queue.push_back(root.clone());
457
458        let mut children_ids = Vec::with_capacity(sub_num);
459        let mut children = Vec::with_capacity(sub_num);
460        while let Some(meta) = queue.pop_front() {
461            procedures.push(meta.id);
462
463            // Find metadatas of children.
464            children_ids.clear();
465            meta.list_children(&mut children_ids);
466            self.find_procedures(&children_ids, &mut children);
467
468            // Traverse children later.
469            for child in children.drain(..) {
470                queue.push_back(child);
471            }
472        }
473
474        procedures
475    }
476
477    /// Finds procedures by given `procedure_ids`.
478    ///
479    /// Ignores the id if corresponding procedure is not found.
480    fn find_procedures(&self, procedure_ids: &[ProcedureId], metas: &mut Vec<ProcedureMetaRef>) {
481        let procedures = self.procedures.read().unwrap();
482        for procedure_id in procedure_ids {
483            if let Some(meta) = procedures.get(procedure_id) {
484                metas.push(meta.clone());
485            }
486        }
487    }
488
489    /// Clean resources of finished procedures.
490    fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
491        // Since users need to query the procedure state, so we can't remove the
492        // meta of the procedure directly.
493        let now = Instant::now();
494        let mut finished_procedures = self.finished_procedures.lock().unwrap();
495        finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
496
497        // Remove the procedures from the running set.
498        let mut running_procedures = self.running_procedures.lock().unwrap();
499        for procedure_id in procedure_ids {
500            running_procedures.remove(procedure_id);
501        }
502    }
503
504    /// Remove metadata of outdated procedures.
505    fn remove_outdated_meta(&self, ttl: Duration) {
506        let ids = {
507            let mut finished_procedures = self.finished_procedures.lock().unwrap();
508            if finished_procedures.is_empty() {
509                return;
510            }
511
512            let mut ids_to_remove = Vec::new();
513            while let Some((id, finish_time)) = finished_procedures.front() {
514                if finish_time.elapsed() > ttl {
515                    ids_to_remove.push(*id);
516                    let _ = finished_procedures.pop_front();
517                } else {
518                    // The rest procedures are finished later, so we can break
519                    // the loop.
520                    break;
521                }
522            }
523            ids_to_remove
524        };
525
526        if ids.is_empty() {
527            return;
528        }
529
530        let mut procedures = self.procedures.write().unwrap();
531        for id in ids {
532            let _ = procedures.remove(&id);
533        }
534    }
535}
536
/// Config for [LocalManager].
#[derive(Debug)]
pub struct ManagerConfig {
    /// Parent path passed to the [ProcedureStore] when the manager is created.
    pub parent_path: String,
    /// Maximum retry times configured on the runner's exponential backoff.
    pub max_retry_times: usize,
    /// Minimum delay configured on the runner's exponential backoff.
    pub retry_delay: Duration,
    /// Interval of the task that removes outdated procedure metadata.
    pub remove_outdated_meta_task_interval: Duration,
    /// How long the metadata of a finished procedure is kept before it may be removed.
    pub remove_outdated_meta_ttl: Duration,
    /// Submissions are rejected once this many procedures are running.
    pub max_running_procedures: usize,
}
547
548impl Default for ManagerConfig {
549    fn default() -> Self {
550        Self {
551            parent_path: String::default(),
552            max_retry_times: 3,
553            retry_delay: Duration::from_millis(500),
554            remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
555            remove_outdated_meta_ttl: META_TTL,
556            max_running_procedures: 128,
557        }
558    }
559}
560
/// Shared pointer to a [PauseAware] implementation.
type PauseAwareRef = Arc<dyn PauseAware>;

/// Reports whether the procedure manager is paused.
///
/// [LocalManager] consults it before accepting a submitted procedure.
#[async_trait]
pub trait PauseAware: Send + Sync {
    /// Returns true if the procedure manager is paused.
    async fn is_paused(&self) -> std::result::Result<bool, BoxedError>;
}
568
/// A [ProcedureManager] that maintains procedure states locally.
pub struct LocalManager {
    /// Context shared with the runners of submitted procedures.
    manager_ctx: Arc<ManagerContext>,
    /// Store shared with the runners; also used to load and delete procedures on recovery.
    procedure_store: Arc<ProcedureStore>,
    /// Maximum retry times of the runner's exponential backoff (copied from the config).
    max_retry_times: usize,
    /// Minimum delay of the runner's exponential backoff (copied from the config).
    retry_delay: Duration,
    /// GC task. `None` until the manager is started.
    remove_outdated_meta_task: TokioMutex<Option<RepeatedTask<Error>>>,
    /// The config this manager was created with.
    config: ManagerConfig,
    /// Optional pause checker consulted before a procedure is submitted.
    pause_aware: Option<PauseAwareRef>,
    /// Optional event recorder handed to procedure metadata and runners.
    event_recorder: Option<EventRecorderRef>,
}
581
582impl LocalManager {
583    /// Create a new [LocalManager] with specific `config`.
584    pub fn new(
585        config: ManagerConfig,
586        state_store: StateStoreRef,
587        poison_store: PoisonStoreRef,
588        pause_aware: Option<PauseAwareRef>,
589        event_recorder: Option<EventRecorderRef>,
590    ) -> LocalManager {
591        let manager_ctx = Arc::new(ManagerContext::new(poison_store));
592
593        LocalManager {
594            manager_ctx,
595            procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)),
596            max_retry_times: config.max_retry_times,
597            retry_delay: config.retry_delay,
598            remove_outdated_meta_task: TokioMutex::new(None),
599            config,
600            pause_aware,
601            event_recorder,
602        }
603    }
604
605    /// Build remove outedated meta task
606    pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask<Error> {
607        RepeatedTask::new(
608            self.config.remove_outdated_meta_task_interval,
609            Box::new(RemoveOutdatedMetaFunction {
610                manager_ctx: self.manager_ctx.clone(),
611                ttl: self.config.remove_outdated_meta_ttl,
612            }),
613        )
614    }
615
616    /// Submit a root procedure with given `procedure_id`.
617    fn submit_root(
618        &self,
619        procedure_id: ProcedureId,
620        procedure_state: ProcedureState,
621        step: u32,
622        procedure: BoxedProcedure,
623    ) -> Result<Watcher> {
624        ensure!(self.manager_ctx.running(), ManagerNotStartSnafu);
625
626        let user_metadata = procedure.user_metadata();
627        let meta = Arc::new(ProcedureMeta::new(
628            procedure_id,
629            procedure_state,
630            None,
631            procedure.lock_key(),
632            procedure.poison_keys(),
633            procedure.type_name(),
634            self.event_recorder.clone(),
635            user_metadata.clone(),
636        ));
637        let runner = Runner {
638            meta: meta.clone(),
639            procedure,
640            manager_ctx: self.manager_ctx.clone(),
641            step,
642            exponential_builder: ExponentialBuilder::default()
643                .with_min_delay(self.retry_delay)
644                .with_max_times(self.max_retry_times),
645            store: self.procedure_store.clone(),
646            rolling_back: false,
647            event_recorder: self.event_recorder.clone(),
648        };
649
650        if let (Some(event_recorder), Some(event)) = (
651            self.event_recorder.as_ref(),
652            user_metadata.and_then(|m| m.to_event()),
653        ) {
654            event_recorder.record(Box::new(ProcedureEvent::new(
655                procedure_id,
656                event,
657                ProcedureState::Running,
658            )));
659        }
660
661        let watcher = meta.state_receiver.clone();
662
663        ensure!(
664            self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
665            TooManyRunningProceduresSnafu {
666                max_running_procedures: self.config.max_running_procedures,
667            }
668        );
669
670        // Inserts meta into the manager before actually spawnd the runner.
671        ensure!(
672            self.manager_ctx.try_insert_procedure(meta),
673            DuplicateProcedureSnafu { procedure_id },
674        );
675
676        let tracing_context = TracingContext::from_current_span();
677
678        let _handle = common_runtime::spawn_global(async move {
679            // Run the root procedure.
680            // The task was moved to another runtime for execution.
681            // In order not to interrupt tracing, a span needs to be created to continue tracing the current task.
682            runner
683                .run()
684                .trace(
685                    tracing_context
686                        .attach(tracing::info_span!("LocalManager::submit_root_procedure")),
687                )
688                .await;
689        });
690
691        Ok(watcher)
692    }
693
694    fn submit_recovered_messages(
695        &self,
696        messages: HashMap<ProcedureId, ProcedureMessage>,
697        init_state: InitProcedureState,
698    ) {
699        for (procedure_id, message) in &messages {
700            if message.parent_id.is_none() {
701                // This is the root procedure. We only submit the root procedure as it will
702                // submit sub-procedures to the manager.
703                let Some(mut loaded_procedure) = self
704                    .manager_ctx
705                    .load_one_procedure_from_message(*procedure_id, message)
706                else {
707                    // Try to load other procedures.
708                    continue;
709                };
710
711                info!(
712                    "Recover root procedure {}-{}, step: {}",
713                    loaded_procedure.procedure.type_name(),
714                    procedure_id,
715                    loaded_procedure.step
716                );
717
718                let procedure_state = match init_state {
719                    InitProcedureState::RollingBack => ProcedureState::RollingBack {
720                        error: Arc::new(
721                            error::RollbackProcedureRecoveredSnafu {
722                                error: message.error.clone().unwrap_or("Unknown error".to_string()),
723                            }
724                            .build(),
725                        ),
726                    },
727                    InitProcedureState::Running => ProcedureState::Running,
728                };
729
730                if let Err(e) = loaded_procedure.procedure.recover() {
731                    error!(e; "Failed to recover procedure {}", procedure_id);
732                }
733
734                if let Err(e) = self.submit_root(
735                    *procedure_id,
736                    procedure_state,
737                    loaded_procedure.step,
738                    loaded_procedure.procedure,
739                ) {
740                    error!(e; "Failed to recover procedure {}", procedure_id);
741                }
742            }
743        }
744    }
745
746    /// Recovers unfinished procedures and reruns them.
747    async fn recover(&self) -> Result<()> {
748        info!("LocalManager start to recover");
749        let recover_start = Instant::now();
750
751        let ProcedureMessages {
752            messages,
753            rollback_messages,
754            finished_ids,
755        } = self.procedure_store.load_messages().await?;
756        // Submits recovered messages first.
757        self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack);
758        self.submit_recovered_messages(messages, InitProcedureState::Running);
759
760        if !finished_ids.is_empty() {
761            info!(
762                "LocalManager try to clean finished procedures, num: {}",
763                finished_ids.len()
764            );
765
766            for procedure_id in finished_ids {
767                if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
768                    error!(e; "Failed to delete procedure {}", procedure_id);
769                }
770            }
771        }
772
773        info!(
774            "LocalManager finish recovery, cost: {}ms",
775            recover_start.elapsed().as_millis()
776        );
777
778        Ok(())
779    }
780
781    #[cfg(any(test, feature = "testing"))]
782    /// Returns true if contains a specified loader.
783    pub fn contains_loader(&self, name: &str) -> bool {
784        let loaders = self.manager_ctx.loaders.lock().unwrap();
785        loaders.contains_key(name)
786    }
787
788    async fn check_status(&self) -> Result<()> {
789        if let Some(pause_aware) = self.pause_aware.as_ref() {
790            ensure!(
791                !pause_aware.is_paused().await.context(CheckStatusSnafu)?,
792                ManagerPasuedSnafu
793            );
794        }
795
796        Ok(())
797    }
798}
799
800#[async_trait]
801impl ProcedureManager for LocalManager {
802    fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> {
803        let mut loaders = self.manager_ctx.loaders.lock().unwrap();
804        ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name });
805
806        let _ = loaders.insert(name.to_string(), loader);
807
808        Ok(())
809    }
810
811    async fn start(&self) -> Result<()> {
812        let mut task = self.remove_outdated_meta_task.lock().await;
813
814        if task.is_some() {
815            return Ok(());
816        }
817
818        let task_inner = self.build_remove_outdated_meta_task();
819
820        task_inner
821            .start(common_runtime::global_runtime())
822            .context(StartRemoveOutdatedMetaTaskSnafu)?;
823
824        *task = Some(task_inner);
825
826        self.manager_ctx.start();
827
828        info!("LocalManager is start.");
829
830        self.recover().await
831    }
832
833    async fn stop(&self) -> Result<()> {
834        let mut task = self.remove_outdated_meta_task.lock().await;
835
836        if let Some(task) = task.take() {
837            task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?;
838        }
839
840        self.manager_ctx.stop();
841
842        info!("LocalManager is stopped.");
843
844        Ok(())
845    }
846
847    async fn submit(&self, procedure: ProcedureWithId) -> Result<Watcher> {
848        let procedure_id = procedure.id;
849        ensure!(
850            !self.manager_ctx.contains_procedure(procedure_id),
851            DuplicateProcedureSnafu { procedure_id }
852        );
853        self.check_status().await?;
854
855        self.submit_root(
856            procedure.id,
857            ProcedureState::Running,
858            0,
859            procedure.procedure,
860        )
861    }
862
863    async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
864        Ok(self.manager_ctx.state(procedure_id))
865    }
866
867    fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
868        self.manager_ctx.watcher(procedure_id)
869    }
870
871    async fn list_procedures(&self) -> Result<Vec<ProcedureInfo>> {
872        Ok(self.manager_ctx.list_procedure())
873    }
874}
875
/// Task function that asks the manager context to remove outdated procedure metadata.
struct RemoveOutdatedMetaFunction {
    /// Context whose `remove_outdated_meta` is invoked on every call.
    manager_ctx: Arc<ManagerContext>,
    /// TTL handed to `remove_outdated_meta`.
    ttl: Duration,
}
880
881#[async_trait::async_trait]
882impl TaskFunction<Error> for RemoveOutdatedMetaFunction {
883    fn name(&self) -> &str {
884        "ProcedureManager-remove-outdated-meta-task"
885    }
886
887    async fn call(&mut self) -> Result<()> {
888        self.manager_ctx.remove_outdated_meta(self.ttl);
889        Ok(())
890    }
891}
892
/// Test-only helpers for building procedure metadata and object stores.
#[cfg(test)]
pub(crate) mod test_util {
    use common_test_util::temp_dir::TempDir;
    use object_store::ObjectStore;
    use object_store::services::Fs as Builder;

    use super::*;

    /// Creates a new [ProcedureMeta] for test purposes.
    ///
    /// The meta gets a random id, starts in the `Running` state, uses default lock and
    /// poison keys and the type name `"ProcedureAdapter"`.
    pub(crate) fn procedure_meta_for_test() -> ProcedureMeta {
        let id = ProcedureId::random();
        let lock_key = LockKey::default();
        let poison_keys = PoisonKeys::default();

        ProcedureMeta::new(
            id,
            ProcedureState::Running,
            None,
            lock_key,
            poison_keys,
            "ProcedureAdapter",
            None,
            None,
        )
    }

    /// Builds a file-system backed [ObjectStore] rooted at `dir`.
    ///
    /// Panics if the directory path is not valid UTF-8 or the store cannot be created.
    pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
        let root = dir.path().to_str().unwrap();
        ObjectStore::new(Builder::default().root(root))
            .unwrap()
            .finish()
    }
}
921
922#[cfg(test)]
923mod tests {
924    use std::assert_matches::assert_matches;
925
926    use common_error::mock::MockError;
927    use common_error::status_code::StatusCode;
928    use common_test_util::temp_dir::create_temp_dir;
929    use tokio::time::timeout;
930
931    use super::*;
932    use crate::error::{self, Error};
933    use crate::store::state_store::ObjectStateStore;
934    use crate::test_util::InMemoryPoisonStore;
935    use crate::{Context, Procedure, Status};
936
937    fn new_test_manager_context() -> ManagerContext {
938        let poison_manager = Arc::new(InMemoryPoisonStore::default());
939        ManagerContext::new(poison_manager)
940    }
941
942    #[test]
943    fn test_manager_context() {
944        let ctx = new_test_manager_context();
945        let meta = Arc::new(test_util::procedure_meta_for_test());
946
947        assert!(!ctx.contains_procedure(meta.id));
948        assert!(ctx.state(meta.id).is_none());
949
950        assert!(ctx.try_insert_procedure(meta.clone()));
951        assert!(ctx.contains_procedure(meta.id));
952
953        assert!(ctx.state(meta.id).unwrap().is_running());
954        meta.set_state(ProcedureState::Done { output: None });
955        assert!(ctx.state(meta.id).unwrap().is_done());
956    }
957
958    #[test]
959    fn test_manager_context_insert_duplicate() {
960        let ctx = new_test_manager_context();
961        let meta = Arc::new(test_util::procedure_meta_for_test());
962
963        assert!(ctx.try_insert_procedure(meta.clone()));
964        assert!(!ctx.try_insert_procedure(meta));
965    }
966
967    fn new_child(parent_id: ProcedureId, ctx: &ManagerContext) -> ProcedureMetaRef {
968        let mut child = test_util::procedure_meta_for_test();
969        child.parent_id = Some(parent_id);
970        let child = Arc::new(child);
971        assert!(ctx.try_insert_procedure(child.clone()));
972
973        let mut parent = Vec::new();
974        ctx.find_procedures(&[parent_id], &mut parent);
975        parent[0].push_child(child.id);
976
977        child
978    }
979
980    #[test]
981    fn test_procedures_in_tree() {
982        let ctx = new_test_manager_context();
983        let root = Arc::new(test_util::procedure_meta_for_test());
984        assert!(ctx.try_insert_procedure(root.clone()));
985
986        assert_eq!(1, ctx.procedures_in_tree(&root).len());
987
988        let child1 = new_child(root.id, &ctx);
989        let child2 = new_child(root.id, &ctx);
990
991        let child3 = new_child(child1.id, &ctx);
992        let child4 = new_child(child1.id, &ctx);
993
994        let child5 = new_child(child2.id, &ctx);
995
996        let expect = vec![
997            root.id, child1.id, child2.id, child3.id, child4.id, child5.id,
998        ];
999        assert_eq!(expect, ctx.procedures_in_tree(&root));
1000    }
1001
1002    #[derive(Debug)]
1003    struct ProcedureToLoad {
1004        content: String,
1005        lock_key: LockKey,
1006        poison_keys: PoisonKeys,
1007    }
1008
1009    #[async_trait]
1010    impl Procedure for ProcedureToLoad {
1011        fn type_name(&self) -> &str {
1012            "ProcedureToLoad"
1013        }
1014
1015        async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1016            Ok(Status::done())
1017        }
1018
1019        fn dump(&self) -> Result<String> {
1020            Ok(self.content.clone())
1021        }
1022
1023        fn lock_key(&self) -> LockKey {
1024            self.lock_key.clone()
1025        }
1026
1027        fn poison_keys(&self) -> PoisonKeys {
1028            self.poison_keys.clone()
1029        }
1030    }
1031
1032    impl ProcedureToLoad {
1033        fn new(content: &str) -> ProcedureToLoad {
1034            ProcedureToLoad {
1035                content: content.to_string(),
1036                lock_key: LockKey::default(),
1037                poison_keys: PoisonKeys::default(),
1038            }
1039        }
1040
1041        fn loader() -> BoxedProcedureLoader {
1042            let f = |json: &str| {
1043                let procedure = ProcedureToLoad::new(json);
1044                Ok(Box::new(procedure) as _)
1045            };
1046            Box::new(f)
1047        }
1048    }
1049
1050    #[test]
1051    fn test_register_loader() {
1052        let dir = create_temp_dir("register");
1053        let config = ManagerConfig {
1054            parent_path: "data/".to_string(),
1055            max_retry_times: 3,
1056            retry_delay: Duration::from_millis(500),
1057            ..Default::default()
1058        };
1059        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1060        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1061        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1062        manager.manager_ctx.start();
1063
1064        manager
1065            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1066            .unwrap();
1067        // Register duplicate loader.
1068        let err = manager
1069            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1070            .unwrap_err();
1071        assert!(matches!(err, Error::LoaderConflict { .. }), "{err}");
1072    }
1073
1074    #[tokio::test]
1075    async fn test_recover() {
1076        let dir = create_temp_dir("recover");
1077        let object_store = test_util::new_object_store(&dir);
1078        let config = ManagerConfig {
1079            parent_path: "data/".to_string(),
1080            max_retry_times: 3,
1081            retry_delay: Duration::from_millis(500),
1082            ..Default::default()
1083        };
1084        let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1085        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1086        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1087        manager.manager_ctx.start();
1088
1089        manager
1090            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1091            .unwrap();
1092
1093        // Prepare data
1094        let procedure_store = ProcedureStore::from_object_store(object_store.clone());
1095        let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
1096        let root_id = ProcedureId::random();
1097        // Prepare data for the root procedure.
1098        for step in 0..3 {
1099            let type_name = root.type_name().to_string();
1100            let data = root.dump().unwrap();
1101            procedure_store
1102                .store_procedure(root_id, step, type_name, data, None)
1103                .await
1104                .unwrap();
1105        }
1106
1107        let child: BoxedProcedure = Box::new(ProcedureToLoad::new("a child procedure"));
1108        let child_id = ProcedureId::random();
1109        // Prepare data for the child procedure
1110        for step in 0..2 {
1111            let type_name = child.type_name().to_string();
1112            let data = child.dump().unwrap();
1113            procedure_store
1114                .store_procedure(child_id, step, type_name, data, Some(root_id))
1115                .await
1116                .unwrap();
1117        }
1118
1119        // Recover the manager
1120        manager.recover().await.unwrap();
1121
1122        // The manager should submit the root procedure.
1123        let _ = manager.procedure_state(root_id).await.unwrap().unwrap();
1124        // Since the mocked root procedure actually doesn't submit subprocedures, so there is no
1125        // related state.
1126        assert!(manager.procedure_state(child_id).await.unwrap().is_none());
1127    }
1128
1129    #[tokio::test]
1130    async fn test_submit_procedure() {
1131        let dir = create_temp_dir("submit");
1132        let config = ManagerConfig {
1133            parent_path: "data/".to_string(),
1134            max_retry_times: 3,
1135            retry_delay: Duration::from_millis(500),
1136            ..Default::default()
1137        };
1138        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1139        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1140        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1141        manager.manager_ctx.start();
1142
1143        let procedure_id = ProcedureId::random();
1144        assert!(
1145            manager
1146                .procedure_state(procedure_id)
1147                .await
1148                .unwrap()
1149                .is_none()
1150        );
1151        assert!(manager.procedure_watcher(procedure_id).is_none());
1152
1153        let mut procedure = ProcedureToLoad::new("submit");
1154        procedure.lock_key = LockKey::single_exclusive("test.submit");
1155        assert!(
1156            manager
1157                .submit(ProcedureWithId {
1158                    id: procedure_id,
1159                    procedure: Box::new(procedure),
1160                })
1161                .await
1162                .is_ok()
1163        );
1164        assert!(
1165            manager
1166                .procedure_state(procedure_id)
1167                .await
1168                .unwrap()
1169                .is_some()
1170        );
1171        // Wait for the procedure done.
1172        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1173        watcher.changed().await.unwrap();
1174        assert!(watcher.borrow().is_done());
1175
1176        // Try to submit procedure with same id again.
1177        let err = manager
1178            .submit(ProcedureWithId {
1179                id: procedure_id,
1180                procedure: Box::new(ProcedureToLoad::new("submit")),
1181            })
1182            .await
1183            .unwrap_err();
1184        assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err}");
1185    }
1186
    /// Checks the state transitions reported through the watcher when a procedure fails:
    /// a procedure returning an error goes through prepare-rollback and rolling-back
    /// before it is marked failed, while a panicking procedure is marked failed directly.
    #[tokio::test]
    async fn test_state_changed_on_err() {
        let dir = create_temp_dir("on_err");
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        manager.manager_ctx.start();

        /// Procedure whose `execute` either panics or returns an external error,
        /// depending on `panic`.
        #[derive(Debug)]
        struct MockProcedure {
            panic: bool,
        }

        #[async_trait]
        impl Procedure for MockProcedure {
            fn type_name(&self) -> &str {
                "MockProcedure"
            }

            async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
                if self.panic {
                    // Test that the runner can set the state to failed even if the
                    // procedure panics.
                    panic!();
                } else {
                    Err(Error::external(MockError::new(StatusCode::Unexpected)))
                }
            }

            // Rollback is supported and always succeeds.
            async fn rollback(&mut self, _: &Context) -> Result<()> {
                Ok(())
            }

            fn rollback_supported(&self) -> bool {
                true
            }

            fn dump(&self) -> Result<String> {
                Ok(String::new())
            }

            fn lock_key(&self) -> LockKey {
                LockKey::single_exclusive("test.submit")
            }

            fn poison_keys(&self) -> PoisonKeys {
                PoisonKeys::default()
            }
        }

        // Submits `procedure` under a random id and returns its watcher.
        let check_procedure = |procedure| async {
            let procedure_id = ProcedureId::random();
            manager
                .submit(ProcedureWithId {
                    id: procedure_id,
                    procedure: Box::new(procedure),
                })
                .await
                .unwrap()
        };

        let mut watcher = check_procedure(MockProcedure { panic: false }).await;
        // Wait for each notification in turn; the order of the asserted states matters.
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_prepare_rollback());
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_rolling_back());
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_failed());
        // The runner won't roll back a panicked procedure.
        let mut watcher = check_procedure(MockProcedure { panic: true }).await;
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_failed());
    }
1267
1268    #[tokio::test]
1269    async fn test_procedure_manager_stopped() {
1270        let dir = create_temp_dir("procedure_manager_stopped");
1271        let config = ManagerConfig {
1272            parent_path: "data/".to_string(),
1273            max_retry_times: 3,
1274            retry_delay: Duration::from_millis(500),
1275            ..Default::default()
1276        };
1277        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1278        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1279        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1280
1281        let mut procedure = ProcedureToLoad::new("submit");
1282        procedure.lock_key = LockKey::single_exclusive("test.submit");
1283        let procedure_id = ProcedureId::random();
1284        assert_matches!(
1285            manager
1286                .submit(ProcedureWithId {
1287                    id: procedure_id,
1288                    procedure: Box::new(procedure),
1289                })
1290                .await
1291                .unwrap_err(),
1292            error::Error::ManagerNotStart { .. }
1293        );
1294    }
1295
1296    #[tokio::test]
1297    async fn test_procedure_manager_restart() {
1298        let dir = create_temp_dir("procedure_manager_restart");
1299        let config = ManagerConfig {
1300            parent_path: "data/".to_string(),
1301            max_retry_times: 3,
1302            retry_delay: Duration::from_millis(500),
1303            ..Default::default()
1304        };
1305        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1306        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1307        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1308
1309        manager.start().await.unwrap();
1310        manager.stop().await.unwrap();
1311        manager.start().await.unwrap();
1312
1313        let mut procedure = ProcedureToLoad::new("submit");
1314        procedure.lock_key = LockKey::single_exclusive("test.submit");
1315        let procedure_id = ProcedureId::random();
1316        assert!(
1317            manager
1318                .submit(ProcedureWithId {
1319                    id: procedure_id,
1320                    procedure: Box::new(procedure),
1321                })
1322                .await
1323                .is_ok()
1324        );
1325        assert!(
1326            manager
1327                .procedure_state(procedure_id)
1328                .await
1329                .unwrap()
1330                .is_some()
1331        );
1332    }
1333
1334    #[tokio::test(flavor = "multi_thread")]
1335    async fn test_remove_outdated_meta_task() {
1336        let dir = create_temp_dir("remove_outdated_meta_task");
1337        let object_store = test_util::new_object_store(&dir);
1338        let config = ManagerConfig {
1339            parent_path: "data/".to_string(),
1340            max_retry_times: 3,
1341            retry_delay: Duration::from_millis(500),
1342            remove_outdated_meta_task_interval: Duration::from_millis(1),
1343            remove_outdated_meta_ttl: Duration::from_millis(1),
1344            max_running_procedures: 128,
1345        };
1346        let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1347        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1348        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1349        manager.manager_ctx.set_running();
1350
1351        let mut procedure = ProcedureToLoad::new("submit");
1352        procedure.lock_key = LockKey::single_exclusive("test.submit");
1353        let procedure_id = ProcedureId::random();
1354        assert!(
1355            manager
1356                .submit(ProcedureWithId {
1357                    id: procedure_id,
1358                    procedure: Box::new(procedure),
1359                })
1360                .await
1361                .is_ok()
1362        );
1363        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1364        watcher.changed().await.unwrap();
1365
1366        manager.start().await.unwrap();
1367        tokio::time::sleep(Duration::from_millis(300)).await;
1368        assert!(
1369            manager
1370                .procedure_state(procedure_id)
1371                .await
1372                .unwrap()
1373                .is_none()
1374        );
1375
1376        // The remove_outdated_meta method has been stopped, so any procedure meta-data will not be automatically removed.
1377        manager.stop().await.unwrap();
1378        let mut procedure = ProcedureToLoad::new("submit");
1379        procedure.lock_key = LockKey::single_exclusive("test.submit");
1380        let procedure_id = ProcedureId::random();
1381
1382        manager.manager_ctx.set_running();
1383        assert!(
1384            manager
1385                .submit(ProcedureWithId {
1386                    id: procedure_id,
1387                    procedure: Box::new(procedure),
1388                })
1389                .await
1390                .is_ok()
1391        );
1392        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1393        watcher.changed().await.unwrap();
1394        tokio::time::sleep(Duration::from_millis(300)).await;
1395        assert!(
1396            manager
1397                .procedure_state(procedure_id)
1398                .await
1399                .unwrap()
1400                .is_some()
1401        );
1402
1403        // After restart
1404        let mut procedure = ProcedureToLoad::new("submit");
1405        procedure.lock_key = LockKey::single_exclusive("test.submit");
1406        let procedure_id = ProcedureId::random();
1407        assert!(
1408            manager
1409                .submit(ProcedureWithId {
1410                    id: procedure_id,
1411                    procedure: Box::new(procedure),
1412                })
1413                .await
1414                .is_ok()
1415        );
1416        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1417        watcher.changed().await.unwrap();
1418
1419        manager.start().await.unwrap();
1420        tokio::time::sleep(Duration::from_millis(300)).await;
1421        assert!(
1422            manager
1423                .procedure_state(procedure_id)
1424                .await
1425                .unwrap()
1426                .is_none()
1427        );
1428    }
1429
1430    #[tokio::test]
1431    async fn test_too_many_running_procedures() {
1432        let dir = create_temp_dir("too_many_running_procedures");
1433        let config = ManagerConfig {
1434            parent_path: "data/".to_string(),
1435            max_retry_times: 3,
1436            retry_delay: Duration::from_millis(500),
1437            max_running_procedures: 1,
1438            ..Default::default()
1439        };
1440        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1441        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1442        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1443        manager.manager_ctx.set_running();
1444
1445        manager
1446            .manager_ctx
1447            .running_procedures
1448            .lock()
1449            .unwrap()
1450            .insert(ProcedureId::random());
1451        manager.start().await.unwrap();
1452
1453        // Submit a new procedure should fail.
1454        let mut procedure = ProcedureToLoad::new("submit");
1455        procedure.lock_key = LockKey::single_exclusive("test.submit");
1456        let procedure_id = ProcedureId::random();
1457        let err = manager
1458            .submit(ProcedureWithId {
1459                id: procedure_id,
1460                procedure: Box::new(procedure),
1461            })
1462            .await
1463            .unwrap_err();
1464        assert!(matches!(err, Error::TooManyRunningProcedures { .. }));
1465
1466        manager
1467            .manager_ctx
1468            .running_procedures
1469            .lock()
1470            .unwrap()
1471            .clear();
1472
1473        // Submit a new procedure should succeed.
1474        let mut procedure = ProcedureToLoad::new("submit");
1475        procedure.lock_key = LockKey::single_exclusive("test.submit");
1476        assert!(
1477            manager
1478                .submit(ProcedureWithId {
1479                    id: procedure_id,
1480                    procedure: Box::new(procedure),
1481                })
1482                .await
1483                .is_ok()
1484        );
1485        assert!(
1486            manager
1487                .procedure_state(procedure_id)
1488                .await
1489                .unwrap()
1490                .is_some()
1491        );
1492        // Wait for the procedure done.
1493        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1494        watcher.changed().await.unwrap();
1495        assert!(watcher.borrow().is_done());
1496    }
1497
1498    #[derive(Debug)]
1499    struct ProcedureToRecover {
1500        content: String,
1501        lock_key: LockKey,
1502        notify: Option<Arc<Notify>>,
1503        poison_keys: PoisonKeys,
1504    }
1505
1506    #[async_trait]
1507    impl Procedure for ProcedureToRecover {
1508        fn type_name(&self) -> &str {
1509            "ProcedureToRecover"
1510        }
1511
1512        async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1513            Ok(Status::done())
1514        }
1515
1516        fn dump(&self) -> Result<String> {
1517            Ok(self.content.clone())
1518        }
1519
1520        fn lock_key(&self) -> LockKey {
1521            self.lock_key.clone()
1522        }
1523
1524        fn recover(&mut self) -> Result<()> {
1525            self.notify.as_ref().unwrap().notify_one();
1526            Ok(())
1527        }
1528
1529        fn poison_keys(&self) -> PoisonKeys {
1530            self.poison_keys.clone()
1531        }
1532    }
1533
1534    impl ProcedureToRecover {
1535        fn new(content: &str) -> ProcedureToRecover {
1536            ProcedureToRecover {
1537                content: content.to_string(),
1538                lock_key: LockKey::default(),
1539                poison_keys: PoisonKeys::default(),
1540                notify: None,
1541            }
1542        }
1543
1544        fn loader(notify: Arc<Notify>) -> BoxedProcedureLoader {
1545            let f = move |json: &str| {
1546                let procedure = ProcedureToRecover {
1547                    content: json.to_string(),
1548                    lock_key: LockKey::default(),
1549                    poison_keys: PoisonKeys::default(),
1550                    notify: Some(notify.clone()),
1551                };
1552                Ok(Box::new(procedure) as _)
1553            };
1554            Box::new(f)
1555        }
1556    }
1557
1558    #[tokio::test]
1559    async fn test_procedure_recover() {
1560        common_telemetry::init_default_ut_logging();
1561        let dir = create_temp_dir("procedure_recover");
1562        let object_store = test_util::new_object_store(&dir);
1563        let config = ManagerConfig {
1564            parent_path: "data/".to_string(),
1565            max_retry_times: 3,
1566            retry_delay: Duration::from_millis(500),
1567            ..Default::default()
1568        };
1569        let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1570        let poison_manager = Arc::new(InMemoryPoisonStore::new());
1571        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1572        manager.manager_ctx.start();
1573
1574        let notify = Arc::new(Notify::new());
1575        manager
1576            .register_loader(
1577                "ProcedureToRecover",
1578                ProcedureToRecover::loader(notify.clone()),
1579            )
1580            .unwrap();
1581
1582        // Prepare data
1583        let procedure_store = ProcedureStore::from_object_store(object_store.clone());
1584        let root: BoxedProcedure = Box::new(ProcedureToRecover::new("test procedure recovery"));
1585        let root_id = ProcedureId::random();
1586        // Prepare data for the root procedure.
1587        for step in 0..3 {
1588            let type_name = root.type_name().to_string();
1589            let data = root.dump().unwrap();
1590            procedure_store
1591                .store_procedure(root_id, step, type_name, data, None)
1592                .await
1593                .unwrap();
1594        }
1595
1596        // Recover the manager
1597        manager.recover().await.unwrap();
1598        timeout(Duration::from_secs(10), notify.notified())
1599            .await
1600            .unwrap();
1601    }
1602}