/// Procedure execution; provides the `Runner` that `LocalManager` spawns for each root procedure.
mod runner;
16
17use std::collections::hash_map::Entry;
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
20use std::sync::{Arc, Mutex, RwLock};
21use std::time::{Duration, Instant};
22
23use async_trait::async_trait;
24use backon::ExponentialBuilder;
25use common_error::ext::BoxedError;
26use common_runtime::{RepeatedTask, TaskFunction};
27use common_telemetry::tracing_context::{FutureExt, TracingContext};
28use common_telemetry::{error, info, tracing};
29use snafu::{ensure, OptionExt, ResultExt};
30use tokio::sync::watch::{self, Receiver, Sender};
31use tokio::sync::{Mutex as TokioMutex, Notify};
32
33use crate::error::{
34 self, CheckStatusSnafu, DuplicateProcedureSnafu, Error, LoaderConflictSnafu,
35 ManagerNotStartSnafu, ManagerPasuedSnafu, PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu,
36 Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
37 TooManyRunningProceduresSnafu,
38};
39use crate::local::runner::Runner;
40use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
41use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
42use crate::store::poison_store::PoisonStoreRef;
43use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
44use crate::{
45 BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
46 ProcedureState, ProcedureWithId, StringKey, Watcher,
47};
48
/// Default time-to-live (10 minutes) for the in-memory metadata of finished procedures.
///
/// Used as the default `ManagerConfig::remove_outdated_meta_ttl`: the remove-outdated-meta
/// task drops metadata of procedures that finished longer ago than this.
const META_TTL: Duration = Duration::from_secs(60 * 10);
51
/// Shared in-memory metadata of a procedure tracked by the manager.
#[derive(Debug)]
pub(crate) struct ProcedureMeta {
    /// Id of the procedure.
    id: ProcedureId,
    /// Type name of the procedure, reported by `ManagerContext::list_procedure`.
    type_name: String,
    /// Id of the parent procedure; `None` for a root procedure (see `LocalManager::submit_root`).
    parent_id: Option<ProcedureId>,
    /// Notified by `ManagerContext::notify_by_subprocedure`.
    child_notify: Notify,
    /// Lock keys declared by the procedure.
    lock_key: LockKey,
    /// Poison keys the procedure may put; checked by `try_put_poison`.
    poison_keys: PoisonKeys,
    /// Sender side of the state watch channel.
    state_sender: Sender<ProcedureState>,
    /// Receiver side of the state watch channel. Keeping it here means `state_sender` always
    /// has a receiver, and watchers are cloned from it.
    state_receiver: Receiver<ProcedureState>,
    /// Ids of subprocedures recorded by `push_child`.
    children: Mutex<Vec<ProcedureId>>,
    /// Start time in milliseconds; 0 until `set_start_time_ms` is called.
    start_time_ms: AtomicI64,
    /// End time in milliseconds; 0 until `set_end_time_ms` is called.
    end_time_ms: AtomicI64,
}
85
86impl ProcedureMeta {
87 fn new(
88 id: ProcedureId,
89 procedure_state: ProcedureState,
90 parent_id: Option<ProcedureId>,
91 lock_key: LockKey,
92 poison_keys: PoisonKeys,
93 type_name: &str,
94 ) -> ProcedureMeta {
95 let (state_sender, state_receiver) = watch::channel(procedure_state);
96 ProcedureMeta {
97 id,
98 parent_id,
99 child_notify: Notify::new(),
100 lock_key,
101 poison_keys,
102 state_sender,
103 state_receiver,
104 children: Mutex::new(Vec::new()),
105 start_time_ms: AtomicI64::new(0),
106 end_time_ms: AtomicI64::new(0),
107 type_name: type_name.to_string(),
108 }
109 }
110
111 fn state(&self) -> ProcedureState {
113 self.state_receiver.borrow().clone()
114 }
115
116 fn set_state(&self, state: ProcedureState) {
118 self.state_sender.send(state).unwrap();
120 }
121
122 fn push_child(&self, procedure_id: ProcedureId) {
124 let mut children = self.children.lock().unwrap();
125 children.push(procedure_id);
126 }
127
128 fn list_children(&self, buffer: &mut Vec<ProcedureId>) {
130 let children = self.children.lock().unwrap();
131 buffer.extend_from_slice(&children);
132 }
133
134 fn num_children(&self) -> usize {
136 self.children.lock().unwrap().len()
137 }
138
139 fn set_start_time_ms(&self) {
141 self.start_time_ms
142 .store(common_time::util::current_time_millis(), Ordering::Relaxed);
143 }
144
145 fn set_end_time_ms(&self) {
147 self.end_time_ms
148 .store(common_time::util::current_time_millis(), Ordering::Relaxed);
149 }
150}
151
/// Shared reference to a [`ProcedureMeta`].
type ProcedureMetaRef = Arc<ProcedureMeta>;

/// A procedure rebuilt from a persisted [`ProcedureMessage`] by its registered loader.
struct LoadedProcedure {
    /// The procedure returned by the loader.
    procedure: BoxedProcedure,
    /// Step recorded in the message; handed to the runner on resubmission.
    step: u32,
}

