1mod runner;
16
17use std::collections::hash_map::Entry;
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
20use std::sync::{Arc, Mutex, RwLock};
21use std::time::{Duration, Instant};
22
23use async_trait::async_trait;
24use backon::ExponentialBuilder;
25use common_error::ext::BoxedError;
26use common_event_recorder::EventRecorderRef;
27use common_runtime::{RepeatedTask, TaskFunction};
28use common_telemetry::tracing_context::{FutureExt, TracingContext};
29use common_telemetry::{error, info, tracing};
30use snafu::{ensure, OptionExt, ResultExt};
31use tokio::sync::watch::{self, Receiver, Sender};
32use tokio::sync::{Mutex as TokioMutex, Notify};
33
34use crate::error::{
35 self, CheckStatusSnafu, DuplicateProcedureSnafu, Error, LoaderConflictSnafu,
36 ManagerNotStartSnafu, ManagerPasuedSnafu, PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu,
37 Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
38 TooManyRunningProceduresSnafu,
39};
40use crate::event::ProcedureEvent;
41use crate::local::runner::Runner;
42use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
43use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
44use crate::store::poison_store::PoisonStoreRef;
45use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
46use crate::{
47 BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
48 ProcedureState, ProcedureWithId, StringKey, UserMetadata, Watcher,
49};
50
/// Time-to-live of a finished procedure's metadata: ten minutes.
///
/// Used as the default value of `ManagerConfig::remove_outdated_meta_ttl`.
const META_TTL: Duration = Duration::from_secs(10 * 60);
53
54#[derive(Debug)]
63pub(crate) struct ProcedureMeta {
64 id: ProcedureId,
66 type_name: String,
68 parent_id: Option<ProcedureId>,
70 child_notify: Notify,
72 lock_key: LockKey,
74 poison_keys: PoisonKeys,
76 state_sender: Sender<ProcedureState>,
78 state_receiver: Receiver<ProcedureState>,
80 children: Mutex<Vec<ProcedureId>>,
82 start_time_ms: AtomicI64,
84 end_time_ms: AtomicI64,
86 event_recorder: Option<EventRecorderRef>,
88 user_metadata: Option<UserMetadata>,
90}
91
92impl ProcedureMeta {
93 #[allow(clippy::too_many_arguments)]
94 fn new(
95 id: ProcedureId,
96 procedure_state: ProcedureState,
97 parent_id: Option<ProcedureId>,
98 lock_key: LockKey,
99 poison_keys: PoisonKeys,
100 type_name: &str,
101 event_recorder: Option<EventRecorderRef>,
102 user_metadata: Option<UserMetadata>,
103 ) -> ProcedureMeta {
104 let (state_sender, state_receiver) = watch::channel(procedure_state);
105 ProcedureMeta {
106 id,
107 parent_id,
108 child_notify: Notify::new(),
109 lock_key,
110 poison_keys,
111 state_sender,
112 state_receiver,
113 children: Mutex::new(Vec::new()),
114 start_time_ms: AtomicI64::new(0),
115 end_time_ms: AtomicI64::new(0),
116 type_name: type_name.to_string(),
117 event_recorder,
118 user_metadata,
119 }
120 }
121
122 fn state(&self) -> ProcedureState {
124 self.state_receiver.borrow().clone()
125 }
126
127 fn set_state(&self, state: ProcedureState) {
129 if let (Some(event_recorder), Some(user_metadata)) =
131 (&self.event_recorder, &self.user_metadata)
132 {
133 if let Some(event) = user_metadata.to_event() {
134 event_recorder.record(Box::new(ProcedureEvent::new(self.id, event, state.clone())));
135 }
136 }
137
138 self.state_sender.send(state).unwrap();
140 }
141
142 fn push_child(&self, procedure_id: ProcedureId) {
144 let mut children = self.children.lock().unwrap();
145 children.push(procedure_id);
146 }
147
148 fn list_children(&self, buffer: &mut Vec<ProcedureId>) {
150 let children = self.children.lock().unwrap();
151 buffer.extend_from_slice(&children);
152 }
153
154 fn num_children(&self) -> usize {
156 self.children.lock().unwrap().len()
157 }
158
159 fn set_start_time_ms(&self) {
161 self.start_time_ms
162 .store(common_time::util::current_time_millis(), Ordering::Relaxed);
163 }
164
165 fn set_end_time_ms(&self) {
167 self.end_time_ms
168 .store(common_time::util::current_time_millis(), Ordering::Relaxed);
169 }
170}
171
172type ProcedureMetaRef = Arc<ProcedureMeta>;
174
175struct LoadedProcedure {
177 procedure: BoxedProcedure,
178 step: u32,
179}
180
181pub(crate) type DynamicKeyLock = Arc<KeyRwLock<String>>;
188
189pub async fn acquire_dynamic_key_lock(
194 lock: &DynamicKeyLock,
195 key: &StringKey,
196) -> DynamicKeyLockGuard {
197 match key {
198 StringKey::Share(key) => {
199 let guard = lock.read(key.to_string()).await;
200 DynamicKeyLockGuard {
201 guard: Some(OwnedKeyRwLockGuard::from(guard)),
202 key: key.to_string(),
203 lock: lock.clone(),
204 }
205 }
206 StringKey::Exclusive(key) => {
207 let guard = lock.write(key.to_string()).await;
208 DynamicKeyLockGuard {
209 guard: Some(OwnedKeyRwLockGuard::from(guard)),
210 key: key.to_string(),
211 lock: lock.clone(),
212 }
213 }
214 }
215}
216pub struct DynamicKeyLockGuard {
221 guard: Option<OwnedKeyRwLockGuard>,
222 key: String,
223 lock: DynamicKeyLock,
224}
225
226impl Drop for DynamicKeyLockGuard {
227 fn drop(&mut self) {
228 if let Some(guard) = self.guard.take() {
229 drop(guard);
230 }
231 self.lock.clean_keys(&[self.key.to_string()]);
232 }
233}
234
235pub(crate) struct ManagerContext {
237 loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
239 key_lock: KeyRwLock<String>,
245 dynamic_key_lock: DynamicKeyLock,
252 procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
254 running_procedures: Mutex<HashSet<ProcedureId>>,
256 finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
258 running: Arc<AtomicBool>,
260 poison_manager: PoisonStoreRef,
262}
263
264#[async_trait]
265impl ContextProvider for ManagerContext {
266 async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
267 Ok(self.state(procedure_id))
268 }
269
270 async fn procedure_state_receiver(
271 &self,
272 procedure_id: ProcedureId,
273 ) -> Result<Option<Receiver<ProcedureState>>> {
274 Ok(self.state_receiver(procedure_id))
275 }
276
277 async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
278 {
279 let procedures = self.procedures.read().unwrap();
281 let procedure = procedures
282 .get(&procedure_id)
283 .context(ProcedureNotFoundSnafu { procedure_id })?;
284
285 ensure!(
287 procedure.poison_keys.contains(key),
288 PoisonKeyNotDefinedSnafu {
289 key: key.clone(),
290 procedure_id
291 }
292 );
293 }
294 let key = key.to_string();
295 let procedure_id = procedure_id.to_string();
296 self.poison_manager.try_put_poison(key, procedure_id).await
297 }
298
299 async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
300 acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
301 }
302}
303
304impl ManagerContext {
305 fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
307 ManagerContext {
308 key_lock: KeyRwLock::new(),
309 dynamic_key_lock: Arc::new(KeyRwLock::new()),
310 loaders: Mutex::new(HashMap::new()),
311 procedures: RwLock::new(HashMap::new()),
312 running_procedures: Mutex::new(HashSet::new()),
313 finished_procedures: Mutex::new(VecDeque::new()),
314 running: Arc::new(AtomicBool::new(false)),
315 poison_manager,
316 }
317 }
318
319 #[cfg(test)]
320 pub(crate) fn set_running(&self) {
321 self.running.store(true, Ordering::Relaxed);
322 }
323
324 pub(crate) fn start(&self) {
326 self.running.store(true, Ordering::Relaxed);
327 }
328
329 pub(crate) fn stop(&self) {
330 self.running.store(false, Ordering::Relaxed);
331 }
332
333 pub(crate) fn running(&self) -> bool {
335 self.running.load(Ordering::Relaxed)
336 }
337
338 fn contains_procedure(&self, procedure_id: ProcedureId) -> bool {
340 let procedures = self.procedures.read().unwrap();
341 procedures.contains_key(&procedure_id)
342 }
343
344 fn num_running_procedures(&self) -> usize {
346 self.running_procedures.lock().unwrap().len()
347 }
348
349 fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
354 let procedure_id = meta.id;
355 let mut procedures = self.procedures.write().unwrap();
356 match procedures.entry(procedure_id) {
357 Entry::Occupied(_) => return false,
358 Entry::Vacant(vacant_entry) => {
359 vacant_entry.insert(meta);
360 }
361 }
362
363 let mut running_procedures = self.running_procedures.lock().unwrap();
364 running_procedures.insert(procedure_id);
365
366 true
367 }
368
369 fn state(&self, procedure_id: ProcedureId) -> Option<ProcedureState> {
371 let procedures = self.procedures.read().unwrap();
372 procedures.get(&procedure_id).map(|meta| meta.state())
373 }
374
375 fn state_receiver(&self, procedure_id: ProcedureId) -> Option<Receiver<ProcedureState>> {
377 let procedures = self.procedures.read().unwrap();
378 procedures
379 .get(&procedure_id)
380 .map(|meta| meta.state_receiver.clone())
381 }
382
383 fn list_procedure(&self) -> Vec<ProcedureInfo> {
385 let procedures = self.procedures.read().unwrap();
386 procedures
387 .values()
388 .map(|meta| ProcedureInfo {
389 id: meta.id,
390 type_name: meta.type_name.clone(),
391 start_time_ms: meta.start_time_ms.load(Ordering::Relaxed),
392 end_time_ms: meta.end_time_ms.load(Ordering::Relaxed),
393 state: meta.state(),
394 lock_keys: meta.lock_key.get_keys(),
395 })
396 .collect()
397 }
398
399 fn watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
401 let procedures = self.procedures.read().unwrap();
402 procedures
403 .get(&procedure_id)
404 .map(|meta| meta.state_receiver.clone())
405 }
406
407 fn notify_by_subprocedure(&self, procedure_id: ProcedureId) {
409 let procedures = self.procedures.read().unwrap();
410 if let Some(meta) = procedures.get(&procedure_id) {
411 meta.child_notify.notify_one();
412 }
413 }
414
415 fn load_one_procedure_from_message(
417 &self,
418 procedure_id: ProcedureId,
419 message: &ProcedureMessage,
420 ) -> Option<LoadedProcedure> {
421 let loaders = self.loaders.lock().unwrap();
422 let loader = loaders.get(&message.type_name).or_else(|| {
423 error!(
424 "Loader not found, procedure_id: {}, type_name: {}",
425 procedure_id, message.type_name
426 );
427 None
428 })?;
429
430 let procedure = loader(&message.data)
431 .map_err(|e| {
432 error!(
433 "Failed to load procedure data, key: {}, source: {:?}",
434 procedure_id, e
435 );
436 e
437 })
438 .ok()?;
439
440 Some(LoadedProcedure {
441 procedure,
442 step: message.step,
443 })
444 }
445
446 fn procedures_in_tree(&self, root: &ProcedureMetaRef) -> Vec<ProcedureId> {
451 let sub_num = root.num_children();
452 let mut procedures = Vec::with_capacity(1 + sub_num);
454
455 let mut queue = VecDeque::with_capacity(1 + sub_num);
456 queue.push_back(root.clone());
458
459 let mut children_ids = Vec::with_capacity(sub_num);
460 let mut children = Vec::with_capacity(sub_num);
461 while let Some(meta) = queue.pop_front() {
462 procedures.push(meta.id);
463
464 children_ids.clear();
466 meta.list_children(&mut children_ids);
467 self.find_procedures(&children_ids, &mut children);
468
469 for child in children.drain(..) {
471 queue.push_back(child);
472 }
473 }
474
475 procedures
476 }
477
478 fn find_procedures(&self, procedure_ids: &[ProcedureId], metas: &mut Vec<ProcedureMetaRef>) {
482 let procedures = self.procedures.read().unwrap();
483 for procedure_id in procedure_ids {
484 if let Some(meta) = procedures.get(procedure_id) {
485 metas.push(meta.clone());
486 }
487 }
488 }
489
490 fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
492 let now = Instant::now();
495 let mut finished_procedures = self.finished_procedures.lock().unwrap();
496 finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
497
498 let mut running_procedures = self.running_procedures.lock().unwrap();
500 for procedure_id in procedure_ids {
501 running_procedures.remove(procedure_id);
502 }
503 }
504
505 fn remove_outdated_meta(&self, ttl: Duration) {
507 let ids = {
508 let mut finished_procedures = self.finished_procedures.lock().unwrap();
509 if finished_procedures.is_empty() {
510 return;
511 }
512
513 let mut ids_to_remove = Vec::new();
514 while let Some((id, finish_time)) = finished_procedures.front() {
515 if finish_time.elapsed() > ttl {
516 ids_to_remove.push(*id);
517 let _ = finished_procedures.pop_front();
518 } else {
519 break;
522 }
523 }
524 ids_to_remove
525 };
526
527 if ids.is_empty() {
528 return;
529 }
530
531 let mut procedures = self.procedures.write().unwrap();
532 for id in ids {
533 let _ = procedures.remove(&id);
534 }
535 }
536}
537
538#[derive(Debug)]
540pub struct ManagerConfig {
541 pub parent_path: String,
542 pub max_retry_times: usize,
543 pub retry_delay: Duration,
544 pub remove_outdated_meta_task_interval: Duration,
545 pub remove_outdated_meta_ttl: Duration,
546 pub max_running_procedures: usize,
547}
548
549impl Default for ManagerConfig {
550 fn default() -> Self {
551 Self {
552 parent_path: String::default(),
553 max_retry_times: 3,
554 retry_delay: Duration::from_millis(500),
555 remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
556 remove_outdated_meta_ttl: META_TTL,
557 max_running_procedures: 128,
558 }
559 }
560}
561
562type PauseAwareRef = Arc<dyn PauseAware>;
563
564#[async_trait]
565pub trait PauseAware: Send + Sync {
566 async fn is_paused(&self) -> std::result::Result<bool, BoxedError>;
568}
569
570pub struct LocalManager {
572 manager_ctx: Arc<ManagerContext>,
573 procedure_store: Arc<ProcedureStore>,
574 max_retry_times: usize,
575 retry_delay: Duration,
576 remove_outdated_meta_task: TokioMutex<Option<RepeatedTask<Error>>>,
578 config: ManagerConfig,
579 pause_aware: Option<PauseAwareRef>,
580 event_recorder: Option<EventRecorderRef>,
581}
582
583impl LocalManager {
584 pub fn new(
586 config: ManagerConfig,
587 state_store: StateStoreRef,
588 poison_store: PoisonStoreRef,
589 pause_aware: Option<PauseAwareRef>,
590 event_recorder: Option<EventRecorderRef>,
591 ) -> LocalManager {
592 let manager_ctx = Arc::new(ManagerContext::new(poison_store));
593
594 LocalManager {
595 manager_ctx,
596 procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)),
597 max_retry_times: config.max_retry_times,
598 retry_delay: config.retry_delay,
599 remove_outdated_meta_task: TokioMutex::new(None),
600 config,
601 pause_aware,
602 event_recorder,
603 }
604 }
605
606 pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask<Error> {
608 RepeatedTask::new(
609 self.config.remove_outdated_meta_task_interval,
610 Box::new(RemoveOutdatedMetaFunction {
611 manager_ctx: self.manager_ctx.clone(),
612 ttl: self.config.remove_outdated_meta_ttl,
613 }),
614 )
615 }
616
617 fn submit_root(
619 &self,
620 procedure_id: ProcedureId,
621 procedure_state: ProcedureState,
622 step: u32,
623 procedure: BoxedProcedure,
624 ) -> Result<Watcher> {
625 ensure!(self.manager_ctx.running(), ManagerNotStartSnafu);
626
627 let user_metadata = procedure.user_metadata();
628 let meta = Arc::new(ProcedureMeta::new(
629 procedure_id,
630 procedure_state,
631 None,
632 procedure.lock_key(),
633 procedure.poison_keys(),
634 procedure.type_name(),
635 self.event_recorder.clone(),
636 user_metadata.clone(),
637 ));
638 let runner = Runner {
639 meta: meta.clone(),
640 procedure,
641 manager_ctx: self.manager_ctx.clone(),
642 step,
643 exponential_builder: ExponentialBuilder::default()
644 .with_min_delay(self.retry_delay)
645 .with_max_times(self.max_retry_times),
646 store: self.procedure_store.clone(),
647 rolling_back: false,
648 event_recorder: self.event_recorder.clone(),
649 };
650
651 if let (Some(event_recorder), Some(event)) = (
652 self.event_recorder.as_ref(),
653 user_metadata.and_then(|m| m.to_event()),
654 ) {
655 event_recorder.record(Box::new(ProcedureEvent::new(
656 procedure_id,
657 event,
658 ProcedureState::Running,
659 )));
660 }
661
662 let watcher = meta.state_receiver.clone();
663
664 ensure!(
665 self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
666 TooManyRunningProceduresSnafu {
667 max_running_procedures: self.config.max_running_procedures,
668 }
669 );
670
671 ensure!(
673 self.manager_ctx.try_insert_procedure(meta),
674 DuplicateProcedureSnafu { procedure_id },
675 );
676
677 let tracing_context = TracingContext::from_current_span();
678
679 let _handle = common_runtime::spawn_global(async move {
680 runner
684 .run()
685 .trace(
686 tracing_context
687 .attach(tracing::info_span!("LocalManager::submit_root_procedure")),
688 )
689 .await;
690 });
691
692 Ok(watcher)
693 }
694
695 fn submit_recovered_messages(
696 &self,
697 messages: HashMap<ProcedureId, ProcedureMessage>,
698 init_state: InitProcedureState,
699 ) {
700 for (procedure_id, message) in &messages {
701 if message.parent_id.is_none() {
702 let Some(mut loaded_procedure) = self
705 .manager_ctx
706 .load_one_procedure_from_message(*procedure_id, message)
707 else {
708 continue;
710 };
711
712 info!(
713 "Recover root procedure {}-{}, step: {}",
714 loaded_procedure.procedure.type_name(),
715 procedure_id,
716 loaded_procedure.step
717 );
718
719 let procedure_state = match init_state {
720 InitProcedureState::RollingBack => ProcedureState::RollingBack {
721 error: Arc::new(
722 error::RollbackProcedureRecoveredSnafu {
723 error: message.error.clone().unwrap_or("Unknown error".to_string()),
724 }
725 .build(),
726 ),
727 },
728 InitProcedureState::Running => ProcedureState::Running,
729 };
730
731 if let Err(e) = loaded_procedure.procedure.recover() {
732 error!(e; "Failed to recover procedure {}", procedure_id);
733 }
734
735 if let Err(e) = self.submit_root(
736 *procedure_id,
737 procedure_state,
738 loaded_procedure.step,
739 loaded_procedure.procedure,
740 ) {
741 error!(e; "Failed to recover procedure {}", procedure_id);
742 }
743 }
744 }
745 }
746
747 async fn recover(&self) -> Result<()> {
749 info!("LocalManager start to recover");
750 let recover_start = Instant::now();
751
752 let ProcedureMessages {
753 messages,
754 rollback_messages,
755 finished_ids,
756 } = self.procedure_store.load_messages().await?;
757 self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack);
759 self.submit_recovered_messages(messages, InitProcedureState::Running);
760
761 if !finished_ids.is_empty() {
762 info!(
763 "LocalManager try to clean finished procedures, num: {}",
764 finished_ids.len()
765 );
766
767 for procedure_id in finished_ids {
768 if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
769 error!(e; "Failed to delete procedure {}", procedure_id);
770 }
771 }
772 }
773
774 info!(
775 "LocalManager finish recovery, cost: {}ms",
776 recover_start.elapsed().as_millis()
777 );
778
779 Ok(())
780 }
781
782 #[cfg(any(test, feature = "testing"))]
783 pub fn contains_loader(&self, name: &str) -> bool {
785 let loaders = self.manager_ctx.loaders.lock().unwrap();
786 loaders.contains_key(name)
787 }
788
789 async fn check_status(&self) -> Result<()> {
790 if let Some(pause_aware) = self.pause_aware.as_ref() {
791 ensure!(
792 !pause_aware.is_paused().await.context(CheckStatusSnafu)?,
793 ManagerPasuedSnafu
794 );
795 }
796
797 Ok(())
798 }
799}
800
801#[async_trait]
802impl ProcedureManager for LocalManager {
803 fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> {
804 let mut loaders = self.manager_ctx.loaders.lock().unwrap();
805 ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name });
806
807 let _ = loaders.insert(name.to_string(), loader);
808
809 Ok(())
810 }
811
812 async fn start(&self) -> Result<()> {
813 let mut task = self.remove_outdated_meta_task.lock().await;
814
815 if task.is_some() {
816 return Ok(());
817 }
818
819 let task_inner = self.build_remove_outdated_meta_task();
820
821 task_inner
822 .start(common_runtime::global_runtime())
823 .context(StartRemoveOutdatedMetaTaskSnafu)?;
824
825 *task = Some(task_inner);
826
827 self.manager_ctx.start();
828
829 info!("LocalManager is start.");
830
831 self.recover().await
832 }
833
834 async fn stop(&self) -> Result<()> {
835 let mut task = self.remove_outdated_meta_task.lock().await;
836
837 if let Some(task) = task.take() {
838 task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?;
839 }
840
841 self.manager_ctx.stop();
842
843 info!("LocalManager is stopped.");
844
845 Ok(())
846 }
847
848 async fn submit(&self, procedure: ProcedureWithId) -> Result<Watcher> {
849 let procedure_id = procedure.id;
850 ensure!(
851 !self.manager_ctx.contains_procedure(procedure_id),
852 DuplicateProcedureSnafu { procedure_id }
853 );
854 self.check_status().await?;
855
856 self.submit_root(
857 procedure.id,
858 ProcedureState::Running,
859 0,
860 procedure.procedure,
861 )
862 }
863
864 async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
865 Ok(self.manager_ctx.state(procedure_id))
866 }
867
868 fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
869 self.manager_ctx.watcher(procedure_id)
870 }
871
872 async fn list_procedures(&self) -> Result<Vec<ProcedureInfo>> {
873 Ok(self.manager_ctx.list_procedure())
874 }
875}
876
877struct RemoveOutdatedMetaFunction {
878 manager_ctx: Arc<ManagerContext>,
879 ttl: Duration,
880}
881
882#[async_trait::async_trait]
883impl TaskFunction<Error> for RemoveOutdatedMetaFunction {
884 fn name(&self) -> &str {
885 "ProcedureManager-remove-outdated-meta-task"
886 }
887
888 async fn call(&mut self) -> Result<()> {
889 self.manager_ctx.remove_outdated_meta(self.ttl);
890 Ok(())
891 }
892}
893
/// Helpers shared by the tests of this module tree.
#[cfg(test)]
pub(crate) mod test_util {
    use common_test_util::temp_dir::TempDir;
    use object_store::services::Fs as Builder;
    use object_store::ObjectStore;

