1mod runner;
16
17use std::collections::hash_map::Entry;
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
20use std::sync::{Arc, Mutex, RwLock};
21use std::time::{Duration, Instant};
22
23use async_trait::async_trait;
24use backon::ExponentialBuilder;
25use common_runtime::{RepeatedTask, TaskFunction};
26use common_telemetry::tracing_context::{FutureExt, TracingContext};
27use common_telemetry::{error, info, tracing};
28use snafu::{ensure, OptionExt, ResultExt};
29use tokio::sync::watch::{self, Receiver, Sender};
30use tokio::sync::{Mutex as TokioMutex, Notify};
31
32use crate::error::{
33 self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu,
34 PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, Result, StartRemoveOutdatedMetaTaskSnafu,
35 StopRemoveOutdatedMetaTaskSnafu, TooManyRunningProceduresSnafu,
36};
37use crate::local::runner::Runner;
38use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
39use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
40use crate::store::poison_store::PoisonStoreRef;
41use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
42use crate::{
43 BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
44 ProcedureState, ProcedureWithId, StringKey, Watcher,
45};
46
/// Ten minutes; the default value of [`ManagerConfig::remove_outdated_meta_ttl`].
const META_TTL: Duration = Duration::from_secs(10 * 60);
49
50#[derive(Debug)]
59pub(crate) struct ProcedureMeta {
60 id: ProcedureId,
62 type_name: String,
64 parent_id: Option<ProcedureId>,
66 child_notify: Notify,
68 lock_key: LockKey,
70 poison_keys: PoisonKeys,
72 state_sender: Sender<ProcedureState>,
74 state_receiver: Receiver<ProcedureState>,
76 children: Mutex<Vec<ProcedureId>>,
78 start_time_ms: AtomicI64,
80 end_time_ms: AtomicI64,
82}
83
84impl ProcedureMeta {
85 fn new(
86 id: ProcedureId,
87 procedure_state: ProcedureState,
88 parent_id: Option<ProcedureId>,
89 lock_key: LockKey,
90 poison_keys: PoisonKeys,
91 type_name: &str,
92 ) -> ProcedureMeta {
93 let (state_sender, state_receiver) = watch::channel(procedure_state);
94 ProcedureMeta {
95 id,
96 parent_id,
97 child_notify: Notify::new(),
98 lock_key,
99 poison_keys,
100 state_sender,
101 state_receiver,
102 children: Mutex::new(Vec::new()),
103 start_time_ms: AtomicI64::new(0),
104 end_time_ms: AtomicI64::new(0),
105 type_name: type_name.to_string(),
106 }
107 }
108
109 fn state(&self) -> ProcedureState {
111 self.state_receiver.borrow().clone()
112 }
113
114 fn set_state(&self, state: ProcedureState) {
116 self.state_sender.send(state).unwrap();
118 }
119
120 fn push_child(&self, procedure_id: ProcedureId) {
122 let mut children = self.children.lock().unwrap();
123 children.push(procedure_id);
124 }
125
126 fn list_children(&self, buffer: &mut Vec<ProcedureId>) {
128 let children = self.children.lock().unwrap();
129 buffer.extend_from_slice(&children);
130 }
131
132 fn num_children(&self) -> usize {
134 self.children.lock().unwrap().len()
135 }
136
137 fn set_start_time_ms(&self) {
139 self.start_time_ms
140 .store(common_time::util::current_time_millis(), Ordering::Relaxed);
141 }
142
143 fn set_end_time_ms(&self) {
145 self.end_time_ms
146 .store(common_time::util::current_time_millis(), Ordering::Relaxed);
147 }
148}
149
150type ProcedureMetaRef = Arc<ProcedureMeta>;
152
153struct LoadedProcedure {
155 procedure: BoxedProcedure,
156 step: u32,
157}
158
159pub(crate) type DynamicKeyLock = Arc<KeyRwLock<String>>;
166
167pub async fn acquire_dynamic_key_lock(
172 lock: &DynamicKeyLock,
173 key: &StringKey,
174) -> DynamicKeyLockGuard {
175 match key {
176 StringKey::Share(key) => {
177 let guard = lock.read(key.to_string()).await;
178 DynamicKeyLockGuard {
179 guard: Some(OwnedKeyRwLockGuard::from(guard)),
180 key: key.to_string(),
181 lock: lock.clone(),
182 }
183 }
184 StringKey::Exclusive(key) => {
185 let guard = lock.write(key.to_string()).await;
186 DynamicKeyLockGuard {
187 guard: Some(OwnedKeyRwLockGuard::from(guard)),
188 key: key.to_string(),
189 lock: lock.clone(),
190 }
191 }
192 }
193}
194pub struct DynamicKeyLockGuard {
199 guard: Option<OwnedKeyRwLockGuard>,
200 key: String,
201 lock: DynamicKeyLock,
202}
203
204impl Drop for DynamicKeyLockGuard {
205 fn drop(&mut self) {
206 if let Some(guard) = self.guard.take() {
207 drop(guard);
208 }
209 self.lock.clean_keys(&[self.key.to_string()]);
210 }
211}
212
213pub(crate) struct ManagerContext {
215 loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
217 key_lock: KeyRwLock<String>,
223 dynamic_key_lock: DynamicKeyLock,
230 procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
232 running_procedures: Mutex<HashSet<ProcedureId>>,
234 finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
236 running: Arc<AtomicBool>,
238 poison_manager: PoisonStoreRef,
240}
241
242#[async_trait]
243impl ContextProvider for ManagerContext {
244 async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
245 Ok(self.state(procedure_id))
246 }
247
248 async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
249 {
250 let procedures = self.procedures.read().unwrap();
252 let procedure = procedures
253 .get(&procedure_id)
254 .context(ProcedureNotFoundSnafu { procedure_id })?;
255
256 ensure!(
258 procedure.poison_keys.contains(key),
259 PoisonKeyNotDefinedSnafu {
260 key: key.clone(),
261 procedure_id
262 }
263 );
264 }
265 let key = key.to_string();
266 let procedure_id = procedure_id.to_string();
267 self.poison_manager.try_put_poison(key, procedure_id).await
268 }
269
270 async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
271 acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
272 }
273}
274
275impl ManagerContext {
276 fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
278 ManagerContext {
279 key_lock: KeyRwLock::new(),
280 dynamic_key_lock: Arc::new(KeyRwLock::new()),
281 loaders: Mutex::new(HashMap::new()),
282 procedures: RwLock::new(HashMap::new()),
283 running_procedures: Mutex::new(HashSet::new()),
284 finished_procedures: Mutex::new(VecDeque::new()),
285 running: Arc::new(AtomicBool::new(false)),
286 poison_manager,
287 }
288 }
289
290 #[cfg(test)]
291 pub(crate) fn set_running(&self) {
292 self.running.store(true, Ordering::Relaxed);
293 }
294
295 pub(crate) fn start(&self) {
297 self.running.store(true, Ordering::Relaxed);
298 }
299
300 pub(crate) fn stop(&self) {
301 self.running.store(false, Ordering::Relaxed);
302 }
303
304 pub(crate) fn running(&self) -> bool {
306 self.running.load(Ordering::Relaxed)
307 }
308
309 fn contains_procedure(&self, procedure_id: ProcedureId) -> bool {
311 let procedures = self.procedures.read().unwrap();
312 procedures.contains_key(&procedure_id)
313 }
314
315 fn num_running_procedures(&self) -> usize {
317 self.running_procedures.lock().unwrap().len()
318 }
319
320 fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
325 let procedure_id = meta.id;
326 let mut procedures = self.procedures.write().unwrap();
327 match procedures.entry(procedure_id) {
328 Entry::Occupied(_) => return false,
329 Entry::Vacant(vacant_entry) => {
330 vacant_entry.insert(meta);
331 }
332 }
333
334 let mut running_procedures = self.running_procedures.lock().unwrap();
335 running_procedures.insert(procedure_id);
336
337 true
338 }
339
340 fn state(&self, procedure_id: ProcedureId) -> Option<ProcedureState> {
342 let procedures = self.procedures.read().unwrap();
343 procedures.get(&procedure_id).map(|meta| meta.state())
344 }
345
346 fn list_procedure(&self) -> Vec<ProcedureInfo> {
348 let procedures = self.procedures.read().unwrap();
349 procedures
350 .values()
351 .map(|meta| ProcedureInfo {
352 id: meta.id,
353 type_name: meta.type_name.clone(),
354 start_time_ms: meta.start_time_ms.load(Ordering::Relaxed),
355 end_time_ms: meta.end_time_ms.load(Ordering::Relaxed),
356 state: meta.state(),
357 lock_keys: meta.lock_key.get_keys(),
358 })
359 .collect()
360 }
361
362 fn watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
364 let procedures = self.procedures.read().unwrap();
365 procedures
366 .get(&procedure_id)
367 .map(|meta| meta.state_receiver.clone())
368 }
369
370 fn notify_by_subprocedure(&self, procedure_id: ProcedureId) {
372 let procedures = self.procedures.read().unwrap();
373 if let Some(meta) = procedures.get(&procedure_id) {
374 meta.child_notify.notify_one();
375 }
376 }
377
378 fn load_one_procedure_from_message(
380 &self,
381 procedure_id: ProcedureId,
382 message: &ProcedureMessage,
383 ) -> Option<LoadedProcedure> {
384 let loaders = self.loaders.lock().unwrap();
385 let loader = loaders.get(&message.type_name).or_else(|| {
386 error!(
387 "Loader not found, procedure_id: {}, type_name: {}",
388 procedure_id, message.type_name
389 );
390 None
391 })?;
392
393 let procedure = loader(&message.data)
394 .map_err(|e| {
395 error!(
396 "Failed to load procedure data, key: {}, source: {:?}",
397 procedure_id, e
398 );
399 e
400 })
401 .ok()?;
402
403 Some(LoadedProcedure {
404 procedure,
405 step: message.step,
406 })
407 }
408
409 fn procedures_in_tree(&self, root: &ProcedureMetaRef) -> Vec<ProcedureId> {
414 let sub_num = root.num_children();
415 let mut procedures = Vec::with_capacity(1 + sub_num);
417
418 let mut queue = VecDeque::with_capacity(1 + sub_num);
419 queue.push_back(root.clone());
421
422 let mut children_ids = Vec::with_capacity(sub_num);
423 let mut children = Vec::with_capacity(sub_num);
424 while let Some(meta) = queue.pop_front() {
425 procedures.push(meta.id);
426
427 children_ids.clear();
429 meta.list_children(&mut children_ids);
430 self.find_procedures(&children_ids, &mut children);
431
432 for child in children.drain(..) {
434 queue.push_back(child);
435 }
436 }
437
438 procedures
439 }
440
441 fn find_procedures(&self, procedure_ids: &[ProcedureId], metas: &mut Vec<ProcedureMetaRef>) {
445 let procedures = self.procedures.read().unwrap();
446 for procedure_id in procedure_ids {
447 if let Some(meta) = procedures.get(procedure_id) {
448 metas.push(meta.clone());
449 }
450 }
451 }
452
453 fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
455 let now = Instant::now();
458 let mut finished_procedures = self.finished_procedures.lock().unwrap();
459 finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
460
461 let mut running_procedures = self.running_procedures.lock().unwrap();
463 for procedure_id in procedure_ids {
464 running_procedures.remove(procedure_id);
465 }
466 }
467
468 fn remove_outdated_meta(&self, ttl: Duration) {
470 let ids = {
471 let mut finished_procedures = self.finished_procedures.lock().unwrap();
472 if finished_procedures.is_empty() {
473 return;
474 }
475
476 let mut ids_to_remove = Vec::new();
477 while let Some((id, finish_time)) = finished_procedures.front() {
478 if finish_time.elapsed() > ttl {
479 ids_to_remove.push(*id);
480 let _ = finished_procedures.pop_front();
481 } else {
482 break;
485 }
486 }
487 ids_to_remove
488 };
489
490 if ids.is_empty() {
491 return;
492 }
493
494 let mut procedures = self.procedures.write().unwrap();
495 for id in ids {
496 let _ = procedures.remove(&id);
497 }
498 }
499}
500
501#[derive(Debug)]
503pub struct ManagerConfig {
504 pub parent_path: String,
505 pub max_retry_times: usize,
506 pub retry_delay: Duration,
507 pub remove_outdated_meta_task_interval: Duration,
508 pub remove_outdated_meta_ttl: Duration,
509 pub max_running_procedures: usize,
510}
511
512impl Default for ManagerConfig {
513 fn default() -> Self {
514 Self {
515 parent_path: String::default(),
516 max_retry_times: 3,
517 retry_delay: Duration::from_millis(500),
518 remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
519 remove_outdated_meta_ttl: META_TTL,
520 max_running_procedures: 128,
521 }
522 }
523}
524
525pub struct LocalManager {
527 manager_ctx: Arc<ManagerContext>,
528 procedure_store: Arc<ProcedureStore>,
529 max_retry_times: usize,
530 retry_delay: Duration,
531 remove_outdated_meta_task: TokioMutex<Option<RepeatedTask<Error>>>,
533 config: ManagerConfig,
534}
535
536impl LocalManager {
537 pub fn new(
539 config: ManagerConfig,
540 state_store: StateStoreRef,
541 poison_store: PoisonStoreRef,
542 ) -> LocalManager {
543 let manager_ctx = Arc::new(ManagerContext::new(poison_store));
544
545 LocalManager {
546 manager_ctx,
547 procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)),
548 max_retry_times: config.max_retry_times,
549 retry_delay: config.retry_delay,
550 remove_outdated_meta_task: TokioMutex::new(None),
551 config,
552 }
553 }
554
555 pub fn build_remove_outdated_meta_task(&self) -> RepeatedTask<Error> {
557 RepeatedTask::new(
558 self.config.remove_outdated_meta_task_interval,
559 Box::new(RemoveOutdatedMetaFunction {
560 manager_ctx: self.manager_ctx.clone(),
561 ttl: self.config.remove_outdated_meta_ttl,
562 }),
563 )
564 }
565
566 fn submit_root(
568 &self,
569 procedure_id: ProcedureId,
570 procedure_state: ProcedureState,
571 step: u32,
572 procedure: BoxedProcedure,
573 ) -> Result<Watcher> {
574 ensure!(self.manager_ctx.running(), ManagerNotStartSnafu);
575
576 let meta = Arc::new(ProcedureMeta::new(
577 procedure_id,
578 procedure_state,
579 None,
580 procedure.lock_key(),
581 procedure.poison_keys(),
582 procedure.type_name(),
583 ));
584 let runner = Runner {
585 meta: meta.clone(),
586 procedure,
587 manager_ctx: self.manager_ctx.clone(),
588 step,
589 exponential_builder: ExponentialBuilder::default()
590 .with_min_delay(self.retry_delay)
591 .with_max_times(self.max_retry_times),
592 store: self.procedure_store.clone(),
593 rolling_back: false,
594 };
595
596 let watcher = meta.state_receiver.clone();
597
598 ensure!(
599 self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
600 TooManyRunningProceduresSnafu {
601 max_running_procedures: self.config.max_running_procedures,
602 }
603 );
604
605 ensure!(
607 self.manager_ctx.try_insert_procedure(meta),
608 DuplicateProcedureSnafu { procedure_id },
609 );
610
611 let tracing_context = TracingContext::from_current_span();
612
613 let _handle = common_runtime::spawn_global(async move {
614 runner
618 .run()
619 .trace(
620 tracing_context
621 .attach(tracing::info_span!("LocalManager::submit_root_procedure")),
622 )
623 .await;
624 });
625
626 Ok(watcher)
627 }
628
629 fn submit_recovered_messages(
630 &self,
631 messages: HashMap<ProcedureId, ProcedureMessage>,
632 init_state: InitProcedureState,
633 ) {
634 for (procedure_id, message) in &messages {
635 if message.parent_id.is_none() {
636 let Some(mut loaded_procedure) = self
639 .manager_ctx
640 .load_one_procedure_from_message(*procedure_id, message)
641 else {
642 continue;
644 };
645
646 info!(
647 "Recover root procedure {}-{}, step: {}",
648 loaded_procedure.procedure.type_name(),
649 procedure_id,
650 loaded_procedure.step
651 );
652
653 let procedure_state = match init_state {
654 InitProcedureState::RollingBack => ProcedureState::RollingBack {
655 error: Arc::new(
656 error::RollbackProcedureRecoveredSnafu {
657 error: message.error.clone().unwrap_or("Unknown error".to_string()),
658 }
659 .build(),
660 ),
661 },
662 InitProcedureState::Running => ProcedureState::Running,
663 };
664
665 if let Err(e) = loaded_procedure.procedure.recover() {
666 error!(e; "Failed to recover procedure {}", procedure_id);
667 }
668
669 if let Err(e) = self.submit_root(
670 *procedure_id,
671 procedure_state,
672 loaded_procedure.step,
673 loaded_procedure.procedure,
674 ) {
675 error!(e; "Failed to recover procedure {}", procedure_id);
676 }
677 }
678 }
679 }
680
681 async fn recover(&self) -> Result<()> {
683 info!("LocalManager start to recover");
684 let recover_start = Instant::now();
685
686 let ProcedureMessages {
687 messages,
688 rollback_messages,
689 finished_ids,
690 } = self.procedure_store.load_messages().await?;
691 self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack);
693 self.submit_recovered_messages(messages, InitProcedureState::Running);
694
695 if !finished_ids.is_empty() {
696 info!(
697 "LocalManager try to clean finished procedures, num: {}",
698 finished_ids.len()
699 );
700
701 for procedure_id in finished_ids {
702 if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
703 error!(e; "Failed to delete procedure {}", procedure_id);
704 }
705 }
706 }
707
708 info!(
709 "LocalManager finish recovery, cost: {}ms",
710 recover_start.elapsed().as_millis()
711 );
712
713 Ok(())
714 }
715
716 #[cfg(any(test, feature = "testing"))]
717 pub fn contains_loader(&self, name: &str) -> bool {
719 let loaders = self.manager_ctx.loaders.lock().unwrap();
720 loaders.contains_key(name)
721 }
722}
723
724#[async_trait]
725impl ProcedureManager for LocalManager {
726 fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()> {
727 let mut loaders = self.manager_ctx.loaders.lock().unwrap();
728 ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name });
729
730 let _ = loaders.insert(name.to_string(), loader);
731
732 Ok(())
733 }
734
735 async fn start(&self) -> Result<()> {
736 let mut task = self.remove_outdated_meta_task.lock().await;
737
738 if task.is_some() {
739 return Ok(());
740 }
741
742 let task_inner = self.build_remove_outdated_meta_task();
743
744 task_inner
745 .start(common_runtime::global_runtime())
746 .context(StartRemoveOutdatedMetaTaskSnafu)?;
747
748 *task = Some(task_inner);
749
750 self.manager_ctx.start();
751
752 info!("LocalManager is start.");
753
754 self.recover().await
755 }
756
757 async fn stop(&self) -> Result<()> {
758 let mut task = self.remove_outdated_meta_task.lock().await;
759
760 if let Some(task) = task.take() {
761 task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?;
762 }
763
764 self.manager_ctx.stop();
765
766 info!("LocalManager is stopped.");
767
768 Ok(())
769 }
770
771 async fn submit(&self, procedure: ProcedureWithId) -> Result<Watcher> {
772 let procedure_id = procedure.id;
773 ensure!(
774 !self.manager_ctx.contains_procedure(procedure_id),
775 DuplicateProcedureSnafu { procedure_id }
776 );
777
778 self.submit_root(
779 procedure.id,
780 ProcedureState::Running,
781 0,
782 procedure.procedure,
783 )
784 }
785
786 async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
787 Ok(self.manager_ctx.state(procedure_id))
788 }
789
790 fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
791 self.manager_ctx.watcher(procedure_id)
792 }
793
794 async fn list_procedures(&self) -> Result<Vec<ProcedureInfo>> {
795 Ok(self.manager_ctx.list_procedure())
796 }
797}
798
799struct RemoveOutdatedMetaFunction {
800 manager_ctx: Arc<ManagerContext>,
801 ttl: Duration,
802}
803
804#[async_trait::async_trait]
805impl TaskFunction<Error> for RemoveOutdatedMetaFunction {
806 fn name(&self) -> &str {
807 "ProcedureManager-remove-outdated-meta-task"
808 }
809
810 async fn call(&mut self) -> Result<()> {
811 self.manager_ctx.remove_outdated_meta(self.ttl);
812 Ok(())
813 }
814}
815
/// Helpers shared by the tests of this module and its submodules.
#[cfg(test)]
pub(crate) mod test_util {
    use common_test_util::temp_dir::TempDir;
    use object_store::services::Fs as Builder;
    use object_store::ObjectStore;

