common_procedure/
local.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15mod runner;
16
17use std::collections::hash_map::Entry;
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
20use std::sync::{Arc, Mutex, RwLock};
21use std::time::{Duration, Instant};
22
23use async_trait::async_trait;
24use backon::ExponentialBuilder;
25use common_error::ext::BoxedError;
26use common_event_recorder::EventRecorderRef;
27use common_runtime::{RepeatedTask, TaskFunction};
28use common_telemetry::tracing_context::{FutureExt, TracingContext};
29use common_telemetry::{error, info, tracing};
30use snafu::{OptionExt, ResultExt, ensure};
31use tokio::sync::watch::{self, Receiver, Sender};
32use tokio::sync::{Mutex as TokioMutex, Notify};
33
34use crate::error::{
35    self, CheckStatusSnafu, DuplicateProcedureSnafu, Error, LoaderConflictSnafu,
36    ManagerNotStartSnafu, ManagerPasuedSnafu, PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu,
37    Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
38    TooManyRunningProceduresSnafu,
39};
40use crate::event::ProcedureEvent;
41use crate::local::runner::Runner;
42use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
43use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
44use crate::store::poison_store::PoisonStoreRef;
45use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
46use crate::{
47    BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
48    ProcedureState, ProcedureWithId, StringKey, UserMetadata, Watcher,
49};
50
/// How long the metadata of a finished procedure is kept before it becomes
/// eligible for removal (ten minutes).
const META_TTL: Duration = Duration::from_secs(10 * 60);
53
54/// Shared metadata of a procedure.
55///
56/// # Note
57/// [Notify] is not a condition variable, we can't guarantee the waiters are notified
58/// if they didn't call `notified()` before we signal the notify. So we
59/// 1. use dedicated notify for each condition, such as waiting for a lock, waiting
60///    for children;
61/// 2. always use `notify_one` and ensure there are only one waiter.
62#[derive(Debug)]
63pub(crate) struct ProcedureMeta {
64    /// Id of this procedure.
65    id: ProcedureId,
66    /// Type name of this procedure.
67    type_name: String,
68    /// Parent procedure id.
69    parent_id: Option<ProcedureId>,
70    /// Notify to wait for subprocedures.
71    child_notify: Notify,
72    /// Lock required by this procedure.
73    lock_key: LockKey,
74    /// Poison keys that may cause this procedure to become poisoned during execution.
75    poison_keys: PoisonKeys,
76    /// Sender to notify the procedure state.
77    state_sender: Sender<ProcedureState>,
78    /// Receiver to watch the procedure state.
79    state_receiver: Receiver<ProcedureState>,
80    /// Id of child procedures.
81    children: Mutex<Vec<ProcedureId>>,
82    /// Start execution time of this procedure.
83    start_time_ms: AtomicI64,
84    /// End execution time of this procedure.
85    end_time_ms: AtomicI64,
86    /// Event recorder.
87    event_recorder: Option<EventRecorderRef>,
88    /// The user metadata of the procedure. It's generated by [Procedure::user_metadata].
89    user_metadata: Option<UserMetadata>,
90}
91
92impl ProcedureMeta {
93    #[allow(clippy::too_many_arguments)]
94    fn new(
95        id: ProcedureId,
96        procedure_state: ProcedureState,
97        parent_id: Option<ProcedureId>,
98        lock_key: LockKey,
99        poison_keys: PoisonKeys,
100        type_name: &str,
101        event_recorder: Option<EventRecorderRef>,
102        user_metadata: Option<UserMetadata>,
103    ) -> ProcedureMeta {
104        let (state_sender, state_receiver) = watch::channel(procedure_state);
105        ProcedureMeta {
106            id,
107            parent_id,
108            child_notify: Notify::new(),
109            lock_key,
110            poison_keys,
111            state_sender,
112            state_receiver,
113            children: Mutex::new(Vec::new()),
114            start_time_ms: AtomicI64::new(0),
115            end_time_ms: AtomicI64::new(0),
116            type_name: type_name.to_string(),
117            event_recorder,
118            user_metadata,
119        }
120    }
121
122    /// Returns current [ProcedureState].
123    fn state(&self) -> ProcedureState {
124        self.state_receiver.borrow().clone()
125    }
126
127    /// Update current [ProcedureState].
128    fn set_state(&self, state: ProcedureState) {
129        // Emit the event to the event recorder if the user metadata contains the eventable object.
130        if let (Some(event_recorder), Some(user_metadata)) =
131            (&self.event_recorder, &self.user_metadata)
132            && let Some(event) = user_metadata.to_event()
133        {
134            event_recorder.record(Box::new(ProcedureEvent::new(self.id, event, state.clone())));
135        }
136
137        // Safety: ProcedureMeta also holds the receiver, so `send()` should never fail.
138        self.state_sender.send(state).unwrap();
139    }
140
141    /// Push `procedure_id` of the subprocedure to the metadata.
142    fn push_child(&self, procedure_id: ProcedureId) {
143        let mut children = self.children.lock().unwrap();
144        children.push(procedure_id);
145    }
146
147    /// Append subprocedures to given `buffer`.
148    fn list_children(&self, buffer: &mut Vec<ProcedureId>) {
149        let children = self.children.lock().unwrap();
150        buffer.extend_from_slice(&children);
151    }
152
153    /// Returns the number of subprocedures.
154    fn num_children(&self) -> usize {
155        self.children.lock().unwrap().len()
156    }
157
158    /// update the start time of the procedure.
159    fn set_start_time_ms(&self) {
160        self.start_time_ms
161            .store(common_time::util::current_time_millis(), Ordering::Relaxed);
162    }
163
164    /// update the end time of the procedure.
165    fn set_end_time_ms(&self) {
166        self.end_time_ms
167            .store(common_time::util::current_time_millis(), Ordering::Relaxed);
168    }
169}
170
171/// Reference counted pointer to [ProcedureMeta].
172type ProcedureMetaRef = Arc<ProcedureMeta>;
173
174/// Procedure loaded from store.
175struct LoadedProcedure {
176    procedure: BoxedProcedure,
177    step: u32,
178}
179
180/// The dynamic lock for procedure execution.
181///
182/// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
183/// during execution. They are only held when the procedure specifically needs these keys
184/// and are released as soon as the procedure no longer needs them.
185/// This allows for more fine-grained concurrency control during procedure execution.
186pub(crate) type DynamicKeyLock = Arc<KeyRwLock<String>>;
187
188/// Acquires a dynamic key lock for the given key.
189///
190/// This function takes a reference to the dynamic key lock and a pointer to the key.
191/// It then matches the key type and acquires the appropriate lock.
192pub async fn acquire_dynamic_key_lock(
193    lock: &DynamicKeyLock,
194    key: &StringKey,
195) -> DynamicKeyLockGuard {
196    match key {
197        StringKey::Share(key) => {
198            let guard = lock.read(key.clone()).await;
199            DynamicKeyLockGuard {
200                guard: Some(OwnedKeyRwLockGuard::from(guard)),
201                key: key.clone(),
202                lock: lock.clone(),
203            }
204        }
205        StringKey::Exclusive(key) => {
206            let guard = lock.write(key.clone()).await;
207            DynamicKeyLockGuard {
208                guard: Some(OwnedKeyRwLockGuard::from(guard)),
209                key: key.clone(),
210                lock: lock.clone(),
211            }
212        }
213    }
214}
215/// A guard for the dynamic key lock.
216///
217/// This guard is used to release the lock when the procedure no longer needs it.
218/// It also ensures that the lock is cleaned up when the guard is dropped.
219pub struct DynamicKeyLockGuard {
220    guard: Option<OwnedKeyRwLockGuard>,
221    key: String,
222    lock: DynamicKeyLock,
223}
224
225impl Drop for DynamicKeyLockGuard {
226    fn drop(&mut self) {
227        if let Some(guard) = self.guard.take() {
228            drop(guard);
229        }
230        self.lock.clean_keys(std::slice::from_ref(&self.key));
231    }
232}
233
234/// Shared context of the manager.
235pub(crate) struct ManagerContext {
236    /// Procedure loaders. The key is the type name of the procedure which the loader returns.
237    loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
238    /// The key lock for the procedure.
239    ///
240    /// The lock keys are defined in `Procedure::lock_key()`.
241    /// These locks are acquired before the procedure starts and released after the procedure finishes.
242    /// They ensure exclusive access to resources throughout the entire procedure lifecycle.
243    key_lock: KeyRwLock<String>,
244    /// The dynamic lock for procedure execution.
245    ///
246    /// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
247    /// during execution. They are only held when the procedure specifically needs these keys
248    /// and are released as soon as the procedure no longer needs them.
249    /// This allows for more fine-grained concurrency control during procedure execution.
250    dynamic_key_lock: DynamicKeyLock,
251    /// Procedures in the manager.
252    procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
253    /// Running procedures.
254    running_procedures: Mutex<HashSet<ProcedureId>>,
255    /// Ids and finished time of finished procedures.
256    finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
257    /// Running flag.
258    running: Arc<AtomicBool>,
259    /// Poison manager.
260    poison_manager: PoisonStoreRef,
261}
262
263#[async_trait]
264impl ContextProvider for ManagerContext {
265    async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
266        Ok(self.state(procedure_id))
267    }
268
269    async fn procedure_state_receiver(
270        &self,
271        procedure_id: ProcedureId,
272    ) -> Result<Option<Receiver<ProcedureState>>> {
273        Ok(self.state_receiver(procedure_id))
274    }
275
276    async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
277        {
278            // validate the procedure exists
279            let procedures = self.procedures.read().unwrap();
280            let procedure = procedures
281                .get(&procedure_id)
282                .context(ProcedureNotFoundSnafu { procedure_id })?;
283
284            // validate the poison key is defined
285            ensure!(
286                procedure.poison_keys.contains(key),
287                PoisonKeyNotDefinedSnafu {
288                    key: key.clone(),
289                    procedure_id
290                }
291            );
292        }
293        let key = key.to_string();
294        let procedure_id = procedure_id.to_string();
295        self.poison_manager.try_put_poison(key, procedure_id).await
296    }
297
298    async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
299        acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
300    }
301}
302
303impl ManagerContext {
304    /// Returns a new [ManagerContext].
305    fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
306        ManagerContext {
307            key_lock: KeyRwLock::new(),
308            dynamic_key_lock: Arc::new(KeyRwLock::new()),
309            loaders: Mutex::new(HashMap::new()),
310            procedures: RwLock::new(HashMap::new()),
311            running_procedures: Mutex::new(HashSet::new()),
312            finished_procedures: Mutex::new(VecDeque::new()),
313            running: Arc::new(AtomicBool::new(false)),
314            poison_manager,
315        }
316    }
317
318    #[cfg(test)]
319    pub(crate) fn set_running(&self) {
320        self.running.store(true, Ordering::Relaxed);
321    }
322
323    /// Set the running flag.
324    pub(crate) fn start(&self) {
325        self.running.store(true, Ordering::Relaxed);
326    }
327
328    pub(crate) fn stop(&self) {
329        self.running.store(false, Ordering::Relaxed);
330    }
331
332    /// Return `ProcedureManager` is running.
333    pub(crate) fn running(&self) -> bool {
334        self.running.load(Ordering::Relaxed)
335    }
336
337    /// Returns true if the procedure with specific `procedure_id` exists.
338    fn contains_procedure(&self, procedure_id: ProcedureId) -> bool {
339        let procedures = self.procedures.read().unwrap();
340        procedures.contains_key(&procedure_id)
341    }
342
343    /// Returns the number of running procedures.
344    fn num_running_procedures(&self) -> usize {
345        self.running_procedures.lock().unwrap().len()
346    }
347
348    /// Try to insert the `procedure` to the context if there is no procedure
349    /// with same [ProcedureId].
350    ///
351    /// Returns `false` if there is already a procedure using the same [ProcedureId].
352    fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
353        let procedure_id = meta.id;
354        let mut procedures = self.procedures.write().unwrap();
355        match procedures.entry(procedure_id) {
356            Entry::Occupied(_) => return false,
357            Entry::Vacant(vacant_entry) => {
358                vacant_entry.insert(meta);
359            }
360        }
361
362        let mut running_procedures = self.running_procedures.lock().unwrap();
363        running_procedures.insert(procedure_id);
364
365        true
366    }
367
368    /// Returns the [ProcedureState] of specific `procedure_id`.
369    fn state(&self, procedure_id: ProcedureId) -> Option<ProcedureState> {
370        let procedures = self.procedures.read().unwrap();
371        procedures.get(&procedure_id).map(|meta| meta.state())
372    }
373
374    /// Returns the [Receiver<ProcedureState>] of specific `procedure_id`.
375    fn state_receiver(&self, procedure_id: ProcedureId) -> Option<Receiver<ProcedureState>> {
376        let procedures = self.procedures.read().unwrap();
377        procedures
378            .get(&procedure_id)
379            .map(|meta| meta.state_receiver.clone())
380    }
381
382    /// Returns the [ProcedureMeta] of all procedures.
383    fn list_procedure(&self) -> Vec<ProcedureInfo> {
384        let procedures = self.procedures.read().unwrap();
385        procedures
386            .values()
387            .map(|meta| ProcedureInfo {
388                id: meta.id,
389                type_name: meta.type_name.clone(),
390                start_time_ms: meta.start_time_ms.load(Ordering::Relaxed),
391                end_time_ms: meta.end_time_ms.load(Ordering::Relaxed),
392                state: meta.state(),
393                lock_keys: meta.lock_key.get_keys(),
394            })
395            .collect()
396    }
397
398    /// Returns the [Watcher] of specific `procedure_id`.
399    fn watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
400        let procedures = self.procedures.read().unwrap();
401        procedures
402            .get(&procedure_id)
403            .map(|meta| meta.state_receiver.clone())
404    }
405
406    /// Notify a suspended parent procedure with specific `procedure_id` by its subprocedure.
407    fn notify_by_subprocedure(&self, procedure_id: ProcedureId) {
408        let procedures = self.procedures.read().unwrap();
409        if let Some(meta) = procedures.get(&procedure_id) {
410            meta.child_notify.notify_one();
411        }
412    }
413
414    /// Load procedure from specific [ProcedureMessage].
415    fn load_one_procedure_from_message(
416        &self,
417        procedure_id: ProcedureId,
418        message: &ProcedureMessage,
419    ) -> Option<LoadedProcedure> {
420        let loaders = self.loaders.lock().unwrap();
421        let loader = loaders.get(&message.type_name).or_else(|| {
422            error!(
423                "Loader not found, procedure_id: {}, type_name: {}",
424                procedure_id, message.type_name
425            );
426            None
427        })?;
428
429        let procedure = loader(&message.data)
430            .map_err(|e| {
431                error!(
432                    "Failed to load procedure data, key: {}, source: {:?}",
433                    procedure_id, e
434                );
435                e
436            })
437            .ok()?;
438
439        Some(LoadedProcedure {
440            procedure,
441            step: message.step,
442        })
443    }
444
445    /// Returns all procedures in the tree (including given `root` procedure).
446    ///
447    /// If callers need a consistent view of the tree, they must ensure no new
448    /// procedure is added to the tree during using this method.
449    fn procedures_in_tree(&self, root: &ProcedureMetaRef) -> Vec<ProcedureId> {
450        let sub_num = root.num_children();
451        // Reserve capacity for the root procedure and its children.
452        let mut procedures = Vec::with_capacity(1 + sub_num);
453
454        let mut queue = VecDeque::with_capacity(1 + sub_num);
455        // Push the root procedure to the queue.
456        queue.push_back(root.clone());
457
458        let mut children_ids = Vec::with_capacity(sub_num);
459        let mut children = Vec::with_capacity(sub_num);
460        while let Some(meta) = queue.pop_front() {
461            procedures.push(meta.id);
462
463            // Find metadatas of children.
464            children_ids.clear();
465            meta.list_children(&mut children_ids);
466            self.find_procedures(&children_ids, &mut children);
467
468            // Traverse children later.
469            for child in children.drain(..) {
470                queue.push_back(child);
471            }
472        }
473
474        procedures
475    }
476
477    /// Finds procedures by given `procedure_ids`.
478    ///
479    /// Ignores the id if corresponding procedure is not found.
480    fn find_procedures(&self, procedure_ids: &[ProcedureId], metas: &mut Vec<ProcedureMetaRef>) {
481        let procedures = self.procedures.read().unwrap();
482        for procedure_id in procedure_ids {
483            if let Some(meta) = procedures.get(procedure_id) {
484                metas.push(meta.clone());
485            }
486        }
487    }
488
489    /// Clean resources of finished procedures.
490    fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
491        // Since users need to query the procedure state, so we can't remove the
492        // meta of the procedure directly.
493        let now = Instant::now();
494        let mut finished_procedures = self.finished_procedures.lock().unwrap();
495        finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
496
497        // Remove the procedures from the running set.
498        let mut running_procedures = self.running_procedures.lock().unwrap();
499        for procedure_id in procedure_ids {
500            running_procedures.remove(procedure_id);
501        }
502    }
503
504    /// Remove metadata of outdated procedures.
505    fn remove_outdated_meta(&self, ttl: Duration) {
506        let ids = {
507            let mut finished_procedures = self.finished_procedures.lock().unwrap();
508            if finished_procedures.is_empty() {
509                return;
510            }
511
512            let mut ids_to_remove = Vec::new();
513            while let Some((id, finish_time)) = finished_procedures.front() {
514                if finish_time.elapsed() > ttl {
515                    ids_to_remove.push(*id);
516                    let _ = finished_procedures.pop_front();
517                } else {
518                    // The rest procedures are finished later, so we can break
519                    // the loop.
520                    break;
521                }
522            }
523            ids_to_remove
524        };
525
526        if ids.is_empty() {
527            return;
528        }
529
530        let mut procedures = self.procedures.write().unwrap();
531        for id in ids {
532            let _ = procedures.remove(&id);
533        }
534    }
535}
536
/// Configuration of a [LocalManager].
#[derive(Debug)]
pub struct ManagerConfig {
    /// Parent path handed to the procedure store.
    pub parent_path: String,
    /// Maximum number of retries of the exponential backoff used by procedure runners.
    pub max_retry_times: usize,
    /// Minimum delay of the exponential backoff used by procedure runners.
    pub retry_delay: Duration,
    /// Interval at which the outdated-metadata removal task runs.
    pub remove_outdated_meta_task_interval: Duration,
    /// How long metadata of a finished procedure is kept before it may be removed.
    pub remove_outdated_meta_ttl: Duration,
    /// Upper bound on running procedures checked when a root procedure is submitted.
    pub max_running_procedures: usize,
}
547
548impl Default for ManagerConfig {
549    fn default() -> Self {
550        Self {
551            parent_path: String::default(),
552            max_retry_times: 3,
553            retry_delay: Duration::from_millis(500),
554            remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
555            remove_outdated_meta_ttl: META_TTL,
556            max_running_procedures: 128,
557        }
558    }
559}
560
561type PauseAwareRef = Arc<dyn PauseAware>;
562
563#[async_trait]
564pub trait PauseAware: Send + Sync {
565    /// Returns true if the procedure manager is paused.
566    async fn is_paused(&self) -> std::result::Result<bool, BoxedError>;
567}
568
569/// A [ProcedureManager] that maintains procedure states locally.
570pub struct LocalManager {
571    manager_ctx: Arc<ManagerContext>,
572    procedure_store: Arc<ProcedureStore>,
573    max_retry_times: usize,
574    retry_delay: Duration,
575    /// GC task.
576    remove_outdated_meta_task: TokioMutex<Option<RepeatedTask<Error>>>,
577    config: ManagerConfig,
578    pause_aware: Option<PauseAwareRef>,
579    event_recorder: Option<EventRecorderRef>,
580}
581
582impl LocalManager {
583    /// Create a new [LocalManager] with specific `config`.
584    pub fn new(
585        config: ManagerConfig,
586        state_store: StateStoreRef,
587        poison_store: PoisonStoreRef,
588        pause_aware: Option<PauseAwareRef>,
589        event_recorder: Option<EventRecorderRef>,
590    ) -> LocalManager {
591        let manager_ctx = Arc::new(ManagerContext::new(poison_store));
592
593        LocalManager {
594            manager_ctx,
595            procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)),
596            max_retry_times: config.max_retry_times,
597            retry_delay: config.retry_delay,
598            remove_outdated_meta_task: TokioMutex::new(None),
599            config,
600            pause_aware,
601            event_recorder,
602        }
603    }
604
605    /// Build remove outedated meta task
606    pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask<Error> {
607        RepeatedTask::new(
608            self.config.remove_outdated_meta_task_interval,
609            Box::new(RemoveOutdatedMetaFunction {
610                manager_ctx: self.manager_ctx.clone(),
611                ttl: self.config.remove_outdated_meta_ttl,
612            }),
613        )
614    }
615
616    /// Submit a root procedure with given `procedure_id`.
617    fn submit_root(
618        &self,
619        procedure_id: ProcedureId,
620        procedure_state: ProcedureState,
621        step: u32,
622        procedure: BoxedProcedure,
623    ) -> Result<Watcher> {
624        ensure!(self.manager_ctx.running(), ManagerNotStartSnafu);
625
626        let user_metadata = procedure.user_metadata();
627        let meta = Arc::new(ProcedureMeta::new(
628            procedure_id,
629            procedure_state,
630            None,
631            procedure.lock_key(),
632            procedure.poison_keys(),
633            procedure.type_name(),
634            self.event_recorder.clone(),
635            user_metadata.clone(),
636        ));
637        let runner = Runner {
638            meta: meta.clone(),
639            procedure,
640            manager_ctx: self.manager_ctx.clone(),
641            step,
642            exponential_builder: ExponentialBuilder::default()
643                .with_min_delay(self.retry_delay)
644                .with_max_times(self.max_retry_times),
645            store: self.procedure_store.clone(),
646            rolling_back: false,
647            event_recorder: self.event_recorder.clone(),
648        };
649
650        if let (Some(event_recorder), Some(event)) = (
651            self.event_recorder.as_ref(),
652            user_metadata.and_then(|m| m.to_event()),
653        ) {
654            event_recorder.record(Box::new(ProcedureEvent::new(
655                procedure_id,
656                event,
657                ProcedureState::Running,
658            )));
659        }
660
661        let watcher = meta.state_receiver.clone();
662
663        ensure!(
664            self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
665            TooManyRunningProceduresSnafu {
666                max_running_procedures: self.config.max_running_procedures,
667            }
668        );
669
670        // Inserts meta into the manager before actually spawnd the runner.
671        ensure!(
672            self.manager_ctx.try_insert_procedure(meta),
673            DuplicateProcedureSnafu { procedure_id },
674        );
675
676        let tracing_context = TracingContext::from_current_span();
677
678        let _handle = common_runtime::spawn_global(async move {
679            let span = tracing_context.attach(tracing::info_span!(
680            "LocalManager::submit_root_procedure",
681                procedure_name = %runner.meta.type_name,
682                procedure_id = %runner.meta.id,
683            ));
684            // Run the root procedure.
685            // The task was moved to another runtime for execution.
686            // In order not to interrupt tracing, a span needs to be created to continue tracing the current task.
687            runner.run().trace(span).await;
688        });
689
690        Ok(watcher)
691    }
692
693    fn submit_recovered_messages(
694        &self,
695        messages: HashMap<ProcedureId, ProcedureMessage>,
696        init_state: InitProcedureState,
697    ) {
698        for (procedure_id, message) in &messages {
699            if message.parent_id.is_none() {
700                // This is the root procedure. We only submit the root procedure as it will
701                // submit sub-procedures to the manager.
702                let Some(mut loaded_procedure) = self
703                    .manager_ctx
704                    .load_one_procedure_from_message(*procedure_id, message)
705                else {
706                    // Try to load other procedures.
707                    continue;
708                };
709
710                info!(
711                    "Recover root procedure {}-{}, step: {}",
712                    loaded_procedure.procedure.type_name(),
713                    procedure_id,
714                    loaded_procedure.step
715                );
716
717                let procedure_state = match init_state {
718                    InitProcedureState::RollingBack => ProcedureState::RollingBack {
719                        error: Arc::new(
720                            error::RollbackProcedureRecoveredSnafu {
721                                error: message.error.clone().unwrap_or("Unknown error".to_string()),
722                            }
723                            .build(),
724                        ),
725                    },
726                    InitProcedureState::Running => ProcedureState::Running,
727                };
728
729                if let Err(e) = loaded_procedure.procedure.recover() {
730                    error!(e; "Failed to recover procedure {}", procedure_id);
731                }
732
733                if let Err(e) = self.submit_root(
734                    *procedure_id,
735                    procedure_state,
736                    loaded_procedure.step,
737                    loaded_procedure.procedure,
738                ) {
739                    error!(e; "Failed to recover procedure {}", procedure_id);
740                }
741            }
742        }
743    }
744
745    /// Recovers unfinished procedures and reruns them.
746    async fn recover(&self) -> Result<()> {
747        info!("LocalManager start to recover");
748        let recover_start = Instant::now();
749
750        let ProcedureMessages {
751            messages,
752            rollback_messages,
753            finished_ids,
754        } = self.procedure_store.load_messages().await?;
755        // Submits recovered messages first.
756        self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack);
757        self.submit_recovered_messages(messages, InitProcedureState::Running);
758
759        if !finished_ids.is_empty() {
760            info!(
761                "LocalManager try to clean finished procedures, num: {}",
762                finished_ids.len()
763            );
764
765            for procedure_id in finished_ids {
766                if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
767                    error!(e; "Failed to delete procedure {}", procedure_id);
768                }
769            }
770        }
771
772        info!(
773            "LocalManager finish recovery, cost: {}ms",
774            recover_start.elapsed().as_millis()
775        );
776
777        Ok(())
778    }
779
780    #[cfg(any(test, feature = "testing"))]
781    /// Returns true if contains a specified loader.
782    pub fn contains_loader(&self, name: &str) -> bool {
783        let loaders = self.manager_ctx.loaders.lock().unwrap();
784        loaders.contains_key(name)
785    }
786
787    async fn check_status(&self) -> Result<()> {
788        if let Some(pause_aware) = self.pause_aware.as_ref() {
789            ensure!(
790                !pause_aware.is_paused().await.context(CheckStatusSnafu)?,
791                ManagerPasuedSnafu
792            );
793        }
794
795        Ok(())
796    }
797}
798
799#[async_trait]
800impl ProcedureManager for LocalManager {
801    fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> {
802        let mut loaders = self.manager_ctx.loaders.lock().unwrap();
803        ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name });
804
805        let _ = loaders.insert(name.to_string(), loader);
806
807        Ok(())
808    }
809
810    async fn start(&self) -> Result<()> {
811        let mut task = self.remove_outdated_meta_task.lock().await;
812
813        if task.is_some() {
814            return Ok(());
815        }
816
817        let task_inner = self.build_remove_outdated_meta_task();
818
819        task_inner
820            .start(common_runtime::global_runtime())
821            .context(StartRemoveOutdatedMetaTaskSnafu)?;
822
823        *task = Some(task_inner);
824
825        self.manager_ctx.start();
826
827        info!("LocalManager is start.");
828
829        self.recover().await
830    }
831
832    async fn stop(&self) -> Result<()> {
833        let mut task = self.remove_outdated_meta_task.lock().await;
834
835        if let Some(task) = task.take() {
836            task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?;
837        }
838
839        self.manager_ctx.stop();
840
841        info!("LocalManager is stopped.");
842
843        Ok(())
844    }
845
846    async fn submit(&self, procedure: ProcedureWithId) -> Result<Watcher> {
847        let procedure_id = procedure.id;
848        ensure!(
849            !self.manager_ctx.contains_procedure(procedure_id),
850            DuplicateProcedureSnafu { procedure_id }
851        );
852        self.check_status().await?;
853
854        self.submit_root(
855            procedure.id,
856            ProcedureState::Running,
857            0,
858            procedure.procedure,
859        )
860    }
861
862    async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
863        Ok(self.manager_ctx.state(procedure_id))
864    }
865
866    fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
867        self.manager_ctx.watcher(procedure_id)
868    }
869
870    async fn list_procedures(&self) -> Result<Vec<ProcedureInfo>> {
871        Ok(self.manager_ctx.list_procedure())
872    }
873}
874
875struct RemoveOutdatedMetaFunction {
876    manager_ctx: Arc<ManagerContext>,
877    ttl: Duration,
878}
879
880#[async_trait::async_trait]
881impl TaskFunction<Error> for RemoveOutdatedMetaFunction {
882    fn name(&self) -> &str {
883        "ProcedureManager-remove-outdated-meta-task"
884    }
885
886    async fn call(&mut self) -> Result<()> {
887        self.manager_ctx.remove_outdated_meta(self.ttl);
888        Ok(())
889    }
890}
891
/// Helpers shared by tests in this crate.
#[cfg(test)]
pub(crate) mod test_util {
    use common_test_util::temp_dir::TempDir;
    use object_store::ObjectStore;
    use object_store::services::Fs as Builder;

