1mod runner;
16
17use std::collections::hash_map::Entry;
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
20use std::sync::{Arc, Mutex, RwLock};
21use std::time::{Duration, Instant};
22
23use async_trait::async_trait;
24use backon::ExponentialBuilder;
25use common_error::ext::BoxedError;
26use common_event_recorder::EventRecorderRef;
27use common_runtime::{RepeatedTask, TaskFunction};
28use common_telemetry::tracing_context::{FutureExt, TracingContext};
29use common_telemetry::{error, info, tracing};
30use snafu::{OptionExt, ResultExt, ensure};
31use tokio::sync::watch::{self, Receiver, Sender};
32use tokio::sync::{Mutex as TokioMutex, Notify};
33
34use crate::error::{
35 self, CheckStatusSnafu, DuplicateProcedureSnafu, Error, LoaderConflictSnafu,
36 ManagerNotStartSnafu, ManagerPasuedSnafu, PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu,
37 Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
38 TooManyRunningProceduresSnafu,
39};
40use crate::event::ProcedureEvent;
41use crate::local::runner::Runner;
42use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
43use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
44use crate::store::poison_store::PoisonStoreRef;
45use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
46use crate::{
47 BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
48 ProcedureState, ProcedureWithId, StringKey, UserMetadata, Watcher,
49};
50
/// Default time-to-live (ten minutes) for the in-memory metadata of finished
/// procedures; used as the default of `ManagerConfig::remove_outdated_meta_ttl`.
const META_TTL: Duration = Duration::from_secs(10 * 60);
53
/// In-memory bookkeeping for a single procedure tracked by the manager.
#[derive(Debug)]
pub(crate) struct ProcedureMeta {
    /// Id of the procedure.
    id: ProcedureId,
    /// Type name of the procedure.
    type_name: String,
    /// Id of the parent procedure; `None` for a root procedure.
    parent_id: Option<ProcedureId>,
    /// Woken (`notify_one`) by `ManagerContext::notify_by_subprocedure`.
    child_notify: Notify,
    /// Lock keys of the procedure.
    lock_key: LockKey,
    /// Poison keys this procedure is allowed to put (checked by `try_put_poison`).
    poison_keys: PoisonKeys,
    /// Sender side of the state watch channel.
    state_sender: Sender<ProcedureState>,
    /// Receiver side of the state watch channel; clones are handed out as watchers.
    state_receiver: Receiver<ProcedureState>,
    /// Ids of child procedures registered via `push_child`.
    children: Mutex<Vec<ProcedureId>>,
    /// Start time in milliseconds; `0` until `set_start_time_ms` is called.
    start_time_ms: AtomicI64,
    /// End time in milliseconds; `0` until `set_end_time_ms` is called.
    end_time_ms: AtomicI64,
    /// Optional recorder that `set_state` reports state changes to.
    event_recorder: Option<EventRecorderRef>,
    /// Optional user metadata used to build the recorded events.
    user_metadata: Option<UserMetadata>,
}
91
92impl ProcedureMeta {
93 #[allow(clippy::too_many_arguments)]
94 fn new(
95 id: ProcedureId,
96 procedure_state: ProcedureState,
97 parent_id: Option<ProcedureId>,
98 lock_key: LockKey,
99 poison_keys: PoisonKeys,
100 type_name: &str,
101 event_recorder: Option<EventRecorderRef>,
102 user_metadata: Option<UserMetadata>,
103 ) -> ProcedureMeta {
104 let (state_sender, state_receiver) = watch::channel(procedure_state);
105 ProcedureMeta {
106 id,
107 parent_id,
108 child_notify: Notify::new(),
109 lock_key,
110 poison_keys,
111 state_sender,
112 state_receiver,
113 children: Mutex::new(Vec::new()),
114 start_time_ms: AtomicI64::new(0),
115 end_time_ms: AtomicI64::new(0),
116 type_name: type_name.to_string(),
117 event_recorder,
118 user_metadata,
119 }
120 }
121
122 fn state(&self) -> ProcedureState {
124 self.state_receiver.borrow().clone()
125 }
126
127 fn set_state(&self, state: ProcedureState) {
129 if let (Some(event_recorder), Some(user_metadata)) =
131 (&self.event_recorder, &self.user_metadata)
132 && let Some(event) = user_metadata.to_event()
133 {
134 event_recorder.record(Box::new(ProcedureEvent::new(self.id, event, state.clone())));
135 }
136
137 self.state_sender.send(state).unwrap();
139 }
140
141 fn push_child(&self, procedure_id: ProcedureId) {
143 let mut children = self.children.lock().unwrap();
144 children.push(procedure_id);
145 }
146
147 fn list_children(&self, buffer: &mut Vec<ProcedureId>) {
149 let children = self.children.lock().unwrap();
150 buffer.extend_from_slice(&children);
151 }
152
153 fn num_children(&self) -> usize {
155 self.children.lock().unwrap().len()
156 }
157
158 fn set_start_time_ms(&self) {
160 self.start_time_ms
161 .store(common_time::util::current_time_millis(), Ordering::Relaxed);
162 }
163
164 fn set_end_time_ms(&self) {
166 self.end_time_ms
167 .store(common_time::util::current_time_millis(), Ordering::Relaxed);
168 }
169}
170
/// Shared, reference-counted handle to a [`ProcedureMeta`].
type ProcedureMetaRef = Arc<ProcedureMeta>;

/// A procedure rebuilt from a persisted message by its registered loader.
struct LoadedProcedure {
    /// The procedure instance returned by the loader.
    procedure: BoxedProcedure,
    /// The step recorded in the persisted message.
    step: u32,
}