    use super::*;

    /// Returns the meta of a running root procedure with a random id, default
    /// lock and poison keys, and neither event recorder nor user metadata.
    pub(crate) fn procedure_meta_for_test() -> ProcedureMeta {
        let id = ProcedureId::random();
        ProcedureMeta::new(
            id,
            ProcedureState::Running,
            None,
            LockKey::default(),
            PoisonKeys::default(),
            "ProcedureAdapter",
            None,
            None,
        )
    }

    /// Returns a filesystem-backed object store rooted at `dir`.
    pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
        let root = dir.path().to_str().unwrap();
        ObjectStore::new(Builder::default().root(root))
            .unwrap()
            .finish()
    }
}
922
923#[cfg(test)]
924mod tests {
925 use std::assert_matches::assert_matches;
926
927 use common_error::mock::MockError;
928 use common_error::status_code::StatusCode;
929 use common_test_util::temp_dir::create_temp_dir;
930 use tokio::time::timeout;
931
932 use super::*;
933 use crate::error::{self, Error};
934 use crate::store::state_store::ObjectStateStore;
935 use crate::test_util::InMemoryPoisonStore;
936 use crate::{Context, Procedure, Status};
937
938 fn new_test_manager_context() -> ManagerContext {
939 let poison_manager = Arc::new(InMemoryPoisonStore::default());
940 ManagerContext::new(poison_manager)
941 }
942
#[test]
fn test_manager_context() {
    let ctx = new_test_manager_context();
    let meta = Arc::new(test_util::procedure_meta_for_test());
    let id = meta.id;

    // Nothing is known about the procedure before it is inserted.
    assert!(!ctx.contains_procedure(id));
    assert!(ctx.state(id).is_none());

    assert!(ctx.try_insert_procedure(meta.clone()));
    assert!(ctx.contains_procedure(id));

    // State changes made through the meta are visible through the context.
    assert!(ctx.state(id).unwrap().is_running());
    meta.set_state(ProcedureState::Done { output: None });
    assert!(ctx.state(id).unwrap().is_done());
}
958
#[test]
fn test_manager_context_insert_duplicate() {
    let ctx = new_test_manager_context();
    let meta = Arc::new(test_util::procedure_meta_for_test());

    // The first insertion succeeds, a second one with the same id is rejected.
    let first = ctx.try_insert_procedure(meta.clone());
    assert!(first);
    let second = ctx.try_insert_procedure(meta);
    assert!(!second);
}
967
968 fn new_child(parent_id: ProcedureId, ctx: &ManagerContext) -> ProcedureMetaRef {
969 let mut child = test_util::procedure_meta_for_test();
970 child.parent_id = Some(parent_id);
971 let child = Arc::new(child);
972 assert!(ctx.try_insert_procedure(child.clone()));
973
974 let mut parent = Vec::new();
975 ctx.find_procedures(&[parent_id], &mut parent);
976 parent[0].push_child(child.id);
977
978 child
979 }
980
#[test]
fn test_procedures_in_tree() {
    let ctx = new_test_manager_context();
    let root = Arc::new(test_util::procedure_meta_for_test());
    assert!(ctx.try_insert_procedure(root.clone()));

    // A lone root forms a tree of size one.
    assert_eq!(1, ctx.procedures_in_tree(&root).len());

    // root -> {child1 -> {child3, child4}, child2 -> {child5}}
    let child1 = new_child(root.id, &ctx);
    let child2 = new_child(root.id, &ctx);
    let child3 = new_child(child1.id, &ctx);
    let child4 = new_child(child1.id, &ctx);
    let child5 = new_child(child2.id, &ctx);

    let expect: Vec<ProcedureId> = [&root, &child1, &child2, &child3, &child4, &child5]
        .into_iter()
        .map(|meta| meta.id)
        .collect();
    assert_eq!(expect, ctx.procedures_in_tree(&root));
}
1002
1003 #[derive(Debug)]
1004 struct ProcedureToLoad {
1005 content: String,
1006 lock_key: LockKey,
1007 poison_keys: PoisonKeys,
1008 }
1009
1010 #[async_trait]
1011 impl Procedure for ProcedureToLoad {
1012 fn type_name(&self) -> &str {
1013 "ProcedureToLoad"
1014 }
1015
1016 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1017 Ok(Status::done())
1018 }
1019
1020 fn dump(&self) -> Result<String> {
1021 Ok(self.content.clone())
1022 }
1023
1024 fn lock_key(&self) -> LockKey {
1025 self.lock_key.clone()
1026 }
1027
1028 fn poison_keys(&self) -> PoisonKeys {
1029 self.poison_keys.clone()
1030 }
1031 }
1032
1033 impl ProcedureToLoad {
1034 fn new(content: &str) -> ProcedureToLoad {
1035 ProcedureToLoad {
1036 content: content.to_string(),
1037 lock_key: LockKey::default(),
1038 poison_keys: PoisonKeys::default(),
1039 }
1040 }
1041
1042 fn loader() -> BoxedProcedureLoader {
1043 let f = |json: &str| {
1044 let procedure = ProcedureToLoad::new(json);
1045 Ok(Box::new(procedure) as _)
1046 };
1047 Box::new(f)
1048 }
1049 }
1050
#[test]
fn test_register_loader() {
    let dir = create_temp_dir("register");
    let object_store = test_util::new_object_store(&dir);
    let state_store = Arc::new(ObjectStateStore::new(object_store));
    let poison_manager = Arc::new(InMemoryPoisonStore::new());
    let config = ManagerConfig {
        parent_path: "data/".to_string(),
        max_retry_times: 3,
        retry_delay: Duration::from_millis(500),
        ..Default::default()
    };
    let manager = LocalManager::new(config, state_store, poison_manager, None, None);
    manager.manager_ctx.start();

    // The first registration succeeds.
    manager
        .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
        .unwrap();

    // Registering the same name again is a conflict.
    let err = manager
        .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
        .unwrap_err();
    assert!(matches!(err, Error::LoaderConflict { .. }), "{err}");
}
1074
1075 #[tokio::test]
1076 async fn test_recover() {
1077 let dir = create_temp_dir("recover");
1078 let object_store = test_util::new_object_store(&dir);
1079 let config = ManagerConfig {
1080 parent_path: "data/".to_string(),
1081 max_retry_times: 3,
1082 retry_delay: Duration::from_millis(500),
1083 ..Default::default()
1084 };
1085 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1086 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1087 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1088 manager.manager_ctx.start();
1089
1090 manager
1091 .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1092 .unwrap();
1093
1094 let procedure_store = ProcedureStore::from_object_store(object_store.clone());
1096 let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
1097 let root_id = ProcedureId::random();
1098 for step in 0..3 {
1100 let type_name = root.type_name().to_string();
1101 let data = root.dump().unwrap();
1102 procedure_store
1103 .store_procedure(root_id, step, type_name, data, None)
1104 .await
1105 .unwrap();
1106 }
1107
1108 let child: BoxedProcedure = Box::new(ProcedureToLoad::new("a child procedure"));
1109 let child_id = ProcedureId::random();
1110 for step in 0..2 {
1112 let type_name = child.type_name().to_string();
1113 let data = child.dump().unwrap();
1114 procedure_store
1115 .store_procedure(child_id, step, type_name, data, Some(root_id))
1116 .await
1117 .unwrap();
1118 }
1119
1120 manager.recover().await.unwrap();
1122
1123 let _ = manager.procedure_state(root_id).await.unwrap().unwrap();
1125 assert!(manager.procedure_state(child_id).await.unwrap().is_none());
1128 }
1129
1130 #[tokio::test]
1131 async fn test_submit_procedure() {
1132 let dir = create_temp_dir("submit");
1133 let config = ManagerConfig {
1134 parent_path: "data/".to_string(),
1135 max_retry_times: 3,
1136 retry_delay: Duration::from_millis(500),
1137 ..Default::default()
1138 };
1139 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1140 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1141 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1142 manager.manager_ctx.start();
1143
1144 let procedure_id = ProcedureId::random();
1145 assert!(manager
1146 .procedure_state(procedure_id)
1147 .await
1148 .unwrap()
1149 .is_none());
1150 assert!(manager.procedure_watcher(procedure_id).is_none());
1151
1152 let mut procedure = ProcedureToLoad::new("submit");
1153 procedure.lock_key = LockKey::single_exclusive("test.submit");
1154 assert!(manager
1155 .submit(ProcedureWithId {
1156 id: procedure_id,
1157 procedure: Box::new(procedure),
1158 })
1159 .await
1160 .is_ok());
1161 assert!(manager
1162 .procedure_state(procedure_id)
1163 .await
1164 .unwrap()
1165 .is_some());
1166 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1168 watcher.changed().await.unwrap();
1169 assert!(watcher.borrow().is_done());
1170
1171 let err = manager
1173 .submit(ProcedureWithId {
1174 id: procedure_id,
1175 procedure: Box::new(ProcedureToLoad::new("submit")),
1176 })
1177 .await
1178 .unwrap_err();
1179 assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err}");
1180 }
1181
1182 #[tokio::test]
1183 async fn test_state_changed_on_err() {
1184 let dir = create_temp_dir("on_err");
1185 let config = ManagerConfig {
1186 parent_path: "data/".to_string(),
1187 max_retry_times: 3,
1188 retry_delay: Duration::from_millis(500),
1189 ..Default::default()
1190 };
1191 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1192 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1193 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1194 manager.manager_ctx.start();
1195
1196 #[derive(Debug)]
1197 struct MockProcedure {
1198 panic: bool,
1199 }
1200
1201 #[async_trait]
1202 impl Procedure for MockProcedure {
1203 fn type_name(&self) -> &str {
1204 "MockProcedure"
1205 }
1206
1207 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1208 if self.panic {
1209 panic!();
1212 } else {
1213 Err(Error::external(MockError::new(StatusCode::Unexpected)))
1214 }
1215 }
1216
1217 async fn rollback(&mut self, _: &Context) -> Result<()> {
1218 Ok(())
1219 }
1220
1221 fn rollback_supported(&self) -> bool {
1222 true
1223 }
1224
1225 fn dump(&self) -> Result<String> {
1226 Ok(String::new())
1227 }
1228
1229 fn lock_key(&self) -> LockKey {
1230 LockKey::single_exclusive("test.submit")
1231 }
1232
1233 fn poison_keys(&self) -> PoisonKeys {
1234 PoisonKeys::default()
1235 }
1236 }
1237
1238 let check_procedure = |procedure| async {
1239 let procedure_id = ProcedureId::random();
1240 manager
1241 .submit(ProcedureWithId {
1242 id: procedure_id,
1243 procedure: Box::new(procedure),
1244 })
1245 .await
1246 .unwrap()
1247 };
1248
1249 let mut watcher = check_procedure(MockProcedure { panic: false }).await;
1250 watcher.changed().await.unwrap();
1252 assert!(watcher.borrow().is_prepare_rollback());
1253 watcher.changed().await.unwrap();
1254 assert!(watcher.borrow().is_rolling_back());
1255 watcher.changed().await.unwrap();
1256 assert!(watcher.borrow().is_failed());
1257 let mut watcher = check_procedure(MockProcedure { panic: true }).await;
1259 watcher.changed().await.unwrap();
1260 assert!(watcher.borrow().is_failed());
1261 }
1262
1263 #[tokio::test]
1264 async fn test_procedure_manager_stopped() {
1265 let dir = create_temp_dir("procedure_manager_stopped");
1266 let config = ManagerConfig {
1267 parent_path: "data/".to_string(),
1268 max_retry_times: 3,
1269 retry_delay: Duration::from_millis(500),
1270 ..Default::default()
1271 };
1272 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1273 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1274 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1275
1276 let mut procedure = ProcedureToLoad::new("submit");
1277 procedure.lock_key = LockKey::single_exclusive("test.submit");
1278 let procedure_id = ProcedureId::random();
1279 assert_matches!(
1280 manager
1281 .submit(ProcedureWithId {
1282 id: procedure_id,
1283 procedure: Box::new(procedure),
1284 })
1285 .await
1286 .unwrap_err(),
1287 error::Error::ManagerNotStart { .. }
1288 );
1289 }
1290
1291 #[tokio::test]
1292 async fn test_procedure_manager_restart() {
1293 let dir = create_temp_dir("procedure_manager_restart");
1294 let config = ManagerConfig {
1295 parent_path: "data/".to_string(),
1296 max_retry_times: 3,
1297 retry_delay: Duration::from_millis(500),
1298 ..Default::default()
1299 };
1300 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1301 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1302 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1303
1304 manager.start().await.unwrap();
1305 manager.stop().await.unwrap();
1306 manager.start().await.unwrap();
1307
1308 let mut procedure = ProcedureToLoad::new("submit");
1309 procedure.lock_key = LockKey::single_exclusive("test.submit");
1310 let procedure_id = ProcedureId::random();
1311 assert!(manager
1312 .submit(ProcedureWithId {
1313 id: procedure_id,
1314 procedure: Box::new(procedure),
1315 })
1316 .await
1317 .is_ok());
1318 assert!(manager
1319 .procedure_state(procedure_id)
1320 .await
1321 .unwrap()
1322 .is_some());
1323 }
1324
1325 #[tokio::test(flavor = "multi_thread")]
1326 async fn test_remove_outdated_meta_task() {
1327 let dir = create_temp_dir("remove_outdated_meta_task");
1328 let object_store = test_util::new_object_store(&dir);
1329 let config = ManagerConfig {
1330 parent_path: "data/".to_string(),
1331 max_retry_times: 3,
1332 retry_delay: Duration::from_millis(500),
1333 remove_outdated_meta_task_interval: Duration::from_millis(1),
1334 remove_outdated_meta_ttl: Duration::from_millis(1),
1335 max_running_procedures: 128,
1336 };
1337 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1338 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1339 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1340 manager.manager_ctx.set_running();
1341
1342 let mut procedure = ProcedureToLoad::new("submit");
1343 procedure.lock_key = LockKey::single_exclusive("test.submit");
1344 let procedure_id = ProcedureId::random();
1345 assert!(manager
1346 .submit(ProcedureWithId {
1347 id: procedure_id,
1348 procedure: Box::new(procedure),
1349 })
1350 .await
1351 .is_ok());
1352 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1353 watcher.changed().await.unwrap();
1354
1355 manager.start().await.unwrap();
1356 tokio::time::sleep(Duration::from_millis(300)).await;
1357 assert!(manager
1358 .procedure_state(procedure_id)
1359 .await
1360 .unwrap()
1361 .is_none());
1362
1363 manager.stop().await.unwrap();
1365 let mut procedure = ProcedureToLoad::new("submit");
1366 procedure.lock_key = LockKey::single_exclusive("test.submit");
1367 let procedure_id = ProcedureId::random();
1368
1369 manager.manager_ctx.set_running();
1370 assert!(manager
1371 .submit(ProcedureWithId {
1372 id: procedure_id,
1373 procedure: Box::new(procedure),
1374 })
1375 .await
1376 .is_ok());
1377 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1378 watcher.changed().await.unwrap();
1379 tokio::time::sleep(Duration::from_millis(300)).await;
1380 assert!(manager
1381 .procedure_state(procedure_id)
1382 .await
1383 .unwrap()
1384 .is_some());
1385
1386 let mut procedure = ProcedureToLoad::new("submit");
1388 procedure.lock_key = LockKey::single_exclusive("test.submit");
1389 let procedure_id = ProcedureId::random();
1390 assert!(manager
1391 .submit(ProcedureWithId {
1392 id: procedure_id,
1393 procedure: Box::new(procedure),
1394 })
1395 .await
1396 .is_ok());
1397 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1398 watcher.changed().await.unwrap();
1399
1400 manager.start().await.unwrap();
1401 tokio::time::sleep(Duration::from_millis(300)).await;
1402 assert!(manager
1403 .procedure_state(procedure_id)
1404 .await
1405 .unwrap()
1406 .is_none());
1407 }
1408
1409 #[tokio::test]
1410 async fn test_too_many_running_procedures() {
1411 let dir = create_temp_dir("too_many_running_procedures");
1412 let config = ManagerConfig {
1413 parent_path: "data/".to_string(),
1414 max_retry_times: 3,
1415 retry_delay: Duration::from_millis(500),
1416 max_running_procedures: 1,
1417 ..Default::default()
1418 };
1419 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1420 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1421 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1422 manager.manager_ctx.set_running();
1423
1424 manager
1425 .manager_ctx
1426 .running_procedures
1427 .lock()
1428 .unwrap()
1429 .insert(ProcedureId::random());
1430 manager.start().await.unwrap();
1431
1432 let mut procedure = ProcedureToLoad::new("submit");
1434 procedure.lock_key = LockKey::single_exclusive("test.submit");
1435 let procedure_id = ProcedureId::random();
1436 let err = manager
1437 .submit(ProcedureWithId {
1438 id: procedure_id,
1439 procedure: Box::new(procedure),
1440 })
1441 .await
1442 .unwrap_err();
1443 assert!(matches!(err, Error::TooManyRunningProcedures { .. }));
1444
1445 manager
1446 .manager_ctx
1447 .running_procedures
1448 .lock()
1449 .unwrap()
1450 .clear();
1451
1452 let mut procedure = ProcedureToLoad::new("submit");
1454 procedure.lock_key = LockKey::single_exclusive("test.submit");
1455 assert!(manager
1456 .submit(ProcedureWithId {
1457 id: procedure_id,
1458 procedure: Box::new(procedure),
1459 })
1460 .await
1461 .is_ok());
1462 assert!(manager
1463 .procedure_state(procedure_id)
1464 .await
1465 .unwrap()
1466 .is_some());
1467 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1469 watcher.changed().await.unwrap();
1470 assert!(watcher.borrow().is_done());
1471 }
1472
    /// Test-only procedure used by `test_procedure_recover`.
    #[derive(Debug)]
    struct ProcedureToRecover {
        /// Payload returned by `dump()`.
        content: String,
        /// Lock key returned by `lock_key()`.
        lock_key: LockKey,
        /// Signalled from `recover()`. `new` leaves this as `None`; `loader`
        /// fills it in, and `recover()` unwraps it.
        notify: Option<Arc<Notify>>,
        /// Poison keys returned by `poison_keys()`.
        poison_keys: PoisonKeys,
    }