    use super::*;

    /// Creates a new [ProcedureMeta] for test purpose.
    ///
    /// The meta has a random id, starts in the `Running` state and uses default lock
    /// and poison keys.
    pub(crate) fn procedure_meta_for_test() -> ProcedureMeta {
        ProcedureMeta::new(
            ProcedureId::random(),
            ProcedureState::Running,
            None,
            LockKey::default(),
            PoisonKeys::default(),
            "ProcedureAdapter",
            None,
            None,
        )
    }

    /// Creates a filesystem-backed [ObjectStore] rooted at `dir`.
    ///
    /// # Panics
    ///
    /// Panics if `dir`'s path is not valid UTF-8 or if the store cannot be built.
    pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
        let store_dir = dir.path().to_str().unwrap();
        let builder = Builder::default();
        ObjectStore::new(builder.root(store_dir)).unwrap().finish()
    }
}
920
// Unit tests for `ManagerContext` and `LocalManager`.
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_error::mock::MockError;
    use common_error::status_code::StatusCode;
    use common_test_util::temp_dir::create_temp_dir;
    use tokio::time::timeout;

    use super::*;
    use crate::error::{self, Error};
    use crate::store::state_store::ObjectStateStore;
    use crate::test_util::InMemoryPoisonStore;
    use crate::{Context, Procedure, Status};

    /// Builds a [ManagerContext] backed by an in-memory poison store.
    fn new_test_manager_context() -> ManagerContext {
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        ManagerContext::new(poison_manager)
    }

    /// Inserting a procedure makes it visible through the context, and state changes on
    /// the shared meta are reflected by `ManagerContext::state`.
    #[test]
    fn test_manager_context() {
        let ctx = new_test_manager_context();
        let meta = Arc::new(test_util::procedure_meta_for_test());

        assert!(!ctx.contains_procedure(meta.id));
        assert!(ctx.state(meta.id).is_none());

        assert!(ctx.try_insert_procedure(meta.clone()));
        assert!(ctx.contains_procedure(meta.id));

        assert!(ctx.state(meta.id).unwrap().is_running());
        // `ctx` holds a clone of the same `Arc`, so this update is visible through it.
        meta.set_state(ProcedureState::Done { output: None });
        assert!(ctx.state(meta.id).unwrap().is_done());
    }

    /// `try_insert_procedure` rejects a second insert of the same procedure meta.
    #[test]
    fn test_manager_context_insert_duplicate() {
        let ctx = new_test_manager_context();
        let meta = Arc::new(test_util::procedure_meta_for_test());

        assert!(ctx.try_insert_procedure(meta.clone()));
        assert!(!ctx.try_insert_procedure(meta));
    }

    /// Creates a child procedure of `parent_id`, inserts it into `ctx` and records it in
    /// the parent's child list.
    ///
    /// The parent must already be in `ctx`: the lookup result is indexed with `[0]`.
    fn new_child(parent_id: ProcedureId, ctx: &ManagerContext) -> ProcedureMetaRef {
        let mut child = test_util::procedure_meta_for_test();
        child.parent_id = Some(parent_id);
        let child = Arc::new(child);
        assert!(ctx.try_insert_procedure(child.clone()));

        let mut parent = Vec::new();
        ctx.find_procedures(&[parent_id], &mut parent);
        parent[0].push_child(child.id);

        child
    }

    /// `procedures_in_tree` returns the root and all of its descendants, level by level.
    #[test]
    fn test_procedures_in_tree() {
        let ctx = new_test_manager_context();
        let root = Arc::new(test_util::procedure_meta_for_test());
        assert!(ctx.try_insert_procedure(root.clone()));

        assert_eq!(1, ctx.procedures_in_tree(&root).len());

        // Tree shape:
        //   root -> child1 -> {child3, child4}
        //        -> child2 -> {child5}
        let child1 = new_child(root.id, &ctx);
        let child2 = new_child(root.id, &ctx);

        let child3 = new_child(child1.id, &ctx);
        let child4 = new_child(child1.id, &ctx);

        let child5 = new_child(child2.id, &ctx);

        let expect = vec![
            root.id, child1.id, child2.id, child3.id, child4.id, child5.id,
        ];
        assert_eq!(expect, ctx.procedures_in_tree(&root));
    }

    /// Minimal procedure whose `execute` finishes immediately and whose dump is the raw
    /// `content` string.
    #[derive(Debug)]
    struct ProcedureToLoad {
        content: String,
        lock_key: LockKey,
        poison_keys: PoisonKeys,
    }

    #[async_trait]
    impl Procedure for ProcedureToLoad {
        fn type_name(&self) -> &str {
            "ProcedureToLoad"
        }

        async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
            Ok(Status::done())
        }

        fn dump(&self) -> Result<String> {
            Ok(self.content.clone())
        }

        fn lock_key(&self) -> LockKey {
            self.lock_key.clone()
        }

        fn poison_keys(&self) -> PoisonKeys {
            self.poison_keys.clone()
        }
    }

    impl ProcedureToLoad {
        /// Creates a procedure with the given content and default lock/poison keys.
        fn new(content: &str) -> ProcedureToLoad {
            ProcedureToLoad {
                content: content.to_string(),
                lock_key: LockKey::default(),
                poison_keys: PoisonKeys::default(),
            }
        }

        /// Loader that rebuilds a [ProcedureToLoad] from its dumped string.
        fn loader() -> BoxedProcedureLoader {
            let f = |json: &str| {
                let procedure = ProcedureToLoad::new(json);
                Ok(Box::new(procedure) as _)
            };
            Box::new(f)
        }
    }

    /// Registering two loaders under the same name fails with `LoaderConflict`.
    #[test]
    fn test_register_loader() {
        let dir = create_temp_dir("register");
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        manager.manager_ctx.start();

        manager
            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
            .unwrap();
        // Register duplicate loader.
        let err = manager
            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
            .unwrap_err();
        assert!(matches!(err, Error::LoaderConflict { .. }), "{err}");
    }

    /// Persists steps for a root procedure and a child procedure, then checks that
    /// `recover` resubmits the root but not the child.
    #[tokio::test]
    async fn test_recover() {
        let dir = create_temp_dir("recover");
        let object_store = test_util::new_object_store(&dir);
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        manager.manager_ctx.start();

        manager
            .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
            .unwrap();

        // Prepare data
        let procedure_store = ProcedureStore::from_object_store(object_store.clone());
        let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
        let root_id = ProcedureId::random();
        // Prepare data for the root procedure.
        for step in 0..3 {
            let type_name = root.type_name().to_string();
            let data = root.dump().unwrap();
            procedure_store
                .store_procedure(root_id, step, type_name, data, None)
                .await
                .unwrap();
        }

        let child: BoxedProcedure = Box::new(ProcedureToLoad::new("a child procedure"));
        let child_id = ProcedureId::random();
        // Prepare data for the child procedure
        for step in 0..2 {
            let type_name = child.type_name().to_string();
            let data = child.dump().unwrap();
            procedure_store
                .store_procedure(child_id, step, type_name, data, Some(root_id))
                .await
                .unwrap();
        }

        // Recover the manager
        manager.recover().await.unwrap();

        // The manager should submit the root procedure.
        let _ = manager.procedure_state(root_id).await.unwrap().unwrap();
        // The mocked root procedure doesn't actually submit subprocedures, so the child
        // has no state.
        assert!(manager.procedure_state(child_id).await.unwrap().is_none());
    }

    /// A submitted procedure becomes visible, runs to completion, and a second submit
    /// with the same id fails with `DuplicateProcedure`.
    #[tokio::test]
    async fn test_submit_procedure() {
        let dir = create_temp_dir("submit");
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        manager.manager_ctx.start();

        let procedure_id = ProcedureId::random();
        assert!(
            manager
                .procedure_state(procedure_id)
                .await
                .unwrap()
                .is_none()
        );
        assert!(manager.procedure_watcher(procedure_id).is_none());

        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        assert!(
            manager
                .submit(ProcedureWithId {
                    id: procedure_id,
                    procedure: Box::new(procedure),
                })
                .await
                .is_ok()
        );
        assert!(
            manager
                .procedure_state(procedure_id)
                .await
                .unwrap()
                .is_some()
        );
        // Wait for the procedure to finish.
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_done());

        // Try to submit procedure with same id again.
        let err = manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(ProcedureToLoad::new("submit")),
            })
            .await
            .unwrap_err();
        assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err}");
    }

    /// A failing procedure goes through prepare-rollback, rolling-back and failed
    /// states, while a panicking procedure goes straight to failed.
    #[tokio::test]
    async fn test_state_changed_on_err() {
        let dir = create_temp_dir("on_err");
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        manager.manager_ctx.start();

        /// Procedure that always fails: it either panics or returns an external error,
        /// depending on `panic`.
        #[derive(Debug)]
        struct MockProcedure {
            panic: bool,
        }

        #[async_trait]
        impl Procedure for MockProcedure {
            fn type_name(&self) -> &str {
                "MockProcedure"
            }

            async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
                if self.panic {
                    // Checks that the runner still sets the state to failed even if the
                    // procedure panics.
                    panic!();
                } else {
                    Err(Error::external(MockError::new(StatusCode::Unexpected)))
                }
            }

            async fn rollback(&mut self, _: &Context) -> Result<()> {
                Ok(())
            }

            fn rollback_supported(&self) -> bool {
                true
            }

            fn dump(&self) -> Result<String> {
                Ok(String::new())
            }

            fn lock_key(&self) -> LockKey {
                LockKey::single_exclusive("test.submit")
            }

            fn poison_keys(&self) -> PoisonKeys {
                PoisonKeys::default()
            }
        }

        // Submits `procedure` under a fresh random id and returns its watcher.
        let check_procedure = |procedure| async {
            let procedure_id = ProcedureId::random();
            manager
                .submit(ProcedureWithId {
                    id: procedure_id,
                    procedure: Box::new(procedure),
                })
                .await
                .unwrap()
        };

        let mut watcher = check_procedure(MockProcedure { panic: false }).await;
        // Wait for the notification.
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_prepare_rollback());
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_rolling_back());
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_failed());
        // The runner won't rollback a panicked procedure.
        let mut watcher = check_procedure(MockProcedure { panic: true }).await;
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_failed());
    }

    /// Submitting to a manager whose context was never started fails with
    /// `ManagerNotStart`.
    #[tokio::test]
    async fn test_procedure_manager_stopped() {
        let dir = create_temp_dir("procedure_manager_stopped");
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);

        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        let procedure_id = ProcedureId::random();
        assert_matches!(
            manager
                .submit(ProcedureWithId {
                    id: procedure_id,
                    procedure: Box::new(procedure),
                })
                .await
                .unwrap_err(),
            error::Error::ManagerNotStart { .. }
        );
    }

    /// The manager accepts submissions again after a start/stop/start cycle.
    #[tokio::test]
    async fn test_procedure_manager_restart() {
        let dir = create_temp_dir("procedure_manager_restart");
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);

        manager.start().await.unwrap();
        manager.stop().await.unwrap();
        manager.start().await.unwrap();

        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        let procedure_id = ProcedureId::random();
        assert!(
            manager
                .submit(ProcedureWithId {
                    id: procedure_id,
                    procedure: Box::new(procedure),
                })
                .await
                .is_ok()
        );
        assert!(
            manager
                .procedure_state(procedure_id)
                .await
                .unwrap()
                .is_some()
        );
    }

    /// The remove-outdated-meta task removes completed procedures only while the manager
    /// is started. After `stop`, completed procedures are kept until the next `start`.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_remove_outdated_meta_task() {
        let dir = create_temp_dir("remove_outdated_meta_task");
        let object_store = test_util::new_object_store(&dir);
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            remove_outdated_meta_task_interval: Duration::from_millis(1),
            remove_outdated_meta_ttl: Duration::from_millis(1),
            max_running_procedures: 128,
        };
        let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        // Allow submissions without installing the cleanup task yet.
        manager.manager_ctx.set_running();

        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        let procedure_id = ProcedureId::random();
        assert!(
            manager
                .submit(ProcedureWithId {
                    id: procedure_id,
                    procedure: Box::new(procedure),
                })
                .await
                .is_ok()
        );
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();

        // Starting the manager installs the cleanup task; with a 1ms interval/TTL the
        // completed procedure should be gone after the sleep.
        manager.start().await.unwrap();
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(
            manager
                .procedure_state(procedure_id)
                .await
                .unwrap()
                .is_none()
        );

        // The remove-outdated-meta task has been stopped, so procedure metadata will not be removed automatically.
        manager.stop().await.unwrap();
        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        let procedure_id = ProcedureId::random();

        manager.manager_ctx.set_running();
        assert!(
            manager
                .submit(ProcedureWithId {
                    id: procedure_id,
                    procedure: Box::new(procedure),
                })
                .await
                .is_ok()
        );
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(
            manager
                .procedure_state(procedure_id)
                .await
                .unwrap()
                .is_some()
        );

        // Submit while the cleanup task is still stopped, then restart the manager: the
        // restarted task should remove the completed procedure.
        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        let procedure_id = ProcedureId::random();
        assert!(
            manager
                .submit(ProcedureWithId {
                    id: procedure_id,
                    procedure: Box::new(procedure),
                })
                .await
                .is_ok()
        );
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();

        manager.start().await.unwrap();
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert!(
            manager
                .procedure_state(procedure_id)
                .await
                .unwrap()
                .is_none()
        );
    }

    /// With `max_running_procedures: 1` and one fake running procedure, submission fails
    /// with `TooManyRunningProcedures`. Once the running set is cleared, submission succeeds.
    #[tokio::test]
    async fn test_too_many_running_procedures() {
        let dir = create_temp_dir("too_many_running_procedures");
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            max_running_procedures: 1,
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        manager.manager_ctx.set_running();

        // Pretend one procedure is already running so the limit of 1 is reached.
        manager
            .manager_ctx
            .running_procedures
            .lock()
            .unwrap()
            .insert(ProcedureId::random());
        manager.start().await.unwrap();

        // Submit a new procedure should fail.
        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        let procedure_id = ProcedureId::random();
        let err = manager
            .submit(ProcedureWithId {
                id: procedure_id,
                procedure: Box::new(procedure),
            })
            .await
            .unwrap_err();
        assert!(matches!(err, Error::TooManyRunningProcedures { .. }));

        manager
            .manager_ctx
            .running_procedures
            .lock()
            .unwrap()
            .clear();

        // Submit a new procedure should succeed.
        let mut procedure = ProcedureToLoad::new("submit");
        procedure.lock_key = LockKey::single_exclusive("test.submit");
        assert!(
            manager
                .submit(ProcedureWithId {
                    id: procedure_id,
                    procedure: Box::new(procedure),
                })
                .await
                .is_ok()
        );
        assert!(
            manager
                .procedure_state(procedure_id)
                .await
                .unwrap()
                .is_some()
        );
        // Wait for the procedure to finish.
        let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_done());
    }

    /// Procedure that signals `notify` when its `recover` hook is called.
    ///
    /// Only instances built by [ProcedureToRecover::loader] have `notify` set; calling
    /// `recover` on one built by `new` panics on the `unwrap`.
    #[derive(Debug)]
    struct ProcedureToRecover {
        content: String,
        lock_key: LockKey,
        notify: Option<Arc<Notify>>,
        poison_keys: PoisonKeys,
    }

    #[async_trait]
    impl Procedure for ProcedureToRecover {
        fn type_name(&self) -> &str {
            "ProcedureToRecover"
        }

        async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
            Ok(Status::done())
        }

        fn dump(&self) -> Result<String> {
            Ok(self.content.clone())
        }

        fn lock_key(&self) -> LockKey {
            self.lock_key.clone()
        }

        fn recover(&mut self) -> Result<()> {
            self.notify.as_ref().unwrap().notify_one();
            Ok(())
        }

        fn poison_keys(&self) -> PoisonKeys {
            self.poison_keys.clone()
        }
    }

    impl ProcedureToRecover {
        /// Creates a procedure with the given content and no `notify` handle.
        fn new(content: &str) -> ProcedureToRecover {
            ProcedureToRecover {
                content: content.to_string(),
                lock_key: LockKey::default(),
                poison_keys: PoisonKeys::default(),
                notify: None,
            }
        }

        /// Loader that rebuilds a [ProcedureToRecover] from its dump and attaches `notify`.
        fn loader(notify: Arc<Notify>) -> BoxedProcedureLoader {
            let f = move |json: &str| {
                let procedure = ProcedureToRecover {
                    content: json.to_string(),
                    lock_key: LockKey::default(),
                    poison_keys: PoisonKeys::default(),
                    notify: Some(notify.clone()),
                };
                Ok(Box::new(procedure) as _)
            };
            Box::new(f)
        }
    }

    /// Recovering a persisted procedure invokes its `recover` hook, observed through
    /// `notify` within a 10s timeout.
    #[tokio::test]
    async fn test_procedure_recover() {
        common_telemetry::init_default_ut_logging();
        let dir = create_temp_dir("procedure_recover");
        let object_store = test_util::new_object_store(&dir);
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
        manager.manager_ctx.start();

        let notify = Arc::new(Notify::new());
        manager
            .register_loader(
                "ProcedureToRecover",
                ProcedureToRecover::loader(notify.clone()),
            )
            .unwrap();

        // Prepare data
        let procedure_store = ProcedureStore::from_object_store(object_store.clone());
        let root: BoxedProcedure = Box::new(ProcedureToRecover::new("test procedure recovery"));
        let root_id = ProcedureId::random();
        // Prepare data for the root procedure.
        for step in 0..3 {
            let type_name = root.type_name().to_string();
            let data = root.dump().unwrap();
            procedure_store
                .store_procedure(root_id, step, type_name, data, None)
                .await
                .unwrap();
        }

        // Recover the manager
        manager.recover().await.unwrap();
        timeout(Duration::from_secs(10), notify.notified())
            .await
            .unwrap();
    }
}