/// Shared key lock over dynamic string keys, used by [`acquire_dynamic_key_lock`].
pub(crate) type DynamicKeyLock = Arc<KeyRwLock<String>>;
187
188pub async fn acquire_dynamic_key_lock(
193 lock: &DynamicKeyLock,
194 key: &StringKey,
195) -> DynamicKeyLockGuard {
196 match key {
197 StringKey::Share(key) => {
198 let guard = lock.read(key.clone()).await;
199 DynamicKeyLockGuard {
200 guard: Some(OwnedKeyRwLockGuard::from(guard)),
201 key: key.clone(),
202 lock: lock.clone(),
203 }
204 }
205 StringKey::Exclusive(key) => {
206 let guard = lock.write(key.clone()).await;
207 DynamicKeyLockGuard {
208 guard: Some(OwnedKeyRwLockGuard::from(guard)),
209 key: key.clone(),
210 lock: lock.clone(),
211 }
212 }
213 }
214}
215pub struct DynamicKeyLockGuard {
220 guard: Option<OwnedKeyRwLockGuard>,
221 key: String,
222 lock: DynamicKeyLock,
223}
224
225impl Drop for DynamicKeyLockGuard {
226 fn drop(&mut self) {
227 if let Some(guard) = self.guard.take() {
228 drop(guard);
229 }
230 self.lock.clean_keys(std::slice::from_ref(&self.key));
231 }
232}
233
/// State shared between the [`LocalManager`] and the procedures it runs.
pub(crate) struct ManagerContext {
    /// Registered procedure loaders, keyed by procedure type name.
    loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
    /// Key lock presumably guarding procedures' `LockKey`s; used outside this
    /// view (runner) — TODO confirm.
    key_lock: KeyRwLock<String>,
    /// Lock backing `ContextProvider::acquire_lock` for dynamic string keys.
    dynamic_key_lock: DynamicKeyLock,
    /// Metadata of every tracked procedure: running ones and finished ones whose
    /// metadata has not yet been removed by `remove_outdated_meta`.
    procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
    /// Ids of procedures inserted but not yet reported as finished.
    running_procedures: Mutex<HashSet<ProcedureId>>,
    /// Finished procedures with their finish time, oldest at the front.
    finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
    /// Whether the manager is running (accepting submissions).
    running: Arc<AtomicBool>,
    /// Store used by `try_put_poison`.
    poison_manager: PoisonStoreRef,
}
262
263#[async_trait]
264impl ContextProvider for ManagerContext {
265 async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
266 Ok(self.state(procedure_id))
267 }
268
269 async fn procedure_state_receiver(
270 &self,
271 procedure_id: ProcedureId,
272 ) -> Result<Option<Receiver<ProcedureState>>> {
273 Ok(self.state_receiver(procedure_id))
274 }
275
276 async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
277 {
278 let procedures = self.procedures.read().unwrap();
280 let procedure = procedures
281 .get(&procedure_id)
282 .context(ProcedureNotFoundSnafu { procedure_id })?;
283
284 ensure!(
286 procedure.poison_keys.contains(key),
287 PoisonKeyNotDefinedSnafu {
288 key: key.clone(),
289 procedure_id
290 }
291 );
292 }
293 let key = key.to_string();
294 let procedure_id = procedure_id.to_string();
295 self.poison_manager.try_put_poison(key, procedure_id).await
296 }
297
298 async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
299 acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
300 }
301}
302
303impl ManagerContext {
304 fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
306 ManagerContext {
307 key_lock: KeyRwLock::new(),
308 dynamic_key_lock: Arc::new(KeyRwLock::new()),
309 loaders: Mutex::new(HashMap::new()),
310 procedures: RwLock::new(HashMap::new()),
311 running_procedures: Mutex::new(HashSet::new()),
312 finished_procedures: Mutex::new(VecDeque::new()),
313 running: Arc::new(AtomicBool::new(false)),
314 poison_manager,
315 }
316 }
317
318 #[cfg(test)]
319 pub(crate) fn set_running(&self) {
320 self.running.store(true, Ordering::Relaxed);
321 }
322
323 pub(crate) fn start(&self) {
325 self.running.store(true, Ordering::Relaxed);
326 }
327
328 pub(crate) fn stop(&self) {
329 self.running.store(false, Ordering::Relaxed);
330 }
331
332 pub(crate) fn running(&self) -> bool {
334 self.running.load(Ordering::Relaxed)
335 }
336
337 fn contains_procedure(&self, procedure_id: ProcedureId) -> bool {
339 let procedures = self.procedures.read().unwrap();
340 procedures.contains_key(&procedure_id)
341 }
342
343 fn num_running_procedures(&self) -> usize {
345 self.running_procedures.lock().unwrap().len()
346 }
347
348 fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
353 let procedure_id = meta.id;
354 let mut procedures = self.procedures.write().unwrap();
355 match procedures.entry(procedure_id) {
356 Entry::Occupied(_) => return false,
357 Entry::Vacant(vacant_entry) => {
358 vacant_entry.insert(meta);
359 }
360 }
361
362 let mut running_procedures = self.running_procedures.lock().unwrap();
363 running_procedures.insert(procedure_id);
364
365 true
366 }
367
368 fn state(&self, procedure_id: ProcedureId) -> Option<ProcedureState> {
370 let procedures = self.procedures.read().unwrap();
371 procedures.get(&procedure_id).map(|meta| meta.state())
372 }
373
374 fn state_receiver(&self, procedure_id: ProcedureId) -> Option<Receiver<ProcedureState>> {
376 let procedures = self.procedures.read().unwrap();
377 procedures
378 .get(&procedure_id)
379 .map(|meta| meta.state_receiver.clone())
380 }
381
382 fn list_procedure(&self) -> Vec<ProcedureInfo> {
384 let procedures = self.procedures.read().unwrap();
385 procedures
386 .values()
387 .map(|meta| ProcedureInfo {
388 id: meta.id,
389 type_name: meta.type_name.clone(),
390 start_time_ms: meta.start_time_ms.load(Ordering::Relaxed),
391 end_time_ms: meta.end_time_ms.load(Ordering::Relaxed),
392 state: meta.state(),
393 lock_keys: meta.lock_key.get_keys(),
394 })
395 .collect()
396 }
397
398 fn watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
400 let procedures = self.procedures.read().unwrap();
401 procedures
402 .get(&procedure_id)
403 .map(|meta| meta.state_receiver.clone())
404 }
405
406 fn notify_by_subprocedure(&self, procedure_id: ProcedureId) {
408 let procedures = self.procedures.read().unwrap();
409 if let Some(meta) = procedures.get(&procedure_id) {
410 meta.child_notify.notify_one();
411 }
412 }
413
414 fn load_one_procedure_from_message(
416 &self,
417 procedure_id: ProcedureId,
418 message: &ProcedureMessage,
419 ) -> Option<LoadedProcedure> {
420 let loaders = self.loaders.lock().unwrap();
421 let loader = loaders.get(&message.type_name).or_else(|| {
422 error!(
423 "Loader not found, procedure_id: {}, type_name: {}",
424 procedure_id, message.type_name
425 );
426 None
427 })?;
428
429 let procedure = loader(&message.data)
430 .map_err(|e| {
431 error!(
432 "Failed to load procedure data, key: {}, source: {:?}",
433 procedure_id, e
434 );
435 e
436 })
437 .ok()?;
438
439 Some(LoadedProcedure {
440 procedure,
441 step: message.step,
442 })
443 }
444
445 fn procedures_in_tree(&self, root: &ProcedureMetaRef) -> Vec<ProcedureId> {
450 let sub_num = root.num_children();
451 let mut procedures = Vec::with_capacity(1 + sub_num);
453
454 let mut queue = VecDeque::with_capacity(1 + sub_num);
455 queue.push_back(root.clone());
457
458 let mut children_ids = Vec::with_capacity(sub_num);
459 let mut children = Vec::with_capacity(sub_num);
460 while let Some(meta) = queue.pop_front() {
461 procedures.push(meta.id);
462
463 children_ids.clear();
465 meta.list_children(&mut children_ids);
466 self.find_procedures(&children_ids, &mut children);
467
468 for child in children.drain(..) {
470 queue.push_back(child);
471 }
472 }
473
474 procedures
475 }
476
477 fn find_procedures(&self, procedure_ids: &[ProcedureId], metas: &mut Vec<ProcedureMetaRef>) {
481 let procedures = self.procedures.read().unwrap();
482 for procedure_id in procedure_ids {
483 if let Some(meta) = procedures.get(procedure_id) {
484 metas.push(meta.clone());
485 }
486 }
487 }
488
489 fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
491 let now = Instant::now();
494 let mut finished_procedures = self.finished_procedures.lock().unwrap();
495 finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
496
497 let mut running_procedures = self.running_procedures.lock().unwrap();
499 for procedure_id in procedure_ids {
500 running_procedures.remove(procedure_id);
501 }
502 }
503
504 fn remove_outdated_meta(&self, ttl: Duration) {
506 let ids = {
507 let mut finished_procedures = self.finished_procedures.lock().unwrap();
508 if finished_procedures.is_empty() {
509 return;
510 }
511
512 let mut ids_to_remove = Vec::new();
513 while let Some((id, finish_time)) = finished_procedures.front() {
514 if finish_time.elapsed() > ttl {
515 ids_to_remove.push(*id);
516 let _ = finished_procedures.pop_front();
517 } else {
518 break;
521 }
522 }
523 ids_to_remove
524 };
525
526 if ids.is_empty() {
527 return;
528 }
529
530 let mut procedures = self.procedures.write().unwrap();
531 for id in ids {
532 let _ = procedures.remove(&id);
533 }
534 }
535}
536
/// Configuration of the [`LocalManager`].
#[derive(Debug)]
pub struct ManagerConfig {
    /// Path prefix passed to `ProcedureStore::new`.
    pub parent_path: String,
    /// Maximum retry times of each runner's exponential backoff.
    pub max_retry_times: usize,
    /// Minimum delay of each runner's exponential backoff.
    pub retry_delay: Duration,
    /// Interval of the task that removes metadata of finished procedures.
    pub remove_outdated_meta_task_interval: Duration,
    /// How long metadata of a finished procedure is kept before that task drops it.
    pub remove_outdated_meta_ttl: Duration,
    /// Submissions are rejected once this many procedures are running.
    pub max_running_procedures: usize,
}
547
548impl Default for ManagerConfig {
549 fn default() -> Self {
550 Self {
551 parent_path: String::default(),
552 max_retry_times: 3,
553 retry_delay: Duration::from_millis(500),
554 remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
555 remove_outdated_meta_ttl: META_TTL,
556 max_running_procedures: 128,
557 }
558 }
559}
560
/// Shared handle to a [`PauseAware`] implementation.
type PauseAwareRef = Arc<dyn PauseAware>;