    use super::*;

    /// Returns a running root meta with a random id, default keys and the
    /// type name `"ProcedureAdapter"`.
    pub(crate) fn procedure_meta_for_test() -> ProcedureMeta {
        let id = ProcedureId::random();
        ProcedureMeta::new(
            id,
            ProcedureState::Running,
            None,
            LockKey::default(),
            PoisonKeys::default(),
            "ProcedureAdapter",
        )
    }

    /// Returns a file-system object store rooted at `dir`.
    pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
        let root = dir.path().to_str().unwrap();
        ObjectStore::new(Builder::default().root(root))
            .unwrap()
            .finish()
    }
}
842
843#[cfg(test)]
844mod tests {
845 use std::assert_matches::assert_matches;
846
847 use common_error::mock::MockError;
848 use common_error::status_code::StatusCode;
849 use common_test_util::temp_dir::create_temp_dir;
850 use tokio::time::timeout;
851
852 use super::*;
853 use crate::error::{self, Error};
854 use crate::store::state_store::ObjectStateStore;
855 use crate::test_util::InMemoryPoisonStore;
856 use crate::{Context, Procedure, Status};
857
858 fn new_test_manager_context() -> ManagerContext {
859 let poison_manager = Arc::new(InMemoryPoisonStore::default());
860 ManagerContext::new(poison_manager)
861 }
862
863 #[test]
864 fn test_manager_context() {
865 let ctx = new_test_manager_context();
866 let meta = Arc::new(test_util::procedure_meta_for_test());
867
868 assert!(!ctx.contains_procedure(meta.id));
869 assert!(ctx.state(meta.id).is_none());
870
871 assert!(ctx.try_insert_procedure(meta.clone()));
872 assert!(ctx.contains_procedure(meta.id));
873
874 assert!(ctx.state(meta.id).unwrap().is_running());
875 meta.set_state(ProcedureState::Done { output: None });
876 assert!(ctx.state(meta.id).unwrap().is_done());
877 }
878
879 #[test]
880 fn test_manager_context_insert_duplicate() {
881 let ctx = new_test_manager_context();
882 let meta = Arc::new(test_util::procedure_meta_for_test());
883
884 assert!(ctx.try_insert_procedure(meta.clone()));
885 assert!(!ctx.try_insert_procedure(meta));
886 }
887
888 fn new_child(parent_id: ProcedureId, ctx: &ManagerContext) -> ProcedureMetaRef {
889 let mut child = test_util::procedure_meta_for_test();
890 child.parent_id = Some(parent_id);
891 let child = Arc::new(child);
892 assert!(ctx.try_insert_procedure(child.clone()));
893
894 let mut parent = Vec::new();
895 ctx.find_procedures(&[parent_id], &mut parent);
896 parent[0].push_child(child.id);
897
898 child
899 }
900
901 #[test]
902 fn test_procedures_in_tree() {
903 let ctx = new_test_manager_context();
904 let root = Arc::new(test_util::procedure_meta_for_test());
905 assert!(ctx.try_insert_procedure(root.clone()));
906
907 assert_eq!(1, ctx.procedures_in_tree(&root).len());
908
909 let child1 = new_child(root.id, &ctx);
910 let child2 = new_child(root.id, &ctx);
911
912 let child3 = new_child(child1.id, &ctx);
913 let child4 = new_child(child1.id, &ctx);
914
915 let child5 = new_child(child2.id, &ctx);
916
917 let expect = vec![
918 root.id, child1.id, child2.id, child3.id, child4.id, child5.id,
919 ];
920 assert_eq!(expect, ctx.procedures_in_tree(&root));
921 }
922
923 #[derive(Debug)]
924 struct ProcedureToLoad {
925 content: String,
926 lock_key: LockKey,
927 poison_keys: PoisonKeys,
928 }
929
930 #[async_trait]
931 impl Procedure for ProcedureToLoad {
932 fn type_name(&self) -> &str {
933 "ProcedureToLoad"
934 }
935
936 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
937 Ok(Status::done())
938 }
939
940 fn dump(&self) -> Result<String> {
941 Ok(self.content.clone())
942 }
943
944 fn lock_key(&self) -> LockKey {
945 self.lock_key.clone()
946 }
947
948 fn poison_keys(&self) -> PoisonKeys {
949 self.poison_keys.clone()
950 }
951 }
952
953 impl ProcedureToLoad {
954 fn new(content: &str) -> ProcedureToLoad {
955 ProcedureToLoad {
956 content: content.to_string(),
957 lock_key: LockKey::default(),
958 poison_keys: PoisonKeys::default(),
959 }
960 }
961
962 fn loader() -> BoxedProcedureLoader {
963 let f = |json: &str| {
964 let procedure = ProcedureToLoad::new(json);
965 Ok(Box::new(procedure) as _)
966 };
967 Box::new(f)
968 }
969 }
970
971 #[test]
972 fn test_register_loader() {
973 let dir = create_temp_dir("register");
974 let config = ManagerConfig {
975 parent_path: "data/".to_string(),
976 max_retry_times: 3,
977 retry_delay: Duration::from_millis(500),
978 ..Default::default()
979 };
980 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
981 let poison_manager = Arc::new(InMemoryPoisonStore::new());
982 let manager = LocalManager::new(config, state_store, poison_manager);
983 manager.manager_ctx.start();
984
985 manager
986 .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
987 .unwrap();
988 let err = manager
990 .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
991 .unwrap_err();
992 assert!(matches!(err, Error::LoaderConflict { .. }), "{err}");
993 }
994
995 #[tokio::test]
996 async fn test_recover() {
997 let dir = create_temp_dir("recover");
998 let object_store = test_util::new_object_store(&dir);
999 let config = ManagerConfig {
1000 parent_path: "data/".to_string(),
1001 max_retry_times: 3,
1002 retry_delay: Duration::from_millis(500),
1003 ..Default::default()
1004 };
1005 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1006 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1007 let manager = LocalManager::new(config, state_store, poison_manager);
1008 manager.manager_ctx.start();
1009
1010 manager
1011 .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
1012 .unwrap();
1013
1014 let procedure_store = ProcedureStore::from_object_store(object_store.clone());
1016 let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
1017 let root_id = ProcedureId::random();
1018 for step in 0..3 {
1020 let type_name = root.type_name().to_string();
1021 let data = root.dump().unwrap();
1022 procedure_store
1023 .store_procedure(root_id, step, type_name, data, None)
1024 .await
1025 .unwrap();
1026 }
1027
1028 let child: BoxedProcedure = Box::new(ProcedureToLoad::new("a child procedure"));
1029 let child_id = ProcedureId::random();
1030 for step in 0..2 {
1032 let type_name = child.type_name().to_string();
1033 let data = child.dump().unwrap();
1034 procedure_store
1035 .store_procedure(child_id, step, type_name, data, Some(root_id))
1036 .await
1037 .unwrap();
1038 }
1039
1040 manager.recover().await.unwrap();
1042
1043 let _ = manager.procedure_state(root_id).await.unwrap().unwrap();
1045 assert!(manager.procedure_state(child_id).await.unwrap().is_none());
1048 }
1049
1050 #[tokio::test]
1051 async fn test_submit_procedure() {
1052 let dir = create_temp_dir("submit");
1053 let config = ManagerConfig {
1054 parent_path: "data/".to_string(),
1055 max_retry_times: 3,
1056 retry_delay: Duration::from_millis(500),
1057 ..Default::default()
1058 };
1059 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1060 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1061 let manager = LocalManager::new(config, state_store, poison_manager);
1062 manager.manager_ctx.start();
1063
1064 let procedure_id = ProcedureId::random();
1065 assert!(manager
1066 .procedure_state(procedure_id)
1067 .await
1068 .unwrap()
1069 .is_none());
1070 assert!(manager.procedure_watcher(procedure_id).is_none());
1071
1072 let mut procedure = ProcedureToLoad::new("submit");
1073 procedure.lock_key = LockKey::single_exclusive("test.submit");
1074 assert!(manager
1075 .submit(ProcedureWithId {
1076 id: procedure_id,
1077 procedure: Box::new(procedure),
1078 })
1079 .await
1080 .is_ok());
1081 assert!(manager
1082 .procedure_state(procedure_id)
1083 .await
1084 .unwrap()
1085 .is_some());
1086 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1088 watcher.changed().await.unwrap();
1089 assert!(watcher.borrow().is_done());
1090
1091 let err = manager
1093 .submit(ProcedureWithId {
1094 id: procedure_id,
1095 procedure: Box::new(ProcedureToLoad::new("submit")),
1096 })
1097 .await
1098 .unwrap_err();
1099 assert!(matches!(err, Error::DuplicateProcedure { .. }), "{err}");
1100 }
1101
1102 #[tokio::test]
1103 async fn test_state_changed_on_err() {
1104 let dir = create_temp_dir("on_err");
1105 let config = ManagerConfig {
1106 parent_path: "data/".to_string(),
1107 max_retry_times: 3,
1108 retry_delay: Duration::from_millis(500),
1109 ..Default::default()
1110 };
1111 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1112 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1113 let manager = LocalManager::new(config, state_store, poison_manager);
1114 manager.manager_ctx.start();
1115
1116 #[derive(Debug)]
1117 struct MockProcedure {
1118 panic: bool,
1119 }
1120
1121 #[async_trait]
1122 impl Procedure for MockProcedure {
1123 fn type_name(&self) -> &str {
1124 "MockProcedure"
1125 }
1126
1127 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1128 if self.panic {
1129 panic!();
1132 } else {
1133 Err(Error::external(MockError::new(StatusCode::Unexpected)))
1134 }
1135 }
1136
1137 async fn rollback(&mut self, _: &Context) -> Result<()> {
1138 Ok(())
1139 }
1140
1141 fn rollback_supported(&self) -> bool {
1142 true
1143 }
1144
1145 fn dump(&self) -> Result<String> {
1146 Ok(String::new())
1147 }
1148
1149 fn lock_key(&self) -> LockKey {
1150 LockKey::single_exclusive("test.submit")
1151 }
1152
1153 fn poison_keys(&self) -> PoisonKeys {
1154 PoisonKeys::default()
1155 }
1156 }
1157
1158 let check_procedure = |procedure| async {
1159 let procedure_id = ProcedureId::random();
1160 manager
1161 .submit(ProcedureWithId {
1162 id: procedure_id,
1163 procedure: Box::new(procedure),
1164 })
1165 .await
1166 .unwrap()
1167 };
1168
1169 let mut watcher = check_procedure(MockProcedure { panic: false }).await;
1170 watcher.changed().await.unwrap();
1172 assert!(watcher.borrow().is_prepare_rollback());
1173 watcher.changed().await.unwrap();
1174 assert!(watcher.borrow().is_rolling_back());
1175 watcher.changed().await.unwrap();
1176 assert!(watcher.borrow().is_failed());
1177 let mut watcher = check_procedure(MockProcedure { panic: true }).await;
1179 watcher.changed().await.unwrap();
1180 assert!(watcher.borrow().is_failed());
1181 }
1182
1183 #[tokio::test]
1184 async fn test_procedure_manager_stopped() {
1185 let dir = create_temp_dir("procedure_manager_stopped");
1186 let config = ManagerConfig {
1187 parent_path: "data/".to_string(),
1188 max_retry_times: 3,
1189 retry_delay: Duration::from_millis(500),
1190 ..Default::default()
1191 };
1192 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1193 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1194 let manager = LocalManager::new(config, state_store, poison_manager);
1195
1196 let mut procedure = ProcedureToLoad::new("submit");
1197 procedure.lock_key = LockKey::single_exclusive("test.submit");
1198 let procedure_id = ProcedureId::random();
1199 assert_matches!(
1200 manager
1201 .submit(ProcedureWithId {
1202 id: procedure_id,
1203 procedure: Box::new(procedure),
1204 })
1205 .await
1206 .unwrap_err(),
1207 error::Error::ManagerNotStart { .. }
1208 );
1209 }
1210
1211 #[tokio::test]
1212 async fn test_procedure_manager_restart() {
1213 let dir = create_temp_dir("procedure_manager_restart");
1214 let config = ManagerConfig {
1215 parent_path: "data/".to_string(),
1216 max_retry_times: 3,
1217 retry_delay: Duration::from_millis(500),
1218 ..Default::default()
1219 };
1220 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1221 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1222 let manager = LocalManager::new(config, state_store, poison_manager);
1223
1224 manager.start().await.unwrap();
1225 manager.stop().await.unwrap();
1226 manager.start().await.unwrap();
1227
1228 let mut procedure = ProcedureToLoad::new("submit");
1229 procedure.lock_key = LockKey::single_exclusive("test.submit");
1230 let procedure_id = ProcedureId::random();
1231 assert!(manager
1232 .submit(ProcedureWithId {
1233 id: procedure_id,
1234 procedure: Box::new(procedure),
1235 })
1236 .await
1237 .is_ok());
1238 assert!(manager
1239 .procedure_state(procedure_id)
1240 .await
1241 .unwrap()
1242 .is_some());
1243 }
1244
1245 #[tokio::test(flavor = "multi_thread")]
1246 async fn test_remove_outdated_meta_task() {
1247 let dir = create_temp_dir("remove_outdated_meta_task");
1248 let object_store = test_util::new_object_store(&dir);
1249 let config = ManagerConfig {
1250 parent_path: "data/".to_string(),
1251 max_retry_times: 3,
1252 retry_delay: Duration::from_millis(500),
1253 remove_outdated_meta_task_interval: Duration::from_millis(1),
1254 remove_outdated_meta_ttl: Duration::from_millis(1),
1255 max_running_procedures: 128,
1256 };
1257 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1258 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1259 let manager = LocalManager::new(config, state_store, poison_manager);
1260 manager.manager_ctx.set_running();
1261
1262 let mut procedure = ProcedureToLoad::new("submit");
1263 procedure.lock_key = LockKey::single_exclusive("test.submit");
1264 let procedure_id = ProcedureId::random();
1265 assert!(manager
1266 .submit(ProcedureWithId {
1267 id: procedure_id,
1268 procedure: Box::new(procedure),
1269 })
1270 .await
1271 .is_ok());
1272 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1273 watcher.changed().await.unwrap();
1274
1275 manager.start().await.unwrap();
1276 tokio::time::sleep(Duration::from_millis(300)).await;
1277 assert!(manager
1278 .procedure_state(procedure_id)
1279 .await
1280 .unwrap()
1281 .is_none());
1282
1283 manager.stop().await.unwrap();
1285 let mut procedure = ProcedureToLoad::new("submit");
1286 procedure.lock_key = LockKey::single_exclusive("test.submit");
1287 let procedure_id = ProcedureId::random();
1288
1289 manager.manager_ctx.set_running();
1290 assert!(manager
1291 .submit(ProcedureWithId {
1292 id: procedure_id,
1293 procedure: Box::new(procedure),
1294 })
1295 .await
1296 .is_ok());
1297 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1298 watcher.changed().await.unwrap();
1299 tokio::time::sleep(Duration::from_millis(300)).await;
1300 assert!(manager
1301 .procedure_state(procedure_id)
1302 .await
1303 .unwrap()
1304 .is_some());
1305
1306 let mut procedure = ProcedureToLoad::new("submit");
1308 procedure.lock_key = LockKey::single_exclusive("test.submit");
1309 let procedure_id = ProcedureId::random();
1310 assert!(manager
1311 .submit(ProcedureWithId {
1312 id: procedure_id,
1313 procedure: Box::new(procedure),
1314 })
1315 .await
1316 .is_ok());
1317 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1318 watcher.changed().await.unwrap();
1319
1320 manager.start().await.unwrap();
1321 tokio::time::sleep(Duration::from_millis(300)).await;
1322 assert!(manager
1323 .procedure_state(procedure_id)
1324 .await
1325 .unwrap()
1326 .is_none());
1327 }
1328
1329 #[tokio::test]
1330 async fn test_too_many_running_procedures() {
1331 let dir = create_temp_dir("too_many_running_procedures");
1332 let config = ManagerConfig {
1333 parent_path: "data/".to_string(),
1334 max_retry_times: 3,
1335 retry_delay: Duration::from_millis(500),
1336 max_running_procedures: 1,
1337 ..Default::default()
1338 };
1339 let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
1340 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1341 let manager = LocalManager::new(config, state_store, poison_manager);
1342 manager.manager_ctx.set_running();
1343
1344 manager
1345 .manager_ctx
1346 .running_procedures
1347 .lock()
1348 .unwrap()
1349 .insert(ProcedureId::random());
1350 manager.start().await.unwrap();
1351
1352 let mut procedure = ProcedureToLoad::new("submit");
1354 procedure.lock_key = LockKey::single_exclusive("test.submit");
1355 let procedure_id = ProcedureId::random();
1356 let err = manager
1357 .submit(ProcedureWithId {
1358 id: procedure_id,
1359 procedure: Box::new(procedure),
1360 })
1361 .await
1362 .unwrap_err();
1363 assert!(matches!(err, Error::TooManyRunningProcedures { .. }));
1364
1365 manager
1366 .manager_ctx
1367 .running_procedures
1368 .lock()
1369 .unwrap()
1370 .clear();
1371
1372 let mut procedure = ProcedureToLoad::new("submit");
1374 procedure.lock_key = LockKey::single_exclusive("test.submit");
1375 assert!(manager
1376 .submit(ProcedureWithId {
1377 id: procedure_id,
1378 procedure: Box::new(procedure),
1379 })
1380 .await
1381 .is_ok());
1382 assert!(manager
1383 .procedure_state(procedure_id)
1384 .await
1385 .unwrap()
1386 .is_some());
1387 let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
1389 watcher.changed().await.unwrap();
1390 assert!(watcher.borrow().is_done());
1391 }
1392
1393 #[derive(Debug)]
1394 struct ProcedureToRecover {
1395 content: String,
1396 lock_key: LockKey,
1397 notify: Option<Arc<Notify>>,
1398 poison_keys: PoisonKeys,
1399 }
1400
1401 #[async_trait]
1402 impl Procedure for ProcedureToRecover {
1403 fn type_name(&self) -> &str {
1404 "ProcedureToRecover"
1405 }
1406
1407 async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
1408 Ok(Status::done())
1409 }
1410
1411 fn dump(&self) -> Result<String> {
1412 Ok(self.content.clone())
1413 }
1414
1415 fn lock_key(&self) -> LockKey {
1416 self.lock_key.clone()
1417 }
1418
1419 fn recover(&mut self) -> Result<()> {
1420 self.notify.as_ref().unwrap().notify_one();
1421 Ok(())
1422 }
1423
1424 fn poison_keys(&self) -> PoisonKeys {
1425 self.poison_keys.clone()
1426 }
1427 }
1428
1429 impl ProcedureToRecover {
1430 fn new(content: &str) -> ProcedureToRecover {
1431 ProcedureToRecover {
1432 content: content.to_string(),
1433 lock_key: LockKey::default(),
1434 poison_keys: PoisonKeys::default(),
1435 notify: None,
1436 }
1437 }
1438
1439 fn loader(notify: Arc<Notify>) -> BoxedProcedureLoader {
1440 let f = move |json: &str| {
1441 let procedure = ProcedureToRecover {
1442 content: json.to_string(),
1443 lock_key: LockKey::default(),
1444 poison_keys: PoisonKeys::default(),
1445 notify: Some(notify.clone()),
1446 };
1447 Ok(Box::new(procedure) as _)
1448 };
1449 Box::new(f)
1450 }
1451 }
1452
1453 #[tokio::test]
1454 async fn test_procedure_recover() {
1455 common_telemetry::init_default_ut_logging();
1456 let dir = create_temp_dir("procedure_recover");
1457 let object_store = test_util::new_object_store(&dir);
1458 let config = ManagerConfig {
1459 parent_path: "data/".to_string(),
1460 max_retry_times: 3,
1461 retry_delay: Duration::from_millis(500),
1462 ..Default::default()
1463 };
1464 let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
1465 let poison_manager = Arc::new(InMemoryPoisonStore::new());
1466 let manager = LocalManager::new(config, state_store, poison_manager);
1467 manager.manager_ctx.start();
1468
1469 let notify = Arc::new(Notify::new());
1470 manager
1471 .register_loader(
1472 "ProcedureToRecover",
1473 ProcedureToRecover::loader(notify.clone()),
1474 )
1475 .unwrap();
1476
1477 let procedure_store = ProcedureStore::from_object_store(object_store.clone());
1479 let root: BoxedProcedure = Box::new(ProcedureToRecover::new("test procedure recovery"));
1480 let root_id = ProcedureId::random();
1481 for step in 0..3 {
1483 let type_name = root.type_name().to_string();
1484 let data = root.dump().unwrap();
1485 procedure_store
1486 .store_procedure(root_id, step, type_name, data, None)
1487 .await
1488 .unwrap();
1489 }
1490
1491 manager.recover().await.unwrap();
1493 timeout(Duration::from_secs(10), notify.notified())
1494 .await
1495 .unwrap();
1496 }
1497}