1mod runner;
16
17use std::collections::hash_map::Entry;
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
20use std::sync::{Arc, Mutex, RwLock};
21use std::time::{Duration, Instant};
22
23use async_trait::async_trait;
24use backon::ExponentialBuilder;
25use common_error::ext::BoxedError;
26use common_event_recorder::EventRecorderRef;
27use common_runtime::{RepeatedTask, TaskFunction};
28use common_telemetry::tracing_context::{FutureExt, TracingContext};
29use common_telemetry::{error, info, tracing};
30use snafu::{OptionExt, ResultExt, ensure};
31use tokio::sync::watch::{self, Receiver, Sender};
32use tokio::sync::{Mutex as TokioMutex, Notify};
33
34use crate::error::{
35 self, CheckStatusSnafu, DuplicateProcedureSnafu, Error, LoaderConflictSnafu,
36 ManagerNotStartSnafu, ManagerPasuedSnafu, PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu,
37 Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
38 TooManyRunningProceduresSnafu,
39};
40use crate::event::ProcedureEvent;
41use crate::local::runner::Runner;
42use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
43use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
44use crate::store::poison_store::PoisonStoreRef;
45use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
46use crate::{
47 BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
48 ProcedureState, ProcedureWithId, StringKey, UserMetadata, Watcher,
49};
50
/// Default time-to-live (10 minutes) of a finished procedure's in-memory metadata.
///
/// Used as the default of [`ManagerConfig::remove_outdated_meta_ttl`].
const META_TTL: Duration = Duration::from_secs(60 * 10);
53
/// In-memory metadata of a single procedure.
///
/// Instances are shared as [`ProcedureMetaRef`]. The parts that change over time
/// (state, children, timestamps) are kept behind a watch channel, a `Mutex` and
/// atomics so that they can be updated through `&self`.
#[derive(Debug)]
pub(crate) struct ProcedureMeta {
    /// Id of this procedure.
    id: ProcedureId,
    /// Type name of this procedure.
    type_name: String,
    /// Id of the parent procedure; `submit_root` passes `None` for root procedures.
    parent_id: Option<ProcedureId>,
    /// Notifier signalled through `ManagerContext::notify_by_subprocedure`.
    child_notify: Notify,
    /// Lock key reported by the procedure (`lock_key()`).
    lock_key: LockKey,
    /// Poison keys reported by the procedure (`poison_keys()`); `try_put_poison`
    /// rejects keys that are not contained in this set.
    poison_keys: PoisonKeys,
    /// Sending half of the watch channel that carries the procedure state.
    state_sender: Sender<ProcedureState>,
    /// Receiving half of the state channel; read by `state()` and cloned to hand out
    /// watchers.
    state_receiver: Receiver<ProcedureState>,
    /// Ids of child procedures, appended by `push_child`.
    children: Mutex<Vec<ProcedureId>>,
    /// Timestamp in milliseconds stored by `set_start_time_ms`; `0` until then.
    start_time_ms: AtomicI64,
    /// Timestamp in milliseconds stored by `set_end_time_ms`; `0` until then.
    end_time_ms: AtomicI64,
    /// Optional recorder that receives a `ProcedureEvent` when the state is set.
    event_recorder: Option<EventRecorderRef>,
    /// Optional user metadata; an event is recorded only when it converts to an event
    /// via `to_event()`.
    user_metadata: Option<UserMetadata>,
}
91
92impl ProcedureMeta {
93 #[allow(clippy::too_many_arguments)]
94 fn new(
95 id: ProcedureId,
96 procedure_state: ProcedureState,
97 parent_id: Option<ProcedureId>,
98 lock_key: LockKey,
99 poison_keys: PoisonKeys,
100 type_name: &str,
101 event_recorder: Option<EventRecorderRef>,
102 user_metadata: Option<UserMetadata>,
103 ) -> ProcedureMeta {
104 let (state_sender, state_receiver) = watch::channel(procedure_state);
105 ProcedureMeta {
106 id,
107 parent_id,
108 child_notify: Notify::new(),
109 lock_key,
110 poison_keys,
111 state_sender,
112 state_receiver,
113 children: Mutex::new(Vec::new()),
114 start_time_ms: AtomicI64::new(0),
115 end_time_ms: AtomicI64::new(0),
116 type_name: type_name.to_string(),
117 event_recorder,
118 user_metadata,
119 }
120 }
121
122 fn state(&self) -> ProcedureState {
124 self.state_receiver.borrow().clone()
125 }
126
127 fn set_state(&self, state: ProcedureState) {
129 if let (Some(event_recorder), Some(user_metadata)) =
131 (&self.event_recorder, &self.user_metadata)
132 && let Some(event) = user_metadata.to_event()
133 {
134 event_recorder.record(Box::new(ProcedureEvent::new(self.id, event, state.clone())));
135 }
136
137 self.state_sender.send(state).unwrap();
139 }
140
141 fn push_child(&self, procedure_id: ProcedureId) {
143 let mut children = self.children.lock().unwrap();
144 children.push(procedure_id);
145 }
146
147 fn list_children(&self, buffer: &mut Vec<ProcedureId>) {
149 let children = self.children.lock().unwrap();
150 buffer.extend_from_slice(&children);
151 }
152
153 fn num_children(&self) -> usize {
155 self.children.lock().unwrap().len()
156 }
157
158 fn set_start_time_ms(&self) {
160 self.start_time_ms
161 .store(common_time::util::current_time_millis(), Ordering::Relaxed);
162 }
163
164 fn set_end_time_ms(&self) {
166 self.end_time_ms
167 .store(common_time::util::current_time_millis(), Ordering::Relaxed);
168 }
169}
170
/// Reference-counted pointer to a [`ProcedureMeta`].
type ProcedureMetaRef = Arc<ProcedureMeta>;
173
/// A procedure rebuilt from a stored [`ProcedureMessage`].
struct LoadedProcedure {
    /// The procedure produced by the registered loader.
    procedure: BoxedProcedure,
    /// The step copied from the message the procedure was loaded from.
    step: u32,
}
179
/// Shared, `String`-keyed read/write lock used by [`acquire_dynamic_key_lock`].
pub(crate) type DynamicKeyLock = Arc<KeyRwLock<String>>;
187
188pub async fn acquire_dynamic_key_lock(
193 lock: &DynamicKeyLock,
194 key: &StringKey,
195) -> DynamicKeyLockGuard {
196 match key {
197 StringKey::Share(key) => {
198 let guard = lock.read(key.to_string()).await;
199 DynamicKeyLockGuard {
200 guard: Some(OwnedKeyRwLockGuard::from(guard)),
201 key: key.to_string(),
202 lock: lock.clone(),
203 }
204 }
205 StringKey::Exclusive(key) => {
206 let guard = lock.write(key.to_string()).await;
207 DynamicKeyLockGuard {
208 guard: Some(OwnedKeyRwLockGuard::from(guard)),
209 key: key.to_string(),
210 lock: lock.clone(),
211 }
212 }
213 }
214}
/// Guard returned by [`acquire_dynamic_key_lock`].
///
/// Dropping it releases the held lock and then calls `clean_keys` for its key on the
/// originating [`DynamicKeyLock`].
pub struct DynamicKeyLockGuard {
    /// The held lock guard; an `Option` so that `Drop` can take and release it before
    /// cleaning the key.
    guard: Option<OwnedKeyRwLockGuard>,
    /// The key this guard was acquired for.
    key: String,
    /// The lock the guard was acquired from.
    lock: DynamicKeyLock,
}
224
225impl Drop for DynamicKeyLockGuard {
226 fn drop(&mut self) {
227 if let Some(guard) = self.guard.take() {
228 drop(guard);
229 }
230 self.lock.clean_keys(&[self.key.to_string()]);
231 }
232}
233
/// State shared between the [`LocalManager`] and the procedure runners it spawns.
pub(crate) struct ManagerContext {
    /// Procedure loaders keyed by procedure type name.
    loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
    /// Key-based read/write lock.
    // NOTE(review): not referenced in the code visible here — presumably used by the
    // runner to hold a procedure's `lock_key`; confirm.
    key_lock: KeyRwLock<String>,
    /// Key-based read/write lock handed to `acquire_dynamic_key_lock` by
    /// `acquire_lock`.
    dynamic_key_lock: DynamicKeyLock,
    /// Metadata of all known procedures, keyed by procedure id.
    procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
    /// Ids inserted by `try_insert_procedure` and removed by `on_procedures_finish`.
    running_procedures: Mutex<HashSet<ProcedureId>>,
    /// Finished procedure ids with the instant they were reported as finished, in
    /// reporting order; drained by `remove_outdated_meta`.
    finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
    /// Whether the manager is running; toggled by `start` and `stop`.
    running: Arc<AtomicBool>,
    /// Store used by `try_put_poison` to persist poison keys.
    poison_manager: PoisonStoreRef,
}
262
263#[async_trait]
264impl ContextProvider for ManagerContext {
265 async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
266 Ok(self.state(procedure_id))
267 }
268
269 async fn procedure_state_receiver(
270 &self,
271 procedure_id: ProcedureId,
272 ) -> Result<Option<Receiver<ProcedureState>>> {
273 Ok(self.state_receiver(procedure_id))
274 }
275
276 async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
277 {
278 let procedures = self.procedures.read().unwrap();
280 let procedure = procedures
281 .get(&procedure_id)
282 .context(ProcedureNotFoundSnafu { procedure_id })?;
283
284 ensure!(
286 procedure.poison_keys.contains(key),
287 PoisonKeyNotDefinedSnafu {
288 key: key.clone(),
289 procedure_id
290 }
291 );
292 }
293 let key = key.to_string();
294 let procedure_id = procedure_id.to_string();
295 self.poison_manager.try_put_poison(key, procedure_id).await
296 }
297
298 async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
299 acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
300 }
301}
302
303impl ManagerContext {
304 fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
306 ManagerContext {
307 key_lock: KeyRwLock::new(),
308 dynamic_key_lock: Arc::new(KeyRwLock::new()),
309 loaders: Mutex::new(HashMap::new()),
310 procedures: RwLock::new(HashMap::new()),
311 running_procedures: Mutex::new(HashSet::new()),
312 finished_procedures: Mutex::new(VecDeque::new()),
313 running: Arc::new(AtomicBool::new(false)),
314 poison_manager,
315 }
316 }
317
318 #[cfg(test)]
319 pub(crate) fn set_running(&self) {
320 self.running.store(true, Ordering::Relaxed);
321 }
322
323 pub(crate) fn start(&self) {
325 self.running.store(true, Ordering::Relaxed);
326 }
327
328 pub(crate) fn stop(&self) {
329 self.running.store(false, Ordering::Relaxed);
330 }
331
332 pub(crate) fn running(&self) -> bool {
334 self.running.load(Ordering::Relaxed)
335 }
336
337 fn contains_procedure(&self, procedure_id: ProcedureId) -> bool {
339 let procedures = self.procedures.read().unwrap();
340 procedures.contains_key(&procedure_id)
341 }
342
343 fn num_running_procedures(&self) -> usize {
345 self.running_procedures.lock().unwrap().len()
346 }
347
348 fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
353 let procedure_id = meta.id;
354 let mut procedures = self.procedures.write().unwrap();
355 match procedures.entry(procedure_id) {
356 Entry::Occupied(_) => return false,
357 Entry::Vacant(vacant_entry) => {
358 vacant_entry.insert(meta);
359 }
360 }
361
362 let mut running_procedures = self.running_procedures.lock().unwrap();
363 running_procedures.insert(procedure_id);
364
365 true
366 }
367
368 fn state(&self, procedure_id: ProcedureId) -> Option<ProcedureState> {
370 let procedures = self.procedures.read().unwrap();
371 procedures.get(&procedure_id).map(|meta| meta.state())
372 }
373
374 fn state_receiver(&self, procedure_id: ProcedureId) -> Option<Receiver<ProcedureState>> {
376 let procedures = self.procedures.read().unwrap();
377 procedures
378 .get(&procedure_id)
379 .map(|meta| meta.state_receiver.clone())
380 }
381
382 fn list_procedure(&self) -> Vec<ProcedureInfo> {
384 let procedures = self.procedures.read().unwrap();
385 procedures
386 .values()
387 .map(|meta| ProcedureInfo {
388 id: meta.id,
389 type_name: meta.type_name.clone(),
390 start_time_ms: meta.start_time_ms.load(Ordering::Relaxed),
391 end_time_ms: meta.end_time_ms.load(Ordering::Relaxed),
392 state: meta.state(),
393 lock_keys: meta.lock_key.get_keys(),
394 })
395 .collect()
396 }
397
398 fn watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
400 let procedures = self.procedures.read().unwrap();
401 procedures
402 .get(&procedure_id)
403 .map(|meta| meta.state_receiver.clone())
404 }
405
406 fn notify_by_subprocedure(&self, procedure_id: ProcedureId) {
408 let procedures = self.procedures.read().unwrap();
409 if let Some(meta) = procedures.get(&procedure_id) {
410 meta.child_notify.notify_one();
411 }
412 }
413
414 fn load_one_procedure_from_message(
416 &self,
417 procedure_id: ProcedureId,
418 message: &ProcedureMessage,
419 ) -> Option<LoadedProcedure> {
420 let loaders = self.loaders.lock().unwrap();
421 let loader = loaders.get(&message.type_name).or_else(|| {
422 error!(
423 "Loader not found, procedure_id: {}, type_name: {}",
424 procedure_id, message.type_name
425 );
426 None
427 })?;
428
429 let procedure = loader(&message.data)
430 .map_err(|e| {
431 error!(
432 "Failed to load procedure data, key: {}, source: {:?}",
433 procedure_id, e
434 );
435 e
436 })
437 .ok()?;
438
439 Some(LoadedProcedure {
440 procedure,
441 step: message.step,
442 })
443 }
444
445 fn procedures_in_tree(&self, root: &ProcedureMetaRef) -> Vec<ProcedureId> {
450 let sub_num = root.num_children();
451 let mut procedures = Vec::with_capacity(1 + sub_num);
453
454 let mut queue = VecDeque::with_capacity(1 + sub_num);
455 queue.push_back(root.clone());
457
458 let mut children_ids = Vec::with_capacity(sub_num);
459 let mut children = Vec::with_capacity(sub_num);
460 while let Some(meta) = queue.pop_front() {
461 procedures.push(meta.id);
462
463 children_ids.clear();
465 meta.list_children(&mut children_ids);
466 self.find_procedures(&children_ids, &mut children);
467
468 for child in children.drain(..) {
470 queue.push_back(child);
471 }
472 }
473
474 procedures
475 }
476
477 fn find_procedures(&self, procedure_ids: &[ProcedureId], metas: &mut Vec<ProcedureMetaRef>) {
481 let procedures = self.procedures.read().unwrap();
482 for procedure_id in procedure_ids {
483 if let Some(meta) = procedures.get(procedure_id) {
484 metas.push(meta.clone());
485 }
486 }
487 }
488
489 fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
491 let now = Instant::now();
494 let mut finished_procedures = self.finished_procedures.lock().unwrap();
495 finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
496
497 let mut running_procedures = self.running_procedures.lock().unwrap();
499 for procedure_id in procedure_ids {
500 running_procedures.remove(procedure_id);
501 }
502 }
503
504 fn remove_outdated_meta(&self, ttl: Duration) {
506 let ids = {
507 let mut finished_procedures = self.finished_procedures.lock().unwrap();
508 if finished_procedures.is_empty() {
509 return;
510 }
511
512 let mut ids_to_remove = Vec::new();
513 while let Some((id, finish_time)) = finished_procedures.front() {
514 if finish_time.elapsed() > ttl {
515 ids_to_remove.push(*id);
516 let _ = finished_procedures.pop_front();
517 } else {
518 break;
521 }
522 }
523 ids_to_remove
524 };
525
526 if ids.is_empty() {
527 return;
528 }
529
530 let mut procedures = self.procedures.write().unwrap();
531 for id in ids {
532 let _ = procedures.remove(&id);
533 }
534 }
535}
536
/// Configuration of a [`LocalManager`].
#[derive(Debug)]
pub struct ManagerConfig {
    /// Parent path handed to `ProcedureStore::new`.
    pub parent_path: String,
    /// Maximum number of retries configured on the runner's exponential backoff.
    pub max_retry_times: usize,
    /// Minimum delay configured on the runner's exponential backoff.
    pub retry_delay: Duration,
    /// Interval of the repeated task that removes outdated procedure metadata.
    pub remove_outdated_meta_task_interval: Duration,
    /// How long the metadata of a finished procedure is kept before that task may
    /// remove it.
    pub remove_outdated_meta_ttl: Duration,
    /// Root procedures are rejected once this many procedures are running.
    pub max_running_procedures: usize,
}
547
548impl Default for ManagerConfig {
549 fn default() -> Self {
550 Self {
551 parent_path: String::default(),
552 max_retry_times: 3,
553 retry_delay: Duration::from_millis(500),
554 remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
555 remove_outdated_meta_ttl: META_TTL,
556 max_running_procedures: 128,
557 }
558 }
559}
560
/// Shared trait object of a [`PauseAware`] implementation.
type PauseAwareRef = Arc<dyn PauseAware>;
562
/// Source of the "paused" status consulted by `LocalManager::check_status`.
#[async_trait]
pub trait PauseAware: Send + Sync {
    /// Returns `Ok(true)` when the manager should be treated as paused.
    ///
    /// # Errors
    ///
    /// Returns a [`BoxedError`] when the status cannot be determined.
    async fn is_paused(&self) -> std::result::Result<bool, BoxedError>;
}
568
/// A [`ProcedureManager`] that runs procedures in the current process.
pub struct LocalManager {
    /// Context shared with every spawned runner.
    manager_ctx: Arc<ManagerContext>,
    /// Store used to load and delete persisted procedures.
    procedure_store: Arc<ProcedureStore>,
    /// Copy of `config.max_retry_times`, used for the runner's backoff.
    max_retry_times: usize,
    /// Copy of `config.retry_delay`, used for the runner's backoff.
    retry_delay: Duration,
    /// The running cleanup task; `None` while the manager is not started.
    remove_outdated_meta_task: TokioMutex<Option<RepeatedTask<Error>>>,
    /// The configuration the manager was created with.
    config: ManagerConfig,
    /// Optional pause status source checked before accepting a submission.
    pause_aware: Option<PauseAwareRef>,
    /// Optional recorder for procedure events.
    event_recorder: Option<EventRecorderRef>,
}
581
582impl LocalManager {
583 pub fn new(
585 config: ManagerConfig,
586 state_store: StateStoreRef,
587 poison_store: PoisonStoreRef,
588 pause_aware: Option<PauseAwareRef>,
589 event_recorder: Option<EventRecorderRef>,
590 ) -> LocalManager {
591 let manager_ctx = Arc::new(ManagerContext::new(poison_store));
592
593 LocalManager {
594 manager_ctx,
595 procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)),
596 max_retry_times: config.max_retry_times,
597 retry_delay: config.retry_delay,
598 remove_outdated_meta_task: TokioMutex::new(None),
599 config,
600 pause_aware,
601 event_recorder,
602 }
603 }
604
605 pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask<Error> {
607 RepeatedTask::new(
608 self.config.remove_outdated_meta_task_interval,
609 Box::new(RemoveOutdatedMetaFunction {
610 manager_ctx: self.manager_ctx.clone(),
611 ttl: self.config.remove_outdated_meta_ttl,
612 }),
613 )
614 }
615
616 fn submit_root(
618 &self,
619 procedure_id: ProcedureId,
620 procedure_state: ProcedureState,
621 step: u32,
622 procedure: BoxedProcedure,
623 ) -> Result<Watcher> {
624 ensure!(self.manager_ctx.running(), ManagerNotStartSnafu);
625
626 let user_metadata = procedure.user_metadata();
627 let meta = Arc::new(ProcedureMeta::new(
628 procedure_id,
629 procedure_state,
630 None,
631 procedure.lock_key(),
632 procedure.poison_keys(),
633 procedure.type_name(),
634 self.event_recorder.clone(),
635 user_metadata.clone(),
636 ));
637 let runner = Runner {
638 meta: meta.clone(),
639 procedure,
640 manager_ctx: self.manager_ctx.clone(),
641 step,
642 exponential_builder: ExponentialBuilder::default()
643 .with_min_delay(self.retry_delay)
644 .with_max_times(self.max_retry_times),
645 store: self.procedure_store.clone(),
646 rolling_back: false,
647 event_recorder: self.event_recorder.clone(),
648 };
649
650 if let (Some(event_recorder), Some(event)) = (
651 self.event_recorder.as_ref(),
652 user_metadata.and_then(|m| m.to_event()),
653 ) {
654 event_recorder.record(Box::new(ProcedureEvent::new(
655 procedure_id,
656 event,
657 ProcedureState::Running,
658 )));
659 }
660
661 let watcher = meta.state_receiver.clone();
662
663 ensure!(
664 self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
665 TooManyRunningProceduresSnafu {
666 max_running_procedures: self.config.max_running_procedures,
667 }
668 );
669
670 ensure!(
672 self.manager_ctx.try_insert_procedure(meta),
673 DuplicateProcedureSnafu { procedure_id },
674 );
675
676 let tracing_context = TracingContext::from_current_span();
677
678 let _handle = common_runtime::spawn_global(async move {
679 runner
683 .run()
684 .trace(
685 tracing_context
686 .attach(tracing::info_span!("LocalManager::submit_root_procedure")),
687 )
688 .await;
689 });
690
691 Ok(watcher)
692 }
693
694 fn submit_recovered_messages(
695 &self,
696 messages: HashMap<ProcedureId, ProcedureMessage>,
697 init_state: InitProcedureState,
698 ) {
699 for (procedure_id, message) in &messages {
700 if message.parent_id.is_none() {
701 let Some(mut loaded_procedure) = self
704 .manager_ctx
705 .load_one_procedure_from_message(*procedure_id, message)
706 else {
707 continue;
709 };
710
711 info!(
712 "Recover root procedure {}-{}, step: {}",
713 loaded_procedure.procedure.type_name(),
714 procedure_id,
715 loaded_procedure.step
716 );
717
718 let procedure_state = match init_state {
719 InitProcedureState::RollingBack => ProcedureState::RollingBack {
720 error: Arc::new(
721 error::RollbackProcedureRecoveredSnafu {
722 error: message.error.clone().unwrap_or("Unknown error".to_string()),
723 }
724 .build(),
725 ),
726 },
727 InitProcedureState::Running => ProcedureState::Running,
728 };
729
730 if let Err(e) = loaded_procedure.procedure.recover() {
731 error!(e; "Failed to recover procedure {}", procedure_id);
732 }
733
734 if let Err(e) = self.submit_root(
735 *procedure_id,
736 procedure_state,
737 loaded_procedure.step,
738 loaded_procedure.procedure,
739 ) {
740 error!(e; "Failed to recover procedure {}", procedure_id);
741 }
742 }
743 }
744 }
745
746 async fn recover(&self) -> Result<()> {
748 info!("LocalManager start to recover");
749 let recover_start = Instant::now();
750
751 let ProcedureMessages {
752 messages,
753 rollback_messages,
754 finished_ids,
755 } = self.procedure_store.load_messages().await?;
756 self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack);
758 self.submit_recovered_messages(messages, InitProcedureState::Running);
759
760 if !finished_ids.is_empty() {
761 info!(
762 "LocalManager try to clean finished procedures, num: {}",
763 finished_ids.len()
764 );
765
766 for procedure_id in finished_ids {
767 if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
768 error!(e; "Failed to delete procedure {}", procedure_id);
769 }
770 }
771 }
772
773 info!(
774 "LocalManager finish recovery, cost: {}ms",
775 recover_start.elapsed().as_millis()
776 );
777
778 Ok(())
779 }
780
781 #[cfg(any(test, feature = "testing"))]
782 pub fn contains_loader(&self, name: &str) -> bool {
784 let loaders = self.manager_ctx.loaders.lock().unwrap();
785 loaders.contains_key(name)
786 }
787
788 async fn check_status(&self) -> Result<()> {
789 if let Some(pause_aware) = self.pause_aware.as_ref() {
790 ensure!(
791 !pause_aware.is_paused().await.context(CheckStatusSnafu)?,
792 ManagerPasuedSnafu
793 );
794 }
795
796 Ok(())
797 }
798}
799
800#[async_trait]
801impl ProcedureManager for LocalManager {
802 fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> {
803 let mut loaders = self.manager_ctx.loaders.lock().unwrap();
804 ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name });
805
806 let _ = loaders.insert(name.to_string(), loader);
807
808 Ok(())
809 }
810
811 async fn start(&self) -> Result<()> {
812 let mut task = self.remove_outdated_meta_task.lock().await;
813
814 if task.is_some() {
815 return Ok(());
816 }
817
818 let task_inner = self.build_remove_outdated_meta_task();
819
820 task_inner
821 .start(common_runtime::global_runtime())
822 .context(StartRemoveOutdatedMetaTaskSnafu)?;
823
824 *task = Some(task_inner);
825
826 self.manager_ctx.start();
827
828 info!("LocalManager is start.");
829
830 self.recover().await
831 }
832
833 async fn stop(&self) -> Result<()> {
834 let mut task = self.remove_outdated_meta_task.lock().await;
835
836 if let Some(task) = task.take() {
837 task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?;
838 }
839
840 self.manager_ctx.stop();
841
842 info!("LocalManager is stopped.");
843
844 Ok(())
845 }
846
847 async fn submit(&self, procedure: ProcedureWithId) -> Result<Watcher> {
848 let procedure_id = procedure.id;
849 ensure!(
850 !self.manager_ctx.contains_procedure(procedure_id),
851 DuplicateProcedureSnafu { procedure_id }
852 );
853 self.check_status().await?;
854
855 self.submit_root(
856 procedure.id,
857 ProcedureState::Running,
858 0,
859 procedure.procedure,
860 )
861 }
862
863 async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
864 Ok(self.manager_ctx.state(procedure_id))
865 }
866
867 fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
868 self.manager_ctx.watcher(procedure_id)
869 }
870
871 async fn list_procedures(&self) -> Result<Vec<ProcedureInfo>> {
872 Ok(self.manager_ctx.list_procedure())
873 }
874}
875
/// Task function that removes outdated procedure metadata from a [`ManagerContext`].
struct RemoveOutdatedMetaFunction {
    /// The context whose metadata is cleaned.
    manager_ctx: Arc<ManagerContext>,
    /// Time a finished procedure's metadata is kept before it is removed.
    ttl: Duration,
}
880
881#[async_trait::async_trait]
882impl TaskFunction<Error> for RemoveOutdatedMetaFunction {
883 fn name(&self) -> &str {
884 "ProcedureManager-remove-outdated-meta-task"
885 }
886
887 async fn call(&mut self) -> Result<()> {
888 self.manager_ctx.remove_outdated_meta(self.ttl);
889 Ok(())
890 }
891}
892
/// Helpers shared by the tests of this crate.
#[cfg(test)]
pub(crate) mod test_util {
    use common_test_util::temp_dir::TempDir;
    use object_store::ObjectStore;
    use object_store::services::Fs as Builder;