/// String-keyed read/write lock shared by every [`DynamicKeyLockGuard`] handed out from it.
pub(crate) type DynamicKeyLock = Arc<KeyRwLock<String>>;
168
169pub async fn acquire_dynamic_key_lock(
174 lock: &DynamicKeyLock,
175 key: &StringKey,
176) -> DynamicKeyLockGuard {
177 match key {
178 StringKey::Share(key) => {
179 let guard = lock.read(key.to_string()).await;
180 DynamicKeyLockGuard {
181 guard: Some(OwnedKeyRwLockGuard::from(guard)),
182 key: key.to_string(),
183 lock: lock.clone(),
184 }
185 }
186 StringKey::Exclusive(key) => {
187 let guard = lock.write(key.to_string()).await;
188 DynamicKeyLockGuard {
189 guard: Some(OwnedKeyRwLockGuard::from(guard)),
190 key: key.to_string(),
191 lock: lock.clone(),
192 }
193 }
194 }
195}
/// Guard returned by [`acquire_dynamic_key_lock`].
///
/// On drop it releases the held lock first and then calls `clean_keys` for `key` on `lock`.
pub struct DynamicKeyLockGuard {
    /// The held lock; wrapped in `Option` so it can be released before the key is cleaned.
    guard: Option<OwnedKeyRwLockGuard>,
    /// The locked key.
    key: String,
    /// The lock `key` belongs to.
    lock: DynamicKeyLock,
}
205
206impl Drop for DynamicKeyLockGuard {
207 fn drop(&mut self) {
208 if let Some(guard) = self.guard.take() {
209 drop(guard);
210 }
211 self.lock.clean_keys(&[self.key.to_string()]);
212 }
213}
214
/// State shared between the [`LocalManager`], the procedure runners it spawns and the
/// remove-outdated-meta task; also implements [`ContextProvider`].
pub(crate) struct ManagerContext {
    /// Registered procedure loaders, keyed by procedure type name.
    loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
    /// Per-key read/write lock. It is not used in the code shown here; presumably the runner
    /// acquires procedures' `LockKey`s on it (TODO confirm).
    key_lock: KeyRwLock<String>,
    /// Lock handed out to procedures through `ContextProvider::acquire_lock`.
    dynamic_key_lock: DynamicKeyLock,
    /// Metadata of every known procedure. Finished procedures stay here until
    /// `remove_outdated_meta` drops them.
    procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
    /// Ids of procedures that have not finished yet; their count is compared against
    /// `ManagerConfig::max_running_procedures`.
    running_procedures: Mutex<HashSet<ProcedureId>>,
    /// Finished procedures with their finish time, in the order they were recorded.
    finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
    /// Whether the manager accepts new submissions.
    running: Arc<AtomicBool>,
    /// Store used by `try_put_poison`.
    poison_manager: PoisonStoreRef,
}
243
244#[async_trait]
245impl ContextProvider for ManagerContext {
246 async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
247 Ok(self.state(procedure_id))
248 }
249
250 async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
251 {
252 let procedures = self.procedures.read().unwrap();
254 let procedure = procedures
255 .get(&procedure_id)
256 .context(ProcedureNotFoundSnafu { procedure_id })?;
257
258 ensure!(
260 procedure.poison_keys.contains(key),
261 PoisonKeyNotDefinedSnafu {
262 key: key.clone(),
263 procedure_id
264 }
265 );
266 }
267 let key = key.to_string();
268 let procedure_id = procedure_id.to_string();
269 self.poison_manager.try_put_poison(key, procedure_id).await
270 }
271
272 async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
273 acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
274 }
275}
276
277impl ManagerContext {
278 fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
280 ManagerContext {
281 key_lock: KeyRwLock::new(),
282 dynamic_key_lock: Arc::new(KeyRwLock::new()),
283 loaders: Mutex::new(HashMap::new()),
284 procedures: RwLock::new(HashMap::new()),
285 running_procedures: Mutex::new(HashSet::new()),
286 finished_procedures: Mutex::new(VecDeque::new()),
287 running: Arc::new(AtomicBool::new(false)),
288 poison_manager,
289 }
290 }
291
292 #[cfg(test)]
293 pub(crate) fn set_running(&self) {
294 self.running.store(true, Ordering::Relaxed);
295 }
296
297 pub(crate) fn start(&self) {
299 self.running.store(true, Ordering::Relaxed);
300 }
301
302 pub(crate) fn stop(&self) {
303 self.running.store(false, Ordering::Relaxed);
304 }
305
306 pub(crate) fn running(&self) -> bool {
308 self.running.load(Ordering::Relaxed)
309 }
310
311 fn contains_procedure(&self, procedure_id: ProcedureId) -> bool {
313 let procedures = self.procedures.read().unwrap();
314 procedures.contains_key(&procedure_id)
315 }
316
317 fn num_running_procedures(&self) -> usize {
319 self.running_procedures.lock().unwrap().len()
320 }
321
322 fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
327 let procedure_id = meta.id;
328 let mut procedures = self.procedures.write().unwrap();
329 match procedures.entry(procedure_id) {
330 Entry::Occupied(_) => return false,
331 Entry::Vacant(vacant_entry) => {
332 vacant_entry.insert(meta);
333 }
334 }
335
336 let mut running_procedures = self.running_procedures.lock().unwrap();
337 running_procedures.insert(procedure_id);
338
339 true
340 }
341
342 fn state(&self, procedure_id: ProcedureId) -> Option<ProcedureState> {
344 let procedures = self.procedures.read().unwrap();
345 procedures.get(&procedure_id).map(|meta| meta.state())
346 }
347
348 fn list_procedure(&self) -> Vec<ProcedureInfo> {
350 let procedures = self.procedures.read().unwrap();
351 procedures
352 .values()
353 .map(|meta| ProcedureInfo {
354 id: meta.id,
355 type_name: meta.type_name.clone(),
356 start_time_ms: meta.start_time_ms.load(Ordering::Relaxed),
357 end_time_ms: meta.end_time_ms.load(Ordering::Relaxed),
358 state: meta.state(),
359 lock_keys: meta.lock_key.get_keys(),
360 })
361 .collect()
362 }
363
364 fn watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
366 let procedures = self.procedures.read().unwrap();
367 procedures
368 .get(&procedure_id)
369 .map(|meta| meta.state_receiver.clone())
370 }
371
372 fn notify_by_subprocedure(&self, procedure_id: ProcedureId) {
374 let procedures = self.procedures.read().unwrap();
375 if let Some(meta) = procedures.get(&procedure_id) {
376 meta.child_notify.notify_one();
377 }
378 }
379
380 fn load_one_procedure_from_message(
382 &self,
383 procedure_id: ProcedureId,
384 message: &ProcedureMessage,
385 ) -> Option<LoadedProcedure> {
386 let loaders = self.loaders.lock().unwrap();
387 let loader = loaders.get(&message.type_name).or_else(|| {
388 error!(
389 "Loader not found, procedure_id: {}, type_name: {}",
390 procedure_id, message.type_name
391 );
392 None
393 })?;
394
395 let procedure = loader(&message.data)
396 .map_err(|e| {
397 error!(
398 "Failed to load procedure data, key: {}, source: {:?}",
399 procedure_id, e
400 );
401 e
402 })
403 .ok()?;
404
405 Some(LoadedProcedure {
406 procedure,
407 step: message.step,
408 })
409 }
410
411 fn procedures_in_tree(&self, root: &ProcedureMetaRef) -> Vec<ProcedureId> {
416 let sub_num = root.num_children();
417 let mut procedures = Vec::with_capacity(1 + sub_num);
419
420 let mut queue = VecDeque::with_capacity(1 + sub_num);
421 queue.push_back(root.clone());
423
424 let mut children_ids = Vec::with_capacity(sub_num);
425 let mut children = Vec::with_capacity(sub_num);
426 while let Some(meta) = queue.pop_front() {
427 procedures.push(meta.id);
428
429 children_ids.clear();
431 meta.list_children(&mut children_ids);
432 self.find_procedures(&children_ids, &mut children);
433
434 for child in children.drain(..) {
436 queue.push_back(child);
437 }
438 }
439
440 procedures
441 }
442
443 fn find_procedures(&self, procedure_ids: &[ProcedureId], metas: &mut Vec<ProcedureMetaRef>) {
447 let procedures = self.procedures.read().unwrap();
448 for procedure_id in procedure_ids {
449 if let Some(meta) = procedures.get(procedure_id) {
450 metas.push(meta.clone());
451 }
452 }
453 }
454
455 fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
457 let now = Instant::now();
460 let mut finished_procedures = self.finished_procedures.lock().unwrap();
461 finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
462
463 let mut running_procedures = self.running_procedures.lock().unwrap();
465 for procedure_id in procedure_ids {
466 running_procedures.remove(procedure_id);
467 }
468 }
469
470 fn remove_outdated_meta(&self, ttl: Duration) {
472 let ids = {
473 let mut finished_procedures = self.finished_procedures.lock().unwrap();
474 if finished_procedures.is_empty() {
475 return;
476 }
477
478 let mut ids_to_remove = Vec::new();
479 while let Some((id, finish_time)) = finished_procedures.front() {
480 if finish_time.elapsed() > ttl {
481 ids_to_remove.push(*id);
482 let _ = finished_procedures.pop_front();
483 } else {
484 break;
487 }
488 }
489 ids_to_remove
490 };
491
492 if ids.is_empty() {
493 return;
494 }
495
496 let mut procedures = self.procedures.write().unwrap();
497 for id in ids {
498 let _ = procedures.remove(&id);
499 }
500 }
501}
502
/// Configuration of a [`LocalManager`].
#[derive(Debug)]
pub struct ManagerConfig {
    /// Parent path passed to `ProcedureStore::new`.
    pub parent_path: String,
    /// Maximum retry times of the runner's exponential backoff.
    pub max_retry_times: usize,
    /// Minimum delay of the runner's exponential backoff.
    pub retry_delay: Duration,
    /// How often the remove-outdated-meta task runs.
    pub remove_outdated_meta_task_interval: Duration,
    /// How long metadata of a finished procedure is kept before that task may drop it.
    pub remove_outdated_meta_ttl: Duration,
    /// Maximum number of running procedures; new root submissions are rejected with
    /// `TooManyRunningProcedures` once this many are running.
    pub max_running_procedures: usize,
}
513
514impl Default for ManagerConfig {
515 fn default() -> Self {
516 Self {
517 parent_path: String::default(),
518 max_retry_times: 3,
519 retry_delay: Duration::from_millis(500),
520 remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
521 remove_outdated_meta_ttl: META_TTL,
522 max_running_procedures: 128,
523 }
524 }
525}
526
/// Shared handle to a [`PauseAware`] implementation.
type PauseAwareRef = Arc<dyn PauseAware>;