/// Hook the [`LocalManager`] consults before accepting a submission.
#[async_trait]
pub trait PauseAware: Send + Sync {
    /// Returns `Ok(true)` when the manager is paused; submissions are then
    /// rejected with `ManagerPasued` by `LocalManager::check_status`.
    async fn is_paused(&self) -> std::result::Result<bool, BoxedError>;
}
568
/// Procedure manager that runs submitted procedures on the global runtime
/// (`common_runtime::spawn_global`) and persists them via a [`ProcedureStore`].
pub struct LocalManager {
    /// Context shared with the procedure runners.
    manager_ctx: Arc<ManagerContext>,
    /// Store that persists procedure data.
    procedure_store: Arc<ProcedureStore>,
    /// Maximum retry times of each runner's backoff.
    max_retry_times: usize,
    /// Minimum delay of each runner's backoff.
    retry_delay: Duration,
    /// Metadata cleanup task; set by `start`, taken by `stop`.
    remove_outdated_meta_task: TokioMutex<Option<RepeatedTask<Error>>>,
    /// Configuration the manager was created with.
    config: ManagerConfig,
    /// Optional hook that can reject submissions while paused.
    pause_aware: Option<PauseAwareRef>,
    /// Optional recorder for procedure events.
    event_recorder: Option<EventRecorderRef>,
}
581
582impl LocalManager {
583 pub fn new(
585 config: ManagerConfig,
586 state_store: StateStoreRef,
587 poison_store: PoisonStoreRef,
588 pause_aware: Option<PauseAwareRef>,
589 event_recorder: Option<EventRecorderRef>,
590 ) -> LocalManager {
591 let manager_ctx = Arc::new(ManagerContext::new(poison_store));
592
593 LocalManager {
594 manager_ctx,
595 procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)),
596 max_retry_times: config.max_retry_times,
597 retry_delay: config.retry_delay,
598 remove_outdated_meta_task: TokioMutex::new(None),
599 config,
600 pause_aware,
601 event_recorder,
602 }
603 }
604
605 pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask<Error> {
607 RepeatedTask::new(
608 self.config.remove_outdated_meta_task_interval,
609 Box::new(RemoveOutdatedMetaFunction {
610 manager_ctx: self.manager_ctx.clone(),
611 ttl: self.config.remove_outdated_meta_ttl,
612 }),
613 )
614 }
615
616 fn submit_root(
618 &self,
619 procedure_id: ProcedureId,
620 procedure_state: ProcedureState,
621 step: u32,
622 procedure: BoxedProcedure,
623 ) -> Result<Watcher> {
624 ensure!(self.manager_ctx.running(), ManagerNotStartSnafu);
625
626 let user_metadata = procedure.user_metadata();
627 let meta = Arc::new(ProcedureMeta::new(
628 procedure_id,
629 procedure_state,
630 None,
631 procedure.lock_key(),
632 procedure.poison_keys(),
633 procedure.type_name(),
634 self.event_recorder.clone(),
635 user_metadata.clone(),
636 ));
637 let runner = Runner {
638 meta: meta.clone(),
639 procedure,
640 manager_ctx: self.manager_ctx.clone(),
641 step,
642 exponential_builder: ExponentialBuilder::default()
643 .with_min_delay(self.retry_delay)
644 .with_max_times(self.max_retry_times),
645 store: self.procedure_store.clone(),
646 rolling_back: false,
647 event_recorder: self.event_recorder.clone(),
648 };
649
650 if let (Some(event_recorder), Some(event)) = (
651 self.event_recorder.as_ref(),
652 user_metadata.and_then(|m| m.to_event()),
653 ) {
654 event_recorder.record(Box::new(ProcedureEvent::new(
655 procedure_id,
656 event,
657 ProcedureState::Running,
658 )));
659 }
660
661 let watcher = meta.state_receiver.clone();
662
663 ensure!(
664 self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
665 TooManyRunningProceduresSnafu {
666 max_running_procedures: self.config.max_running_procedures,
667 }
668 );
669
670 ensure!(
672 self.manager_ctx.try_insert_procedure(meta),
673 DuplicateProcedureSnafu { procedure_id },
674 );
675
676 let tracing_context = TracingContext::from_current_span();
677
678 let _handle = common_runtime::spawn_global(async move {
679 let span = tracing_context.attach(tracing::info_span!(
680 "LocalManager::submit_root_procedure",
681 procedure_name = %runner.meta.type_name,
682 procedure_id = %runner.meta.id,
683 ));
684 runner.run().trace(span).await;
688 });
689
690 Ok(watcher)
691 }
692
693 fn submit_recovered_messages(
694 &self,
695 messages: HashMap<ProcedureId, ProcedureMessage>,
696 init_state: InitProcedureState,
697 ) {
698 for (procedure_id, message) in &messages {
699 if message.parent_id.is_none() {
700 let Some(mut loaded_procedure) = self
703 .manager_ctx
704 .load_one_procedure_from_message(*procedure_id, message)
705 else {
706 continue;
708 };
709
710 info!(
711 "Recover root procedure {}-{}, step: {}",
712 loaded_procedure.procedure.type_name(),
713 procedure_id,
714 loaded_procedure.step
715 );
716
717 let procedure_state = match init_state {
718 InitProcedureState::RollingBack => ProcedureState::RollingBack {
719 error: Arc::new(
720 error::RollbackProcedureRecoveredSnafu {
721 error: message.error.clone().unwrap_or("Unknown error".to_string()),
722 }
723 .build(),
724 ),
725 },
726 InitProcedureState::Running => ProcedureState::Running,
727 };
728
729 if let Err(e) = loaded_procedure.procedure.recover() {
730 error!(e; "Failed to recover procedure {}", procedure_id);
731 }
732
733 if let Err(e) = self.submit_root(
734 *procedure_id,
735 procedure_state,
736 loaded_procedure.step,
737 loaded_procedure.procedure,
738 ) {
739 error!(e; "Failed to recover procedure {}", procedure_id);
740 }
741 }
742 }
743 }
744
745 async fn recover(&self) -> Result<()> {
747 info!("LocalManager start to recover");
748 let recover_start = Instant::now();
749
750 let ProcedureMessages {
751 messages,
752 rollback_messages,
753 finished_ids,
754 } = self.procedure_store.load_messages().await?;
755 self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack);
757 self.submit_recovered_messages(messages, InitProcedureState::Running);
758
759 if !finished_ids.is_empty() {
760 info!(
761 "LocalManager try to clean finished procedures, num: {}",
762 finished_ids.len()
763 );
764
765 for procedure_id in finished_ids {
766 if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
767 error!(e; "Failed to delete procedure {}", procedure_id);
768 }
769 }
770 }
771
772 info!(
773 "LocalManager finish recovery, cost: {}ms",
774 recover_start.elapsed().as_millis()
775 );
776
777 Ok(())
778 }
779
780 #[cfg(any(test, feature = "testing"))]
781 pub fn contains_loader(&self, name: &str) -> bool {
783 let loaders = self.manager_ctx.loaders.lock().unwrap();
784 loaders.contains_key(name)
785 }
786
787 async fn check_status(&self) -> Result<()> {
788 if let Some(pause_aware) = self.pause_aware.as_ref() {
789 ensure!(
790 !pause_aware.is_paused().await.context(CheckStatusSnafu)?,
791 ManagerPasuedSnafu
792 );
793 }
794
795 Ok(())
796 }
797}
798
799#[async_trait]
800impl ProcedureManager for LocalManager {
801 fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> {
802 let mut loaders = self.manager_ctx.loaders.lock().unwrap();
803 ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name });
804
805 let _ = loaders.insert(name.to_string(), loader);
806
807 Ok(())
808 }
809
810 async fn start(&self) -> Result<()> {
811 let mut task = self.remove_outdated_meta_task.lock().await;
812
813 if task.is_some() {
814 return Ok(());
815 }
816
817 let task_inner = self.build_remove_outdated_meta_task();
818
819 task_inner
820 .start(common_runtime::global_runtime())
821 .context(StartRemoveOutdatedMetaTaskSnafu)?;
822
823 *task = Some(task_inner);
824
825 self.manager_ctx.start();
826
827 info!("LocalManager is start.");
828
829 self.recover().await
830 }
831
832 async fn stop(&self) -> Result<()> {
833 let mut task = self.remove_outdated_meta_task.lock().await;
834
835 if let Some(task) = task.take() {
836 task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?;
837 }
838
839 self.manager_ctx.stop();
840
841 info!("LocalManager is stopped.");
842
843 Ok(())
844 }
845
846 async fn submit(&self, procedure: ProcedureWithId) -> Result<Watcher> {
847 let procedure_id = procedure.id;
848 ensure!(
849 !self.manager_ctx.contains_procedure(procedure_id),
850 DuplicateProcedureSnafu { procedure_id }
851 );
852 self.check_status().await?;
853
854 self.submit_root(
855 procedure.id,
856 ProcedureState::Running,
857 0,
858 procedure.procedure,
859 )
860 }
861
862 async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
863 Ok(self.manager_ctx.state(procedure_id))
864 }
865
866 fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
867 self.manager_ctx.watcher(procedure_id)
868 }
869
870 async fn list_procedures(&self) -> Result<Vec<ProcedureInfo>> {
871 Ok(self.manager_ctx.list_procedure())
872 }
873}
874
875struct RemoveOutdatedMetaFunction {
876 manager_ctx: Arc<ManagerContext>,
877 ttl: Duration,
878}
879
880#[async_trait::async_trait]
881impl TaskFunction<Error> for RemoveOutdatedMetaFunction {
882 fn name(&self) -> &str {
883 "ProcedureManager-remove-outdated-meta-task"
884 }
885
886 async fn call(&mut self) -> Result<()> {
887 self.manager_ctx.remove_outdated_meta(self.ttl);
888 Ok(())
889 }
890}
891
#[cfg(test)]
pub(crate) mod test_util {
    use common_test_util::temp_dir::TempDir;
    use object_store::ObjectStore;
    use object_store::services::Fs as Builder;