    use super::*;

    /// Returns the metadata of a running root procedure with a random id, default
    /// lock and poison keys, and neither an event recorder nor user metadata.
    pub(crate) fn procedure_meta_for_test() -> ProcedureMeta {
        let id = ProcedureId::random();
        let initial_state = ProcedureState::Running;

        ProcedureMeta::new(
            id,
            initial_state,
            None,
            LockKey::default(),
            PoisonKeys::default(),
            "ProcedureAdapter",
            None,
            None,
        )
    }

    /// Returns a file-system backed object store rooted at `dir`.
    ///
    /// Panics if the path is not valid UTF-8 or the store cannot be built.
    pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
        let root = dir.path().to_str().unwrap();
        let fs_builder = Builder::default().root(root);
        ObjectStore::new(fs_builder).unwrap().finish()
    }
}
921
922#[cfg(test)]
923mod tests {
924 use std::assert_matches::assert_matches;
925
926 use common_error::mock::MockError;
927 use common_error::status_code::StatusCode;
928 use common_test_util::temp_dir::create_temp_dir;
929 use tokio::time::timeout;
930
931 use super::*;
932 use crate::error::{self, Error};
933 use crate::store::state_store::ObjectStateStore;
934 use crate::test_util::InMemoryPoisonStore;
935 use crate::{Context, Procedure, Status};
936
937 fn new_test_manager_context() -> ManagerContext {
938 let poison_manager = Arc::new(InMemoryPoisonStore::default());
939 ManagerContext::new(poison_manager)
940 }
941
942 #[test]
943 fn test_manager_context() {
944 let ctx = new_test_manager_context();
945 let meta = Arc::new(test_util::procedure_meta_for_test());
946
947 assert!(!ctx.contains_procedure(meta.id));
948 assert!(ctx.state(meta.id).is_none());
949
950 assert!(ctx.try_insert_procedure(meta.clone()));
951 assert!(ctx.contains_procedure(meta.id));
952
953 assert!(ctx.state(meta.id).unwrap().is_running());
954 meta.set_state(ProcedureState::Done { output: None });
955 assert!(ctx.state(meta.id).unwrap().is_done());
956 }
957
958 #[test]
959 fn test_manager_context_insert_duplicate() {
960 let ctx = new_test_manager_context();
961 let meta = Arc::new(test_util::procedure_meta_for_test());
962
963 assert!(ctx.try_insert_procedure(meta.clone()));
964 assert!(!ctx.try_insert_procedure(meta));
965 }
966
967 fn new_child(parent_id: ProcedureId, ctx: &ManagerContext) -> ProcedureMetaRef {
968 let mut child = test_util::procedure_meta_for_test();
969 child.parent_id = Some(parent_id);
970 let child = Arc::new(child);
971 assert!(ctx.try_insert_procedure(child.clone()));
972
973 let mut parent = Vec::new();
974 ctx.find_procedures(&[parent_id], &mut parent);
975 parent[0].push_child(child.id);
976
977 child
978 }
979
980 #[test]
981 fn test_procedures_in_tree() {
982 let ctx = new_test_manager_context();
983 let root = Arc::new(test_util::procedure_meta_for_test());
984 assert!(ctx.try_insert_procedure(root.clone()));
985
986 assert_eq!(1, ctx.procedures_in_tree(&root).len());
987
988 let child1 = new_child(root.id, &ctx);
989 let child2 = new_child(root.id, &ctx);
990
991 let child3 = new_child(child1.id, &ctx);
992 let child4 = new_child(child1.id, &ctx);
993
994 let child5 = new_child(child2.id, &ctx);
995
996 let expect = vec![
997 root.id, child1.id, child2.id, child3.id, child4.id, child5.id,
998 ];
999 assert_eq!(expect, ctx.procedures_in_tree(&root));
1000 }
1001
1002 #[derive(Debug)]
1003 struct ProcedureToLoad {
1004 content: String,
1005 lock_key: LockKey,
1006 poison_keys: PoisonKeys,
1007 }
1008
1009 #[async_trait]
1010 impl Procedure for ProcedureToLoad {
1011 fn type_name(&self) -> &str {
1012 "ProcedureToLoad"
1013 }
1014
1015 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1016 Ok(Status::done())
1017 }
1018
1019 fn dump(&self) -> Result<String> {
1020 Ok(self.content.clone())
1021 }
1022
1023 fn lock_key(&self) -> LockKey {
1024 self.lock_key.clone()
1025 }
1026
1027 fn poison_keys(&self) -> PoisonKeys {
1028 self.poison_keys.clone()
1029 }
1030 }
1031
1032 impl ProcedureToLoad {
1033 fn new(content: &str) -> ProcedureToLoad {
1034 ProcedureToLoad {
1035 content: content.to_string(),
1036 lock_key: LockKey::default(),
1037 poison_keys: PoisonKeys::default(),
1038 }
1039 }
1040
1041 fn loader() -> BoxedProcedureLoader {
1042 let f = |json: &str| {
1043 let procedure = ProcedureToLoad::new(json);
1044 Ok(Box::new(procedure) as _)
1045 };
1046 Box::new(f)
1047 }
1048 }
1049
1050 #[test]
1051 fn test_register_loader() {
1052 let dir = create_temp_dir("register");
1053 let config = ManagerConfig {
1054 parent_path: "data/".to_string(),
1055 max_retry_times: 3,
1056 retry_delay: Duration::from_millis(500),
1057 ..Default::default()
1058 };
1059 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1060 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1061 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1062 manager.manager_ctx.start();
1063
1064 manager
1065 .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1066 .unwrap();
1067 let err = manager
1069 .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1070 .unwrap_err();
1071 assert!(matches!(err, Error::LoaderConflict { .. }), "{err}");
1072 }
1073
1074 #[tokio::test]
1075 async fn test_recover() {
1076 let dir = create_temp_dir("recover");
1077 let object_store = test_util::new_object_store(&dir);
1078 let config = ManagerConfig {
1079 parent_path: "data/".to_string(),
1080 max_retry_times: 3,
1081 retry_delay: Duration::from_millis(500),
1082 ..Default::default()
1083 };
1084 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1085 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1086 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1087 manager.manager_ctx.start();
1088
1089 manager
1090 .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1091 .unwrap();
1092
1093 let procedure_store = ProcedureStore::from_object_store(object_store.clone());
1095 let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
1096 let root_id = ProcedureId::random();
1097 for step in 0..3 {
1099 let type_name = root.type_name().to_string();
1100 let data = root.dump().unwrap();
1101 procedure_store
1102 .store_procedure(root_id, step, type_name, data, None)
1103 .await
1104 .unwrap();
1105 }
1106
1107 let child: BoxedProcedure = Box::new(ProcedureToLoad::new("a child procedure"));
1108 let child_id = ProcedureId::random();
1109 for step in 0..2 {
1111 let type_name = child.type_name().to_string();
1112 let data = child.dump().unwrap();
1113 procedure_store
1114 .store_procedure(child_id, step, type_name, data, Some(root_id))
1115 .await
1116 .unwrap();
1117 }
1118
1119 manager.recover().await.unwrap();
1121
1122 let _ = manager.procedure_state(root_id).await.unwrap().unwrap();
1124 assert!(manager.procedure_state(child_id).await.unwrap().is_none());
1127 }
1128
1129 #[tokio::test]
1130 async fn test_submit_procedure() {
1131 let dir = create_temp_dir("submit");
1132 let config = ManagerConfig {
1133 parent_path: "data/".to_string(),
1134 max_retry_times: 3,
1135 retry_delay: Duration::from_millis(500),
1136 ..Default::default()
1137 };
1138 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1139 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1140 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1141 manager.manager_ctx.start();
1142
1143 let procedure_id = ProcedureId::random();
1144 assert!(
1145 manager
1146 .procedure_state(procedure_id)
1147 .await
1148 .unwrap()
1149 .is_none()
1150 );
1151 assert!(manager.procedure_watcher(procedure_id).is_none());
1152
1153 let mut procedure = ProcedureToLoad::new("submit");
1154 procedure.lock_key = LockKey::single_exclusive("test.submit");
1155 assert!(
1156 manager
1157 .submit(ProcedureWithId {
1158 id: procedure_id,
1159 procedure: Box::new(procedure),
1160 })
1161 .await
1162 .is_ok()
1163 );
1164 assert!(
1165 manager
1166 .procedure_state(procedure_id)
1167 .await
1168 .unwrap()
1169 .is_some()
1170 );
1171 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1173 watcher.changed().await.unwrap();
1174 assert!(watcher.borrow().is_done());
1175
1176 let err = manager
1178 .submit(ProcedureWithId {
1179 id: procedure_id,
1180 procedure: Box::new(ProcedureToLoad::new("submit")),
1181 })
1182 .await
1183 .unwrap_err();
1184 assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err}");
1185 }
1186
1187 #[tokio::test]
1188 async fn test_state_changed_on_err() {
1189 let dir = create_temp_dir("on_err");
1190 let config = ManagerConfig {
1191 parent_path: "data/".to_string(),
1192 max_retry_times: 3,
1193 retry_delay: Duration::from_millis(500),
1194 ..Default::default()
1195 };
1196 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1197 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1198 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1199 manager.manager_ctx.start();
1200
1201 #[derive(Debug)]
1202 struct MockProcedure {
1203 panic: bool,
1204 }
1205
1206 #[async_trait]
1207 impl Procedure for MockProcedure {
1208 fn type_name(&self) -> &str {
1209 "MockProcedure"
1210 }
1211
1212 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1213 if self.panic {
1214 panic!();
1217 } else {
1218 Err(Error::external(MockError::new(StatusCode::Unexpected)))
1219 }
1220 }
1221
1222 async fn rollback(&mut self, _: &Context) -> Result<()> {
1223 Ok(())
1224 }
1225
1226 fn rollback_supported(&self) -> bool {
1227 true
1228 }
1229
1230 fn dump(&self) -> Result<String> {
1231 Ok(String::new())
1232 }
1233
1234 fn lock_key(&self) -> LockKey {
1235 LockKey::single_exclusive("test.submit")
1236 }
1237
1238 fn poison_keys(&self) -> PoisonKeys {
1239 PoisonKeys::default()
1240 }
1241 }
1242
1243 let check_procedure = |procedure| async {
1244 let procedure_id = ProcedureId::random();
1245 manager
1246 .submit(ProcedureWithId {
1247 id: procedure_id,
1248 procedure: Box::new(procedure),
1249 })
1250 .await
1251 .unwrap()
1252 };
1253
1254 let mut watcher = check_procedure(MockProcedure { panic: false }).await;
1255 watcher.changed().await.unwrap();
1257 assert!(watcher.borrow().is_prepare_rollback());
1258 watcher.changed().await.unwrap();
1259 assert!(watcher.borrow().is_rolling_back());
1260 watcher.changed().await.unwrap();
1261 assert!(watcher.borrow().is_failed());
1262 let mut watcher = check_procedure(MockProcedure { panic: true }).await;
1264 watcher.changed().await.unwrap();
1265 assert!(watcher.borrow().is_failed());
1266 }
1267
1268 #[tokio::test]
1269 async fn test_procedure_manager_stopped() {
1270 let dir = create_temp_dir("procedure_manager_stopped");
1271 let config = ManagerConfig {
1272 parent_path: "data/".to_string(),
1273 max_retry_times: 3,
1274 retry_delay: Duration::from_millis(500),
1275 ..Default::default()
1276 };
1277 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1278 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1279 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1280
1281 let mut procedure = ProcedureToLoad::new("submit");
1282 procedure.lock_key = LockKey::single_exclusive("test.submit");
1283 let procedure_id = ProcedureId::random();
1284 assert_matches!(
1285 manager
1286 .submit(ProcedureWithId {
1287 id: procedure_id,
1288 procedure: Box::new(procedure),
1289 })
1290 .await
1291 .unwrap_err(),
1292 error::Error::ManagerNotStart { .. }
1293 );
1294 }
1295
1296 #[tokio::test]
1297 async fn test_procedure_manager_restart() {
1298 let dir = create_temp_dir("procedure_manager_restart");
1299 let config = ManagerConfig {
1300 parent_path: "data/".to_string(),
1301 max_retry_times: 3,
1302 retry_delay: Duration::from_millis(500),
1303 ..Default::default()
1304 };
1305 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1306 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1307 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1308
1309 manager.start().await.unwrap();
1310 manager.stop().await.unwrap();
1311 manager.start().await.unwrap();
1312
1313 let mut procedure = ProcedureToLoad::new("submit");
1314 procedure.lock_key = LockKey::single_exclusive("test.submit");
1315 let procedure_id = ProcedureId::random();
1316 assert!(
1317 manager
1318 .submit(ProcedureWithId {
1319 id: procedure_id,
1320 procedure: Box::new(procedure),
1321 })
1322 .await
1323 .is_ok()
1324 );
1325 assert!(
1326 manager
1327 .procedure_state(procedure_id)
1328 .await
1329 .unwrap()
1330 .is_some()
1331 );
1332 }
1333
1334 #[tokio::test(flavor = "multi_thread")]
1335 async fn test_remove_outdated_meta_task() {
1336 let dir = create_temp_dir("remove_outdated_meta_task");
1337 let object_store = test_util::new_object_store(&dir);
1338 let config = ManagerConfig {
1339 parent_path: "data/".to_string(),
1340 max_retry_times: 3,
1341 retry_delay: Duration::from_millis(500),
1342 remove_outdated_meta_task_interval: Duration::from_millis(1),
1343 remove_outdated_meta_ttl: Duration::from_millis(1),
1344 max_running_procedures: 128,
1345 };
1346 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1347 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1348 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1349 manager.manager_ctx.set_running();
1350
1351 let mut procedure = ProcedureToLoad::new("submit");
1352 procedure.lock_key = LockKey::single_exclusive("test.submit");
1353 let procedure_id = ProcedureId::random();
1354 assert!(
1355 manager
1356 .submit(ProcedureWithId {
1357 id: procedure_id,
1358 procedure: Box::new(procedure),
1359 })
1360 .await
1361 .is_ok()
1362 );
1363 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1364 watcher.changed().await.unwrap();
1365
1366 manager.start().await.unwrap();
1367 tokio::time::sleep(Duration::from_millis(300)).await;
1368 assert!(
1369 manager
1370 .procedure_state(procedure_id)
1371 .await
1372 .unwrap()
1373 .is_none()
1374 );
1375
1376 manager.stop().await.unwrap();
1378 let mut procedure = ProcedureToLoad::new("submit");
1379 procedure.lock_key = LockKey::single_exclusive("test.submit");
1380 let procedure_id = ProcedureId::random();
1381
1382 manager.manager_ctx.set_running();
1383 assert!(
1384 manager
1385 .submit(ProcedureWithId {
1386 id: procedure_id,
1387 procedure: Box::new(procedure),
1388 })
1389 .await
1390 .is_ok()
1391 );
1392 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1393 watcher.changed().await.unwrap();
1394 tokio::time::sleep(Duration::from_millis(300)).await;
1395 assert!(
1396 manager
1397 .procedure_state(procedure_id)
1398 .await
1399 .unwrap()
1400 .is_some()
1401 );
1402
1403 let mut procedure = ProcedureToLoad::new("submit");
1405 procedure.lock_key = LockKey::single_exclusive("test.submit");
1406 let procedure_id = ProcedureId::random();
1407 assert!(
1408 manager
1409 .submit(ProcedureWithId {
1410 id: procedure_id,
1411 procedure: Box::new(procedure),
1412 })
1413 .await
1414 .is_ok()
1415 );
1416 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1417 watcher.changed().await.unwrap();
1418
1419 manager.start().await.unwrap();
1420 tokio::time::sleep(Duration::from_millis(300)).await;
1421 assert!(
1422 manager
1423 .procedure_state(procedure_id)
1424 .await
1425 .unwrap()
1426 .is_none()
1427 );
1428 }
1429
1430 #[tokio::test]
1431 async fn test_too_many_running_procedures() {
1432 let dir = create_temp_dir("too_many_running_procedures");
1433 let config = ManagerConfig {
1434 parent_path: "data/".to_string(),
1435 max_retry_times: 3,
1436 retry_delay: Duration::from_millis(500),
1437 max_running_procedures: 1,
1438 ..Default::default()
1439 };
1440 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1441 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1442 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1443 manager.manager_ctx.set_running();
1444
1445 manager
1446 .manager_ctx
1447 .running_procedures
1448 .lock()
1449 .unwrap()
1450 .insert(ProcedureId::random());
1451 manager.start().await.unwrap();
1452
1453 let mut procedure = ProcedureToLoad::new("submit");
1455 procedure.lock_key = LockKey::single_exclusive("test.submit");
1456 let procedure_id = ProcedureId::random();
1457 let err = manager
1458 .submit(ProcedureWithId {
1459 id: procedure_id,
1460 procedure: Box::new(procedure),
1461 })
1462 .await
1463 .unwrap_err();
1464 assert!(matches!(err, Error::TooManyRunningProcedures { .. }));
1465
1466 manager
1467 .manager_ctx
1468 .running_procedures
1469 .lock()
1470 .unwrap()
1471 .clear();
1472
1473 let mut procedure = ProcedureToLoad::new("submit");
1475 procedure.lock_key = LockKey::single_exclusive("test.submit");
1476 assert!(
1477 manager
1478 .submit(ProcedureWithId {
1479 id: procedure_id,
1480 procedure: Box::new(procedure),
1481 })
1482 .await
1483 .is_ok()
1484 );
1485 assert!(
1486 manager
1487 .procedure_state(procedure_id)
1488 .await
1489 .unwrap()
1490 .is_some()
1491 );
1492 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1494 watcher.changed().await.unwrap();
1495 assert!(watcher.borrow().is_done());
1496 }
1497
1498 #[derive(Debug)]
1499 struct ProcedureToRecover {
1500 content: String,
1501 lock_key: LockKey,
1502 notify: Option<Arc<Notify>>,
1503 poison_keys: PoisonKeys,
1504 }
1505
1506 #[async_trait]
1507 impl Procedure for ProcedureToRecover {
1508 fn type_name(&self) -> &str {
1509 "ProcedureToRecover"
1510 }
1511
1512 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1513 Ok(Status::done())
1514 }
1515
1516 fn dump(&self) -> Result<String> {
1517 Ok(self.content.clone())
1518 }
1519
1520 fn lock_key(&self) -> LockKey {
1521 self.lock_key.clone()
1522 }
1523
1524 fn recover(&mut self) -> Result<()> {
1525 self.notify.as_ref().unwrap().notify_one();
1526 Ok(())
1527 }
1528
1529 fn poison_keys(&self) -> PoisonKeys {
1530 self.poison_keys.clone()
1531 }
1532 }
1533
1534 impl ProcedureToRecover {
1535 fn new(content: &str) -> ProcedureToRecover {
1536 ProcedureToRecover {
1537 content: content.to_string(),
1538 lock_key: LockKey::default(),
1539 poison_keys: PoisonKeys::default(),
1540 notify: None,
1541 }
1542 }
1543
1544 fn loader(notify: Arc<Notify>) -> BoxedProcedureLoader {
1545 let f = move |json: &str| {
1546 let procedure = ProcedureToRecover {
1547 content: json.to_string(),
1548 lock_key: LockKey::default(),
1549 poison_keys: PoisonKeys::default(),
1550 notify: Some(notify.clone()),
1551 };
1552 Ok(Box::new(procedure) as _)
1553 };
1554 Box::new(f)
1555 }
1556 }
1557
1558 #[tokio::test]
1559 async fn test_procedure_recover() {
1560 common_telemetry::init_default_ut_logging();
1561 let dir = create_temp_dir("procedure_recover");
1562 let object_store = test_util::new_object_store(&dir);
1563 let config = ManagerConfig {
1564 parent_path: "data/".to_string(),
1565 max_retry_times: 3,
1566 retry_delay: Duration::from_millis(500),
1567 ..Default::default()
1568 };
1569 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1570 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1571 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1572 manager.manager_ctx.start();
1573
1574 let notify = Arc::new(Notify::new());
1575 manager
1576 .register_loader(
1577 "ProcedureToRecover",
1578 ProcedureToRecover::loader(notify.clone()),
1579 )
1580 .unwrap();
1581
1582 let procedure_store = ProcedureStore::from_object_store(object_store.clone());
1584 let root: BoxedProcedure = Box::new(ProcedureToRecover::new("test procedure recovery"));
1585 let root_id = ProcedureId::random();
1586 for step in 0..3 {
1588 let type_name = root.type_name().to_string();
1589 let data = root.dump().unwrap();
1590 procedure_store
1591 .store_procedure(root_id, step, type_name, data, None)
1592 .await
1593 .unwrap();
1594 }
1595
1596 manager.recover().await.unwrap();
1598 timeout(Duration::from_secs(10), notify.notified())
1599 .await
1600 .unwrap();
1601 }
1602}