/// Reports whether the manager should currently refuse new submissions.
///
/// `LocalManager` consults it in `check_status` before each `submit`. A `true` result is
/// reported as a `ManagerPasued` error, and an `Err` result as a `CheckStatus` error.
#[async_trait]
pub trait PauseAware: Send + Sync {
    /// Returns whether the manager is paused.
    async fn is_paused(&self) -> std::result::Result<bool, BoxedError>;
}
534
/// A [`ProcedureManager`] that spawns procedure runners on the global runtime of this process.
pub struct LocalManager {
    /// State shared with the runners and the remove-outdated-meta task.
    manager_ctx: Arc<ManagerContext>,
    /// Store that persists procedure data and is read back during recovery.
    procedure_store: Arc<ProcedureStore>,
    /// Copy of `config.max_retry_times`, used for each runner's backoff.
    max_retry_times: usize,
    /// Copy of `config.retry_delay`, used for each runner's backoff.
    retry_delay: Duration,
    /// The started remove-outdated-meta task; `None` until `start` succeeds or after `stop`.
    remove_outdated_meta_task: TokioMutex<Option<RepeatedTask<Error>>>,
    /// Manager configuration.
    config: ManagerConfig,
    /// Optional pause check consulted before each submission.
    pause_aware: Option<PauseAwareRef>,
}
546
547impl LocalManager {
548 pub fn new(
550 config: ManagerConfig,
551 state_store: StateStoreRef,
552 poison_store: PoisonStoreRef,
553 pause_aware: Option<PauseAwareRef>,
554 ) -> LocalManager {
555 let manager_ctx = Arc::new(ManagerContext::new(poison_store));
556
557 LocalManager {
558 manager_ctx,
559 procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)),
560 max_retry_times: config.max_retry_times,
561 retry_delay: config.retry_delay,
562 remove_outdated_meta_task: TokioMutex::new(None),
563 config,
564 pause_aware,
565 }
566 }
567
568 pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask<Error> {
570 RepeatedTask::new(
571 self.config.remove_outdated_meta_task_interval,
572 Box::new(RemoveOutdatedMetaFunction {
573 manager_ctx: self.manager_ctx.clone(),
574 ttl: self.config.remove_outdated_meta_ttl,
575 }),
576 )
577 }
578
579 fn submit_root(
581 &self,
582 procedure_id: ProcedureId,
583 procedure_state: ProcedureState,
584 step: u32,
585 procedure: BoxedProcedure,
586 ) -> Result<Watcher> {
587 ensure!(self.manager_ctx.running(), ManagerNotStartSnafu);
588
589 let meta = Arc::new(ProcedureMeta::new(
590 procedure_id,
591 procedure_state,
592 None,
593 procedure.lock_key(),
594 procedure.poison_keys(),
595 procedure.type_name(),
596 ));
597 let runner = Runner {
598 meta: meta.clone(),
599 procedure,
600 manager_ctx: self.manager_ctx.clone(),
601 step,
602 exponential_builder: ExponentialBuilder::default()
603 .with_min_delay(self.retry_delay)
604 .with_max_times(self.max_retry_times),
605 store: self.procedure_store.clone(),
606 rolling_back: false,
607 };
608
609 let watcher = meta.state_receiver.clone();
610
611 ensure!(
612 self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
613 TooManyRunningProceduresSnafu {
614 max_running_procedures: self.config.max_running_procedures,
615 }
616 );
617
618 ensure!(
620 self.manager_ctx.try_insert_procedure(meta),
621 DuplicateProcedureSnafu { procedure_id },
622 );
623
624 let tracing_context = TracingContext::from_current_span();
625
626 let _handle = common_runtime::spawn_global(async move {
627 runner
631 .run()
632 .trace(
633 tracing_context
634 .attach(tracing::info_span!("LocalManager::submit_root_procedure")),
635 )
636 .await;
637 });
638
639 Ok(watcher)
640 }
641
642 fn submit_recovered_messages(
643 &self,
644 messages: HashMap<ProcedureId, ProcedureMessage>,
645 init_state: InitProcedureState,
646 ) {
647 for (procedure_id, message) in &messages {
648 if message.parent_id.is_none() {
649 let Some(mut loaded_procedure) = self
652 .manager_ctx
653 .load_one_procedure_from_message(*procedure_id, message)
654 else {
655 continue;
657 };
658
659 info!(
660 "Recover root procedure {}-{}, step: {}",
661 loaded_procedure.procedure.type_name(),
662 procedure_id,
663 loaded_procedure.step
664 );
665
666 let procedure_state = match init_state {
667 InitProcedureState::RollingBack => ProcedureState::RollingBack {
668 error: Arc::new(
669 error::RollbackProcedureRecoveredSnafu {
670 error: message.error.clone().unwrap_or("Unknown error".to_string()),
671 }
672 .build(),
673 ),
674 },
675 InitProcedureState::Running => ProcedureState::Running,
676 };
677
678 if let Err(e) = loaded_procedure.procedure.recover() {
679 error!(e; "Failed to recover procedure {}", procedure_id);
680 }
681
682 if let Err(e) = self.submit_root(
683 *procedure_id,
684 procedure_state,
685 loaded_procedure.step,
686 loaded_procedure.procedure,
687 ) {
688 error!(e; "Failed to recover procedure {}", procedure_id);
689 }
690 }
691 }
692 }
693
694 async fn recover(&self) -> Result<()> {
696 info!("LocalManager start to recover");
697 let recover_start = Instant::now();
698
699 let ProcedureMessages {
700 messages,
701 rollback_messages,
702 finished_ids,
703 } = self.procedure_store.load_messages().await?;
704 self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack);
706 self.submit_recovered_messages(messages, InitProcedureState::Running);
707
708 if !finished_ids.is_empty() {
709 info!(
710 "LocalManager try to clean finished procedures, num: {}",
711 finished_ids.len()
712 );
713
714 for procedure_id in finished_ids {
715 if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
716 error!(e; "Failed to delete procedure {}", procedure_id);
717 }
718 }
719 }
720
721 info!(
722 "LocalManager finish recovery, cost: {}ms",
723 recover_start.elapsed().as_millis()
724 );
725
726 Ok(())
727 }
728
729 #[cfg(any(test, feature = "testing"))]
730 pub fn contains_loader(&self, name: &str) -> bool {
732 let loaders = self.manager_ctx.loaders.lock().unwrap();
733 loaders.contains_key(name)
734 }
735
736 async fn check_status(&self) -> Result<()> {
737 if let Some(pause_aware) = self.pause_aware.as_ref() {
738 ensure!(
739 !pause_aware.is_paused().await.context(CheckStatusSnafu)?,
740 ManagerPasuedSnafu
741 );
742 }
743
744 Ok(())
745 }
746}
747
748#[async_trait]
749impl ProcedureManager for LocalManager {
750 fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> {
751 let mut loaders = self.manager_ctx.loaders.lock().unwrap();
752 ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name });
753
754 let _ = loaders.insert(name.to_string(), loader);
755
756 Ok(())
757 }
758
759 async fn start(&self) -> Result<()> {
760 let mut task = self.remove_outdated_meta_task.lock().await;
761
762 if task.is_some() {
763 return Ok(());
764 }
765
766 let task_inner = self.build_remove_outdated_meta_task();
767
768 task_inner
769 .start(common_runtime::global_runtime())
770 .context(StartRemoveOutdatedMetaTaskSnafu)?;
771
772 *task = Some(task_inner);
773
774 self.manager_ctx.start();
775
776 info!("LocalManager is start.");
777
778 self.recover().await
779 }
780
781 async fn stop(&self) -> Result<()> {
782 let mut task = self.remove_outdated_meta_task.lock().await;
783
784 if let Some(task) = task.take() {
785 task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?;
786 }
787
788 self.manager_ctx.stop();
789
790 info!("LocalManager is stopped.");
791
792 Ok(())
793 }
794
795 async fn submit(&self, procedure: ProcedureWithId) -> Result<Watcher> {
796 let procedure_id = procedure.id;
797 ensure!(
798 !self.manager_ctx.contains_procedure(procedure_id),
799 DuplicateProcedureSnafu { procedure_id }
800 );
801 self.check_status().await?;
802
803 self.submit_root(
804 procedure.id,
805 ProcedureState::Running,
806 0,
807 procedure.procedure,
808 )
809 }
810
811 async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
812 Ok(self.manager_ctx.state(procedure_id))
813 }
814
815 fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
816 self.manager_ctx.watcher(procedure_id)
817 }
818
819 async fn list_procedures(&self) -> Result<Vec<ProcedureInfo>> {
820 Ok(self.manager_ctx.list_procedure())
821 }
822}
823
/// Body of the repeated task that drops outdated metadata of finished procedures.
struct RemoveOutdatedMetaFunction {
    /// Context whose finished-procedure metadata is cleaned.
    manager_ctx: Arc<ManagerContext>,
    /// Metadata of procedures that finished longer ago than this is removed.
    ttl: Duration,
}
828
829#[async_trait::async_trait]
830impl TaskFunction<Error> for RemoveOutdatedMetaFunction {
831 fn name(&self) -> &str {
832 "ProcedureManager-remove-outdated-meta-task"
833 }
834
835 async fn call(&mut self) -> Result<()> {
836 self.manager_ctx.remove_outdated_meta(self.ttl);
837 Ok(())
838 }
839}
840
#[cfg(test)]
pub(crate) mod test_util {
    use common_test_util::temp_dir::TempDir;
    use object_store::services::Fs as Builder;
    use object_store::ObjectStore;