    use super::*;

    /// Builds a parentless [`ProcedureMeta`] in the running state with a random
    /// id, default lock/poison keys, no event recorder and no user metadata.
    pub(crate) fn procedure_meta_for_test() -> ProcedureMeta {
        let id = ProcedureId::random();
        ProcedureMeta::new(
            id,
            ProcedureState::Running,
            None,
            LockKey::default(),
            PoisonKeys::default(),
            "ProcedureAdapter",
            None,
            None,
        )
    }

    /// Creates a filesystem-backed object store rooted at `dir`.
    pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
        let root = dir.path().to_str().unwrap();
        let builder = Builder::default().root(root);
        ObjectStore::new(builder).unwrap().finish()
    }
}
920
921#[cfg(test)]
922mod tests {
923 use std::assert_matches::assert_matches;
924
925 use common_error::mock::MockError;
926 use common_error::status_code::StatusCode;
927 use common_test_util::temp_dir::create_temp_dir;
928 use tokio::time::timeout;
929
930 use super::*;
931 use crate::error::{self, Error};
932 use crate::store::state_store::ObjectStateStore;
933 use crate::test_util::InMemoryPoisonStore;
934 use crate::{Context, Procedure, Status};
935
936 fn new_test_manager_context() -> ManagerContext {
937 let poison_manager = Arc::new(InMemoryPoisonStore::default());
938 ManagerContext::new(poison_manager)
939 }
940
941 #[test]
942 fn test_manager_context() {
943 let ctx = new_test_manager_context();
944 let meta = Arc::new(test_util::procedure_meta_for_test());
945
946 assert!(!ctx.contains_procedure(meta.id));
947 assert!(ctx.state(meta.id).is_none());
948
949 assert!(ctx.try_insert_procedure(meta.clone()));
950 assert!(ctx.contains_procedure(meta.id));
951
952 assert!(ctx.state(meta.id).unwrap().is_running());
953 meta.set_state(ProcedureState::Done { output: None });
954 assert!(ctx.state(meta.id).unwrap().is_done());
955 }
956
957 #[test]
958 fn test_manager_context_insert_duplicate() {
959 let ctx = new_test_manager_context();
960 let meta = Arc::new(test_util::procedure_meta_for_test());
961
962 assert!(ctx.try_insert_procedure(meta.clone()));
963 assert!(!ctx.try_insert_procedure(meta));
964 }
965
966 fn new_child(parent_id: ProcedureId, ctx: &ManagerContext) -> ProcedureMetaRef {
967 let mut child = test_util::procedure_meta_for_test();
968 child.parent_id = Some(parent_id);
969 let child = Arc::new(child);
970 assert!(ctx.try_insert_procedure(child.clone()));
971
972 let mut parent = Vec::new();
973 ctx.find_procedures(&[parent_id], &mut parent);
974 parent[0].push_child(child.id);
975
976 child
977 }
978
979 #[test]
980 fn test_procedures_in_tree() {
981 let ctx = new_test_manager_context();
982 let root = Arc::new(test_util::procedure_meta_for_test());
983 assert!(ctx.try_insert_procedure(root.clone()));
984
985 assert_eq!(1, ctx.procedures_in_tree(&root).len());
986
987 let child1 = new_child(root.id, &ctx);
988 let child2 = new_child(root.id, &ctx);
989
990 let child3 = new_child(child1.id, &ctx);
991 let child4 = new_child(child1.id, &ctx);
992
993 let child5 = new_child(child2.id, &ctx);
994
995 let expect = vec![
996 root.id, child1.id, child2.id, child3.id, child4.id, child5.id,
997 ];
998 assert_eq!(expect, ctx.procedures_in_tree(&root));
999 }
1000
1001 #[derive(Debug)]
1002 struct ProcedureToLoad {
1003 content: String,
1004 lock_key: LockKey,
1005 poison_keys: PoisonKeys,
1006 }
1007
1008 #[async_trait]
1009 impl Procedure for ProcedureToLoad {
1010 fn type_name(&self) -> &str {
1011 "ProcedureToLoad"
1012 }
1013
1014 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1015 Ok(Status::done())
1016 }
1017
1018 fn dump(&self) -> Result<String> {
1019 Ok(self.content.clone())
1020 }
1021
1022 fn lock_key(&self) -> LockKey {
1023 self.lock_key.clone()
1024 }
1025
1026 fn poison_keys(&self) -> PoisonKeys {
1027 self.poison_keys.clone()
1028 }
1029 }
1030
1031 impl ProcedureToLoad {
1032 fn new(content: &str) -> ProcedureToLoad {
1033 ProcedureToLoad {
1034 content: content.to_string(),
1035 lock_key: LockKey::default(),
1036 poison_keys: PoisonKeys::default(),
1037 }
1038 }
1039
1040 fn loader() -> BoxedProcedureLoader {
1041 let f = |json: &str| {
1042 let procedure = ProcedureToLoad::new(json);
1043 Ok(Box::new(procedure) as _)
1044 };
1045 Box::new(f)
1046 }
1047 }
1048
1049 #[test]
1050 fn test_register_loader() {
1051 let dir = create_temp_dir("register");
1052 let config = ManagerConfig {
1053 parent_path: "data/".to_string(),
1054 max_retry_times: 3,
1055 retry_delay: Duration::from_millis(500),
1056 ..Default::default()
1057 };
1058 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1059 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1060 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1061 manager.manager_ctx.start();
1062
1063 manager
1064 .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1065 .unwrap();
1066 let err = manager
1068 .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1069 .unwrap_err();
1070 assert!(matches!(err, Error::LoaderConflict { .. }), "{err}");
1071 }
1072
1073 #[tokio::test]
1074 async fn test_recover() {
1075 let dir = create_temp_dir("recover");
1076 let object_store = test_util::new_object_store(&dir);
1077 let config = ManagerConfig {
1078 parent_path: "data/".to_string(),
1079 max_retry_times: 3,
1080 retry_delay: Duration::from_millis(500),
1081 ..Default::default()
1082 };
1083 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1084 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1085 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1086 manager.manager_ctx.start();
1087
1088 manager
1089 .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1090 .unwrap();
1091
1092 let procedure_store = ProcedureStore::from_object_store(object_store.clone());
1094 let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
1095 let root_id = ProcedureId::random();
1096 for step in 0..3 {
1098 let type_name = root.type_name().to_string();
1099 let data = root.dump().unwrap();
1100 procedure_store
1101 .store_procedure(root_id, step, type_name, data, None)
1102 .await
1103 .unwrap();
1104 }
1105
1106 let child: BoxedProcedure = Box::new(ProcedureToLoad::new("a child procedure"));
1107 let child_id = ProcedureId::random();
1108 for step in 0..2 {
1110 let type_name = child.type_name().to_string();
1111 let data = child.dump().unwrap();
1112 procedure_store
1113 .store_procedure(child_id, step, type_name, data, Some(root_id))
1114 .await
1115 .unwrap();
1116 }
1117
1118 manager.recover().await.unwrap();
1120
1121 let _ = manager.procedure_state(root_id).await.unwrap().unwrap();
1123 assert!(manager.procedure_state(child_id).await.unwrap().is_none());
1126 }
1127
1128 #[tokio::test]
1129 async fn test_submit_procedure() {
1130 let dir = create_temp_dir("submit");
1131 let config = ManagerConfig {
1132 parent_path: "data/".to_string(),
1133 max_retry_times: 3,
1134 retry_delay: Duration::from_millis(500),
1135 ..Default::default()
1136 };
1137 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1138 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1139 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1140 manager.manager_ctx.start();
1141
1142 let procedure_id = ProcedureId::random();
1143 assert!(
1144 manager
1145 .procedure_state(procedure_id)
1146 .await
1147 .unwrap()
1148 .is_none()
1149 );
1150 assert!(manager.procedure_watcher(procedure_id).is_none());
1151
1152 let mut procedure = ProcedureToLoad::new("submit");
1153 procedure.lock_key = LockKey::single_exclusive("test.submit");
1154 assert!(
1155 manager
1156 .submit(ProcedureWithId {
1157 id: procedure_id,
1158 procedure: Box::new(procedure),
1159 })
1160 .await
1161 .is_ok()
1162 );
1163 assert!(
1164 manager
1165 .procedure_state(procedure_id)
1166 .await
1167 .unwrap()
1168 .is_some()
1169 );
1170 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1172 watcher.changed().await.unwrap();
1173 assert!(watcher.borrow().is_done());
1174
1175 let err = manager
1177 .submit(ProcedureWithId {
1178 id: procedure_id,
1179 procedure: Box::new(ProcedureToLoad::new("submit")),
1180 })
1181 .await
1182 .unwrap_err();
1183 assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err}");
1184 }
1185
1186 #[tokio::test]
1187 async fn test_state_changed_on_err() {
1188 let dir = create_temp_dir("on_err");
1189 let config = ManagerConfig {
1190 parent_path: "data/".to_string(),
1191 max_retry_times: 3,
1192 retry_delay: Duration::from_millis(500),
1193 ..Default::default()
1194 };
1195 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1196 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1197 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1198 manager.manager_ctx.start();
1199
1200 #[derive(Debug)]
1201 struct MockProcedure {
1202 panic: bool,
1203 }
1204
1205 #[async_trait]
1206 impl Procedure for MockProcedure {
1207 fn type_name(&self) -> &str {
1208 "MockProcedure"
1209 }
1210
1211 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1212 if self.panic {
1213 panic!();
1216 } else {
1217 Err(Error::external(MockError::new(StatusCode::Unexpected)))
1218 }
1219 }
1220
1221 async fn rollback(&mut self, _: &Context) -> Result<()> {
1222 Ok(())
1223 }
1224
1225 fn rollback_supported(&self) -> bool {
1226 true
1227 }
1228
1229 fn dump(&self) -> Result<String> {
1230 Ok(String::new())
1231 }
1232
1233 fn lock_key(&self) -> LockKey {
1234 LockKey::single_exclusive("test.submit")
1235 }
1236
1237 fn poison_keys(&self) -> PoisonKeys {
1238 PoisonKeys::default()
1239 }
1240 }
1241
1242 let check_procedure = |procedure| async {
1243 let procedure_id = ProcedureId::random();
1244 manager
1245 .submit(ProcedureWithId {
1246 id: procedure_id,
1247 procedure: Box::new(procedure),
1248 })
1249 .await
1250 .unwrap()
1251 };
1252
1253 let mut watcher = check_procedure(MockProcedure { panic: false }).await;
1254 watcher.changed().await.unwrap();
1256 assert!(watcher.borrow().is_prepare_rollback());
1257 watcher.changed().await.unwrap();
1258 assert!(watcher.borrow().is_rolling_back());
1259 watcher.changed().await.unwrap();
1260 assert!(watcher.borrow().is_failed());
1261 let mut watcher = check_procedure(MockProcedure { panic: true }).await;
1263 watcher.changed().await.unwrap();
1264 assert!(watcher.borrow().is_failed());
1265 }
1266
1267 #[tokio::test]
1268 async fn test_procedure_manager_stopped() {
1269 let dir = create_temp_dir("procedure_manager_stopped");
1270 let config = ManagerConfig {
1271 parent_path: "data/".to_string(),
1272 max_retry_times: 3,
1273 retry_delay: Duration::from_millis(500),
1274 ..Default::default()
1275 };
1276 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1277 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1278 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1279
1280 let mut procedure = ProcedureToLoad::new("submit");
1281 procedure.lock_key = LockKey::single_exclusive("test.submit");
1282 let procedure_id = ProcedureId::random();
1283 assert_matches!(
1284 manager
1285 .submit(ProcedureWithId {
1286 id: procedure_id,
1287 procedure: Box::new(procedure),
1288 })
1289 .await
1290 .unwrap_err(),
1291 error::Error::ManagerNotStart { .. }
1292 );
1293 }
1294
1295 #[tokio::test]
1296 async fn test_procedure_manager_restart() {
1297 let dir = create_temp_dir("procedure_manager_restart");
1298 let config = ManagerConfig {
1299 parent_path: "data/".to_string(),
1300 max_retry_times: 3,
1301 retry_delay: Duration::from_millis(500),
1302 ..Default::default()
1303 };
1304 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1305 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1306 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1307
1308 manager.start().await.unwrap();
1309 manager.stop().await.unwrap();
1310 manager.start().await.unwrap();
1311
1312 let mut procedure = ProcedureToLoad::new("submit");
1313 procedure.lock_key = LockKey::single_exclusive("test.submit");
1314 let procedure_id = ProcedureId::random();
1315 assert!(
1316 manager
1317 .submit(ProcedureWithId {
1318 id: procedure_id,
1319 procedure: Box::new(procedure),
1320 })
1321 .await
1322 .is_ok()
1323 );
1324 assert!(
1325 manager
1326 .procedure_state(procedure_id)
1327 .await
1328 .unwrap()
1329 .is_some()
1330 );
1331 }
1332
1333 #[tokio::test(flavor = "multi_thread")]
1334 async fn test_remove_outdated_meta_task() {
1335 let dir = create_temp_dir("remove_outdated_meta_task");
1336 let object_store = test_util::new_object_store(&dir);
1337 let config = ManagerConfig {
1338 parent_path: "data/".to_string(),
1339 max_retry_times: 3,
1340 retry_delay: Duration::from_millis(500),
1341 remove_outdated_meta_task_interval: Duration::from_millis(1),
1342 remove_outdated_meta_ttl: Duration::from_millis(1),
1343 max_running_procedures: 128,
1344 };
1345 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1346 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1347 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1348 manager.manager_ctx.set_running();
1349
1350 let mut procedure = ProcedureToLoad::new("submit");
1351 procedure.lock_key = LockKey::single_exclusive("test.submit");
1352 let procedure_id = ProcedureId::random();
1353 assert!(
1354 manager
1355 .submit(ProcedureWithId {
1356 id: procedure_id,
1357 procedure: Box::new(procedure),
1358 })
1359 .await
1360 .is_ok()
1361 );
1362 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1363 watcher.changed().await.unwrap();
1364
1365 manager.start().await.unwrap();
1366 tokio::time::sleep(Duration::from_millis(300)).await;
1367 assert!(
1368 manager
1369 .procedure_state(procedure_id)
1370 .await
1371 .unwrap()
1372 .is_none()
1373 );
1374
1375 manager.stop().await.unwrap();
1377 let mut procedure = ProcedureToLoad::new("submit");
1378 procedure.lock_key = LockKey::single_exclusive("test.submit");
1379 let procedure_id = ProcedureId::random();
1380
1381 manager.manager_ctx.set_running();
1382 assert!(
1383 manager
1384 .submit(ProcedureWithId {
1385 id: procedure_id,
1386 procedure: Box::new(procedure),
1387 })
1388 .await
1389 .is_ok()
1390 );
1391 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1392 watcher.changed().await.unwrap();
1393 tokio::time::sleep(Duration::from_millis(300)).await;
1394 assert!(
1395 manager
1396 .procedure_state(procedure_id)
1397 .await
1398 .unwrap()
1399 .is_some()
1400 );
1401
1402 let mut procedure = ProcedureToLoad::new("submit");
1404 procedure.lock_key = LockKey::single_exclusive("test.submit");
1405 let procedure_id = ProcedureId::random();
1406 assert!(
1407 manager
1408 .submit(ProcedureWithId {
1409 id: procedure_id,
1410 procedure: Box::new(procedure),
1411 })
1412 .await
1413 .is_ok()
1414 );
1415 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1416 watcher.changed().await.unwrap();
1417
1418 manager.start().await.unwrap();
1419 tokio::time::sleep(Duration::from_millis(300)).await;
1420 assert!(
1421 manager
1422 .procedure_state(procedure_id)
1423 .await
1424 .unwrap()
1425 .is_none()
1426 );
1427 }
1428
1429 #[tokio::test]
1430 async fn test_too_many_running_procedures() {
1431 let dir = create_temp_dir("too_many_running_procedures");
1432 let config = ManagerConfig {
1433 parent_path: "data/".to_string(),
1434 max_retry_times: 3,
1435 retry_delay: Duration::from_millis(500),
1436 max_running_procedures: 1,
1437 ..Default::default()
1438 };
1439 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1440 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1441 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1442 manager.manager_ctx.set_running();
1443
1444 manager
1445 .manager_ctx
1446 .running_procedures
1447 .lock()
1448 .unwrap()
1449 .insert(ProcedureId::random());
1450 manager.start().await.unwrap();
1451
1452 let mut procedure = ProcedureToLoad::new("submit");
1454 procedure.lock_key = LockKey::single_exclusive("test.submit");
1455 let procedure_id = ProcedureId::random();
1456 let err = manager
1457 .submit(ProcedureWithId {
1458 id: procedure_id,
1459 procedure: Box::new(procedure),
1460 })
1461 .await
1462 .unwrap_err();
1463 assert!(matches!(err, Error::TooManyRunningProcedures { .. }));
1464
1465 manager
1466 .manager_ctx
1467 .running_procedures
1468 .lock()
1469 .unwrap()
1470 .clear();
1471
1472 let mut procedure = ProcedureToLoad::new("submit");
1474 procedure.lock_key = LockKey::single_exclusive("test.submit");
1475 assert!(
1476 manager
1477 .submit(ProcedureWithId {
1478 id: procedure_id,
1479 procedure: Box::new(procedure),
1480 })
1481 .await
1482 .is_ok()
1483 );
1484 assert!(
1485 manager
1486 .procedure_state(procedure_id)
1487 .await
1488 .unwrap()
1489 .is_some()
1490 );
1491 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1493 watcher.changed().await.unwrap();
1494 assert!(watcher.borrow().is_done());
1495 }
1496
1497 #[derive(Debug)]
1498 struct ProcedureToRecover {
1499 content: String,
1500 lock_key: LockKey,
1501 notify: Option<Arc<Notify>>,
1502 poison_keys: PoisonKeys,
1503 }
1504
1505 #[async_trait]
1506 impl Procedure for ProcedureToRecover {
1507 fn type_name(&self) -> &str {
1508 "ProcedureToRecover"
1509 }
1510
1511 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1512 Ok(Status::done())
1513 }
1514
1515 fn dump(&self) -> Result<String> {
1516 Ok(self.content.clone())
1517 }
1518
1519 fn lock_key(&self) -> LockKey {
1520 self.lock_key.clone()
1521 }
1522
1523 fn recover(&mut self) -> Result<()> {
1524 self.notify.as_ref().unwrap().notify_one();
1525 Ok(())
1526 }
1527
1528 fn poison_keys(&self) -> PoisonKeys {
1529 self.poison_keys.clone()
1530 }
1531 }
1532
1533 impl ProcedureToRecover {
1534 fn new(content: &str) -> ProcedureToRecover {
1535 ProcedureToRecover {
1536 content: content.to_string(),
1537 lock_key: LockKey::default(),
1538 poison_keys: PoisonKeys::default(),
1539 notify: None,
1540 }
1541 }
1542
1543 fn loader(notify: Arc<Notify>) -> BoxedProcedureLoader {
1544 let f = move |json: &str| {
1545 let procedure = ProcedureToRecover {
1546 content: json.to_string(),
1547 lock_key: LockKey::default(),
1548 poison_keys: PoisonKeys::default(),
1549 notify: Some(notify.clone()),
1550 };
1551 Ok(Box::new(procedure) as _)
1552 };
1553 Box::new(f)
1554 }
1555 }
1556
1557 #[tokio::test]
1558 async fn test_procedure_recover() {
1559 common_telemetry::init_default_ut_logging();
1560 let dir = create_temp_dir("procedure_recover");
1561 let object_store = test_util::new_object_store(&dir);
1562 let config = ManagerConfig {
1563 parent_path: "data/".to_string(),
1564 max_retry_times: 3,
1565 retry_delay: Duration::from_millis(500),
1566 ..Default::default()
1567 };
1568 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1569 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1570 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1571 manager.manager_ctx.start();
1572
1573 let notify = Arc::new(Notify::new());
1574 manager
1575 .register_loader(
1576 "ProcedureToRecover",
1577 ProcedureToRecover::loader(notify.clone()),
1578 )
1579 .unwrap();
1580
1581 let procedure_store = ProcedureStore::from_object_store(object_store.clone());
1583 let root: BoxedProcedure = Box::new(ProcedureToRecover::new("test procedure recovery"));
1584 let root_id = ProcedureId::random();
1585 for step in 0..3 {
1587 let type_name = root.type_name().to_string();
1588 let data = root.dump().unwrap();
1589 procedure_store
1590 .store_procedure(root_id, step, type_name, data, None)
1591 .await
1592 .unwrap();
1593 }
1594
1595 manager.recover().await.unwrap();
1597 timeout(Duration::from_secs(10), notify.notified())
1598 .await
1599 .unwrap();
1600 }
1601}