1480
1481 #[async_trait]
1482 impl Procedure for ProcedureToRecover {
1483 fn type_name(&self) -> &str {
1484 "ProcedureToRecover"
1485 }
1486
1487 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1488 Ok(Status::done())
1489 }
1490
1491 fn dump(&self) -> Result<String> {
1492 Ok(self.content.clone())
1493 }
1494
1495 fn lock_key(&self) -> LockKey {
1496 self.lock_key.clone()
1497 }
1498
1499 fn recover(&mut self) -> Result<()> {
1500 self.notify.as_ref().unwrap().notify_one();
1501 Ok(())
1502 }
1503
1504 fn poison_keys(&self) -> PoisonKeys {
1505 self.poison_keys.clone()
1506 }
1507 }
1508
1509 impl ProcedureToRecover {
1510 fn new(content: &str) -> ProcedureToRecover {
1511 ProcedureToRecover {
1512 content: content.to_string(),
1513 lock_key: LockKey::default(),
1514 poison_keys: PoisonKeys::default(),
1515 notify: None,
1516 }
1517 }
1518
1519 fn loader(notify: Arc<Notify>) -> BoxedProcedureLoader {
1520 let f = move |json: &str| {
1521 let procedure = ProcedureToRecover {
1522 content: json.to_string(),
1523 lock_key: LockKey::default(),
1524 poison_keys: PoisonKeys::default(),
1525 notify: Some(notify.clone()),
1526 };
1527 Ok(Box::new(procedure) as _)
1528 };
1529 Box::new(f)
1530 }
1531 }
1532
1533 #[tokio::test]
1534 async fn test_procedure_recover() {
1535 common_telemetry::init_default_ut_logging();
1536 let dir = create_temp_dir("procedure_recover");
1537 let object_store = test_util::new_object_store(&dir);
1538 let config = ManagerConfig {
1539 parent_path: "data/".to_string(),
1540 max_retry_times: 3,
1541 retry_delay: Duration::from_millis(500),
1542 ..Default::default()
1543 };
1544 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1545 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1546 let manager = LocalManager::new(config, state_store, poison_manager, None, None);
1547 manager.manager_ctx.start();
1548
1549 let notify = Arc::new(Notify::new());
1550 manager
1551 .register_loader(
1552 "ProcedureToRecover",
1553 ProcedureToRecover::loader(notify.clone()),
1554 )
1555 .unwrap();
1556
1557 let procedure_store = ProcedureStore::from_object_store(object_store.clone());
1559 let root: BoxedProcedure = Box::new(ProcedureToRecover::new("test procedure recovery"));
1560 let root_id = ProcedureId::random();
1561 for step in 0..3 {
1563 let type_name = root.type_name().to_string();
1564 let data = root.dump().unwrap();
1565 procedure_store
1566 .store_procedure(root_id, step, type_name, data, None)
1567 .await
1568 .unwrap();
1569 }
1570
1571 manager.recover().await.unwrap();
1573 timeout(Duration::from_secs(10), notify.notified())
1574 .await
1575 .unwrap();
1576 }
1577}