    use super::*;

    /// Builds the metadata of a running root procedure with a random id and default keys.
    pub(crate) fn procedure_meta_for_test() -> ProcedureMeta {
        let id = ProcedureId::random();
        ProcedureMeta::new(
            id,
            ProcedureState::Running,
            None,
            LockKey::default(),
            PoisonKeys::default(),
            "ProcedureAdapter",
        )
    }

    /// Creates a filesystem-backed [`ObjectStore`] rooted at `dir`.
    pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
        let root = dir.path().to_str().unwrap();
        let operator = ObjectStore::new(Builder::default().root(root)).unwrap();
        operator.finish()
    }
}
867
868#[cfg(test)]
869mod tests {
870 use std::assert_matches::assert_matches;
871
872 use common_error::mock::MockError;
873 use common_error::status_code::StatusCode;
874 use common_test_util::temp_dir::create_temp_dir;
875 use tokio::time::timeout;
876
877 use super::*;
878 use crate::error::{self, Error};
879 use crate::store::state_store::ObjectStateStore;
880 use crate::test_util::InMemoryPoisonStore;
881 use crate::{Context, Procedure, Status};
882
883 fn new_test_manager_context() -> ManagerContext {
884 let poison_manager = Arc::new(InMemoryPoisonStore::default());
885 ManagerContext::new(poison_manager)
886 }
887
888 #[test]
889 fn test_manager_context() {
890 let ctx = new_test_manager_context();
891 let meta = Arc::new(test_util::procedure_meta_for_test());
892
893 assert!(!ctx.contains_procedure(meta.id));
894 assert!(ctx.state(meta.id).is_none());
895
896 assert!(ctx.try_insert_procedure(meta.clone()));
897 assert!(ctx.contains_procedure(meta.id));
898
899 assert!(ctx.state(meta.id).unwrap().is_running());
900 meta.set_state(ProcedureState::Done { output: None });
901 assert!(ctx.state(meta.id).unwrap().is_done());
902 }
903
904 #[test]
905 fn test_manager_context_insert_duplicate() {
906 let ctx = new_test_manager_context();
907 let meta = Arc::new(test_util::procedure_meta_for_test());
908
909 assert!(ctx.try_insert_procedure(meta.clone()));
910 assert!(!ctx.try_insert_procedure(meta));
911 }
912
913 fn new_child(parent_id: ProcedureId, ctx: &ManagerContext) -> ProcedureMetaRef {
914 let mut child = test_util::procedure_meta_for_test();
915 child.parent_id = Some(parent_id);
916 let child = Arc::new(child);
917 assert!(ctx.try_insert_procedure(child.clone()));
918
919 let mut parent = Vec::new();
920 ctx.find_procedures(&[parent_id], &mut parent);
921 parent[0].push_child(child.id);
922
923 child
924 }
925
926 #[test]
927 fn test_procedures_in_tree() {
928 let ctx = new_test_manager_context();
929 let root = Arc::new(test_util::procedure_meta_for_test());
930 assert!(ctx.try_insert_procedure(root.clone()));
931
932 assert_eq!(1, ctx.procedures_in_tree(&root).len());
933
934 let child1 = new_child(root.id, &ctx);
935 let child2 = new_child(root.id, &ctx);
936
937 let child3 = new_child(child1.id, &ctx);
938 let child4 = new_child(child1.id, &ctx);
939
940 let child5 = new_child(child2.id, &ctx);
941
942 let expect = vec![
943 root.id, child1.id, child2.id, child3.id, child4.id, child5.id,
944 ];
945 assert_eq!(expect, ctx.procedures_in_tree(&root));
946 }
947
    /// Test procedure that finishes immediately and dumps `content` as its data.
    #[derive(Debug)]
    struct ProcedureToLoad {
        /// Data returned by `dump`; the test loader rebuilds the procedure from it.
        content: String,
        /// Lock key returned by `lock_key`.
        lock_key: LockKey,
        /// Poison keys returned by `poison_keys`.
        poison_keys: PoisonKeys,
    }
954
955 #[async_trait]
956 impl Procedure for ProcedureToLoad {
957 fn type_name(&self) -> &str {
958 "ProcedureToLoad"
959 }
960
961 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
962 Ok(Status::done())
963 }
964
965 fn dump(&self) -> Result<String> {
966 Ok(self.content.clone())
967 }
968
969 fn lock_key(&self) -> LockKey {
970 self.lock_key.clone()
971 }
972
973 fn poison_keys(&self) -> PoisonKeys {
974 self.poison_keys.clone()
975 }
976 }
977
978 impl ProcedureToLoad {
979 fn new(content: &str) -> ProcedureToLoad {
980 ProcedureToLoad {
981 content: content.to_string(),
982 lock_key: LockKey::default(),
983 poison_keys: PoisonKeys::default(),
984 }
985 }
986
987 fn loader() -> BoxedProcedureLoader {
988 let f = |json: &str| {
989 let procedure = ProcedureToLoad::new(json);
990 Ok(Box::new(procedure) as _)
991 };
992 Box::new(f)
993 }
994 }
995
996 #[test]
997 fn test_register_loader() {
998 let dir = create_temp_dir("register");
999 let config = ManagerConfig {
1000 parent_path: "data/".to_string(),
1001 max_retry_times: 3,
1002 retry_delay: Duration::from_millis(500),
1003 ..Default::default()
1004 };
1005 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1006 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1007 let manager = LocalManager::new(config, state_store, poison_manager, None);
1008 manager.manager_ctx.start();
1009
1010 manager
1011 .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1012 .unwrap();
1013 let err = manager
1015 .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1016 .unwrap_err();
1017 assert!(matches!(err, Error::LoaderConflict { .. }), "{err}");
1018 }
1019
1020 #[tokio::test]
1021 async fn test_recover() {
1022 let dir = create_temp_dir("recover");
1023 let object_store = test_util::new_object_store(&dir);
1024 let config = ManagerConfig {
1025 parent_path: "data/".to_string(),
1026 max_retry_times: 3,
1027 retry_delay: Duration::from_millis(500),
1028 ..Default::default()
1029 };
1030 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1031 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1032 let manager = LocalManager::new(config, state_store, poison_manager, None);
1033 manager.manager_ctx.start();
1034
1035 manager
1036 .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1037 .unwrap();
1038
1039 let procedure_store = ProcedureStore::from_object_store(object_store.clone());
1041 let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
1042 let root_id = ProcedureId::random();
1043 for step in 0..3 {
1045 let type_name = root.type_name().to_string();
1046 let data = root.dump().unwrap();
1047 procedure_store
1048 .store_procedure(root_id, step, type_name, data, None)
1049 .await
1050 .unwrap();
1051 }
1052
1053 let child: BoxedProcedure = Box::new(ProcedureToLoad::new("a child procedure"));
1054 let child_id = ProcedureId::random();
1055 for step in 0..2 {
1057 let type_name = child.type_name().to_string();
1058 let data = child.dump().unwrap();
1059 procedure_store
1060 .store_procedure(child_id, step, type_name, data, Some(root_id))
1061 .await
1062 .unwrap();
1063 }
1064
1065 manager.recover().await.unwrap();
1067
1068 let _ = manager.procedure_state(root_id).await.unwrap().unwrap();
1070 assert!(manager.procedure_state(child_id).await.unwrap().is_none());
1073 }
1074
1075 #[tokio::test]
1076 async fn test_submit_procedure() {
1077 let dir = create_temp_dir("submit");
1078 let config = ManagerConfig {
1079 parent_path: "data/".to_string(),
1080 max_retry_times: 3,
1081 retry_delay: Duration::from_millis(500),
1082 ..Default::default()
1083 };
1084 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1085 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1086 let manager = LocalManager::new(config, state_store, poison_manager, None);
1087 manager.manager_ctx.start();
1088
1089 let procedure_id = ProcedureId::random();
1090 assert!(manager
1091 .procedure_state(procedure_id)
1092 .await
1093 .unwrap()
1094 .is_none());
1095 assert!(manager.procedure_watcher(procedure_id).is_none());
1096
1097 let mut procedure = ProcedureToLoad::new("submit");
1098 procedure.lock_key = LockKey::single_exclusive("test.submit");
1099 assert!(manager
1100 .submit(ProcedureWithId {
1101 id: procedure_id,
1102 procedure: Box::new(procedure),
1103 })
1104 .await
1105 .is_ok());
1106 assert!(manager
1107 .procedure_state(procedure_id)
1108 .await
1109 .unwrap()
1110 .is_some());
1111 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1113 watcher.changed().await.unwrap();
1114 assert!(watcher.borrow().is_done());
1115
1116 let err = manager
1118 .submit(ProcedureWithId {
1119 id: procedure_id,
1120 procedure: Box::new(ProcedureToLoad::new("submit")),
1121 })
1122 .await
1123 .unwrap_err();
1124 assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err}");
1125 }
1126
    /// Checks the states a procedure goes through when `execute` returns an error and when it
    /// panics.
    #[tokio::test]
    async fn test_state_changed_on_err() {
        let dir = create_temp_dir("on_err");
        let config = ManagerConfig {
            parent_path: "data/".to_string(),
            max_retry_times: 3,
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
        let poison_manager = Arc::new(InMemoryPoisonStore::new());
        let manager = LocalManager::new(config, state_store, poison_manager, None);
        manager.manager_ctx.start();

        /// Procedure whose `execute` either panics or returns an error, and which supports
        /// a rollback that always succeeds.
        #[derive(Debug)]
        struct MockProcedure {
            /// Panic in `execute` instead of returning an error.
            panic: bool,
        }

        #[async_trait]
        impl Procedure for MockProcedure {
            fn type_name(&self) -> &str {
                "MockProcedure"
            }

            async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
                if self.panic {
                    panic!();
                } else {
                    Err(Error::external(MockError::new(StatusCode::Unexpected)))
                }
            }

            async fn rollback(&mut self, _: &Context) -> Result<()> {
                Ok(())
            }

            fn rollback_supported(&self) -> bool {
                true
            }

            fn dump(&self) -> Result<String> {
                Ok(String::new())
            }

            fn lock_key(&self) -> LockKey {
                LockKey::single_exclusive("test.submit")
            }

            fn poison_keys(&self) -> PoisonKeys {
                PoisonKeys::default()
            }
        }

        // Submits `procedure` under a fresh random id and returns its watcher.
        let check_procedure = |procedure| async {
            let procedure_id = ProcedureId::random();
            manager
                .submit(ProcedureWithId {
                    id: procedure_id,
                    procedure: Box::new(procedure),
                })
                .await
                .unwrap()
        };

        // An error from `execute` is expected to surface as PrepareRollback, then RollingBack,
        // then Failed.
        // NOTE(review): a `watch` channel only keeps the latest value, so these step-by-step
        // assertions assume each transition is observed before the next one is published.
        let mut watcher = check_procedure(MockProcedure { panic: false }).await;
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_prepare_rollback());
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_rolling_back());
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_failed());
        // A panic in `execute` is expected to surface directly as Failed.
        let mut watcher = check_procedure(MockProcedure { panic: true }).await;
        watcher.changed().await.unwrap();
        assert!(watcher.borrow().is_failed());
    }
1207
1208 #[tokio::test]
1209 async fn test_procedure_manager_stopped() {
1210 let dir = create_temp_dir("procedure_manager_stopped");
1211 let config = ManagerConfig {
1212 parent_path: "data/".to_string(),
1213 max_retry_times: 3,
1214 retry_delay: Duration::from_millis(500),
1215 ..Default::default()
1216 };
1217 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1218 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1219 let manager = LocalManager::new(config, state_store, poison_manager, None);
1220
1221 let mut procedure = ProcedureToLoad::new("submit");
1222 procedure.lock_key = LockKey::single_exclusive("test.submit");
1223 let procedure_id = ProcedureId::random();
1224 assert_matches!(
1225 manager
1226 .submit(ProcedureWithId {
1227 id: procedure_id,
1228 procedure: Box::new(procedure),
1229 })
1230 .await
1231 .unwrap_err(),
1232 error::Error::ManagerNotStart { .. }
1233 );
1234 }
1235
1236 #[tokio::test]
1237 async fn test_procedure_manager_restart() {
1238 let dir = create_temp_dir("procedure_manager_restart");
1239 let config = ManagerConfig {
1240 parent_path: "data/".to_string(),
1241 max_retry_times: 3,
1242 retry_delay: Duration::from_millis(500),
1243 ..Default::default()
1244 };
1245 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1246 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1247 let manager = LocalManager::new(config, state_store, poison_manager, None);
1248
1249 manager.start().await.unwrap();
1250 manager.stop().await.unwrap();
1251 manager.start().await.unwrap();
1252
1253 let mut procedure = ProcedureToLoad::new("submit");
1254 procedure.lock_key = LockKey::single_exclusive("test.submit");
1255 let procedure_id = ProcedureId::random();
1256 assert!(manager
1257 .submit(ProcedureWithId {
1258 id: procedure_id,
1259 procedure: Box::new(procedure),
1260 })
1261 .await
1262 .is_ok());
1263 assert!(manager
1264 .procedure_state(procedure_id)
1265 .await
1266 .unwrap()
1267 .is_some());
1268 }
1269
1270 #[tokio::test(flavor = "multi_thread")]
1271 async fn test_remove_outdated_meta_task() {
1272 let dir = create_temp_dir("remove_outdated_meta_task");
1273 let object_store = test_util::new_object_store(&dir);
1274 let config = ManagerConfig {
1275 parent_path: "data/".to_string(),
1276 max_retry_times: 3,
1277 retry_delay: Duration::from_millis(500),
1278 remove_outdated_meta_task_interval: Duration::from_millis(1),
1279 remove_outdated_meta_ttl: Duration::from_millis(1),
1280 max_running_procedures: 128,
1281 };
1282 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1283 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1284 let manager = LocalManager::new(config, state_store, poison_manager, None);
1285 manager.manager_ctx.set_running();
1286
1287 let mut procedure = ProcedureToLoad::new("submit");
1288 procedure.lock_key = LockKey::single_exclusive("test.submit");
1289 let procedure_id = ProcedureId::random();
1290 assert!(manager
1291 .submit(ProcedureWithId {
1292 id: procedure_id,
1293 procedure: Box::new(procedure),
1294 })
1295 .await
1296 .is_ok());
1297 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1298 watcher.changed().await.unwrap();
1299
1300 manager.start().await.unwrap();
1301 tokio::time::sleep(Duration::from_millis(300)).await;
1302 assert!(manager
1303 .procedure_state(procedure_id)
1304 .await
1305 .unwrap()
1306 .is_none());
1307
1308 manager.stop().await.unwrap();
1310 let mut procedure = ProcedureToLoad::new("submit");
1311 procedure.lock_key = LockKey::single_exclusive("test.submit");
1312 let procedure_id = ProcedureId::random();
1313
1314 manager.manager_ctx.set_running();
1315 assert!(manager
1316 .submit(ProcedureWithId {
1317 id: procedure_id,
1318 procedure: Box::new(procedure),
1319 })
1320 .await
1321 .is_ok());
1322 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1323 watcher.changed().await.unwrap();
1324 tokio::time::sleep(Duration::from_millis(300)).await;
1325 assert!(manager
1326 .procedure_state(procedure_id)
1327 .await
1328 .unwrap()
1329 .is_some());
1330
1331 let mut procedure = ProcedureToLoad::new("submit");
1333 procedure.lock_key = LockKey::single_exclusive("test.submit");
1334 let procedure_id = ProcedureId::random();
1335 assert!(manager
1336 .submit(ProcedureWithId {
1337 id: procedure_id,
1338 procedure: Box::new(procedure),
1339 })
1340 .await
1341 .is_ok());
1342 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1343 watcher.changed().await.unwrap();
1344
1345 manager.start().await.unwrap();
1346 tokio::time::sleep(Duration::from_millis(300)).await;
1347 assert!(manager
1348 .procedure_state(procedure_id)
1349 .await
1350 .unwrap()
1351 .is_none());
1352 }
1353
1354 #[tokio::test]
1355 async fn test_too_many_running_procedures() {
1356 let dir = create_temp_dir("too_many_running_procedures");
1357 let config = ManagerConfig {
1358 parent_path: "data/".to_string(),
1359 max_retry_times: 3,
1360 retry_delay: Duration::from_millis(500),
1361 max_running_procedures: 1,
1362 ..Default::default()
1363 };
1364 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1365 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1366 let manager = LocalManager::new(config, state_store, poison_manager, None);
1367 manager.manager_ctx.set_running();
1368
1369 manager
1370 .manager_ctx
1371 .running_procedures
1372 .lock()
1373 .unwrap()
1374 .insert(ProcedureId::random());
1375 manager.start().await.unwrap();
1376
1377 let mut procedure = ProcedureToLoad::new("submit");
1379 procedure.lock_key = LockKey::single_exclusive("test.submit");
1380 let procedure_id = ProcedureId::random();
1381 let err = manager
1382 .submit(ProcedureWithId {
1383 id: procedure_id,
1384 procedure: Box::new(procedure),
1385 })
1386 .await
1387 .unwrap_err();
1388 assert!(matches!(err, Error::TooManyRunningProcedures { .. }));
1389
1390 manager
1391 .manager_ctx
1392 .running_procedures
1393 .lock()
1394 .unwrap()
1395 .clear();
1396
1397 let mut procedure = ProcedureToLoad::new("submit");
1399 procedure.lock_key = LockKey::single_exclusive("test.submit");
1400 assert!(manager
1401 .submit(ProcedureWithId {
1402 id: procedure_id,
1403 procedure: Box::new(procedure),
1404 })
1405 .await
1406 .is_ok());
1407 assert!(manager
1408 .procedure_state(procedure_id)
1409 .await
1410 .unwrap()
1411 .is_some());
1412 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1414 watcher.changed().await.unwrap();
1415 assert!(watcher.borrow().is_done());
1416 }
1417
    /// Test procedure used to check that procedures rebuilt from persisted state
    /// get their `recover` hook invoked.
    #[derive(Debug)]
    struct ProcedureToRecover {
        /// Payload returned verbatim by `dump()`; the loader rebuilds it from that string.
        content: String,
        /// Lock key reported by `lock_key()`.
        lock_key: LockKey,
        /// Signalled by `recover()`. This is `None` for instances built with
        /// `ProcedureToRecover::new`, and `Some` for those built by the loader.
        notify: Option<Arc<Notify>>,
        /// Poison keys reported by `poison_keys()`.
        poison_keys: PoisonKeys,
    }
1425
1426 #[async_trait]
1427 impl Procedure for ProcedureToRecover {
1428 fn type_name(&self) -> &str {
1429 "ProcedureToRecover"
1430 }
1431
1432 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1433 Ok(Status::done())
1434 }
1435
1436 fn dump(&self) -> Result<String> {
1437 Ok(self.content.clone())
1438 }
1439
1440 fn lock_key(&self) -> LockKey {
1441 self.lock_key.clone()
1442 }
1443
1444 fn recover(&mut self) -> Result<()> {
1445 self.notify.as_ref().unwrap().notify_one();
1446 Ok(())
1447 }
1448
1449 fn poison_keys(&self) -> PoisonKeys {
1450 self.poison_keys.clone()
1451 }
1452 }
1453
1454 impl ProcedureToRecover {
1455 fn new(content: &str) -> ProcedureToRecover {
1456 ProcedureToRecover {
1457 content: content.to_string(),
1458 lock_key: LockKey::default(),
1459 poison_keys: PoisonKeys::default(),
1460 notify: None,
1461 }
1462 }
1463
1464 fn loader(notify: Arc<Notify>) -> BoxedProcedureLoader {
1465 let f = move |json: &str| {
1466 let procedure = ProcedureToRecover {
1467 content: json.to_string(),
1468 lock_key: LockKey::default(),
1469 poison_keys: PoisonKeys::default(),
1470 notify: Some(notify.clone()),
1471 };
1472 Ok(Box::new(procedure) as _)
1473 };
1474 Box::new(f)
1475 }
1476 }
1477
1478 #[tokio::test]
1479 async fn test_procedure_recover() {
1480 common_telemetry::init_default_ut_logging();
1481 let dir = create_temp_dir("procedure_recover");
1482 let object_store = test_util::new_object_store(&dir);
1483 let config = ManagerConfig {
1484 parent_path: "data/".to_string(),
1485 max_retry_times: 3,
1486 retry_delay: Duration::from_millis(500),
1487 ..Default::default()
1488 };
1489 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1490 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1491 let manager = LocalManager::new(config, state_store, poison_manager, None);
1492 manager.manager_ctx.start();
1493
1494 let notify = Arc::new(Notify::new());
1495 manager
1496 .register_loader(
1497 "ProcedureToRecover",
1498 ProcedureToRecover::loader(notify.clone()),
1499 )
1500 .unwrap();
1501
1502 let procedure_store = ProcedureStore::from_object_store(object_store.clone());
1504 let root: BoxedProcedure = Box::new(ProcedureToRecover::new("test procedure recovery"));
1505 let root_id = ProcedureId::random();
1506 for step in 0..3 {
1508 let type_name = root.type_name().to_string();
1509 let data = root.dump().unwrap();
1510 procedure_store
1511 .store_procedure(root_id, step, type_name, data, None)
1512 .await
1513 .unwrap();
1514 }
1515
1516 manager.recover().await.unwrap();
1518 timeout(Duration::from_secs(10), notify.notified())
1519 .await
1520 .unwrap();
1521 }
1522}