1use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_error::ext::PlainError;
21use common_error::status_code::StatusCode;
22use common_event_recorder::EventRecorderRef;
23use common_telemetry::tracing_context::{FutureExt, TracingContext};
24use common_telemetry::{debug, error, info, tracing};
25use rand::Rng;
26use snafu::ResultExt;
27use tokio::time;
28
29use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
30use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
31use crate::procedure::{Output, StringKey};
32use crate::rwlock::OwnedKeyRwLockGuard;
33use crate::store::{ProcedureMessage, ProcedureStore};
34use crate::{
35 BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
36};
37
/// Guard that accompanies a single run of a procedure.
///
/// Its `Drop` implementation performs the end-of-run cleanup (failure marking,
/// parent notification, lock release), so that cleanup also happens when the
/// runner exits without calling `finish()`.
struct ProcedureGuard {
    /// Metadata of the guarded procedure.
    meta: ProcedureMetaRef,
    /// Manager context used on drop to notify the parent and clean lock keys.
    manager_ctx: Arc<ManagerContext>,
    /// Key lock guards held for the procedure, in the order they were pushed.
    key_guards: Vec<OwnedKeyRwLockGuard>,
    /// `true` once `finish()` has been called.
    finish: bool,
}
45
46impl ProcedureGuard {
47 fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
49 ProcedureGuard {
50 meta,
51 manager_ctx,
52 key_guards: vec![],
53 finish: false,
54 }
55 }
56
57 fn finish(mut self) {
59 self.finish = true;
60 }
61}
62
63impl Drop for ProcedureGuard {
64 fn drop(&mut self) {
65 if !self.finish {
66 error!("Procedure {} exits unexpectedly", self.meta.id);
67
68 let err = ProcedurePanicSnafu {
72 procedure_id: self.meta.id,
73 }
74 .build();
75 self.meta.set_state(ProcedureState::failed(Arc::new(err)));
76 }
77
78 if let Some(parent_id) = self.meta.parent_id {
80 self.manager_ctx.notify_by_subprocedure(parent_id);
81 }
82
83 while !self.key_guards.is_empty() {
85 self.key_guards.pop();
86 }
87
88 self.manager_ctx
90 .key_lock
91 .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
92 }
93}
94
95fn find_lock_conflicts<'a>(
101 parent_keys: impl Iterator<Item = &'a StringKey>,
102 child_keys: impl Iterator<Item = &'a StringKey>,
103) -> Vec<String> {
104 use std::collections::HashMap;
105
106 let mut parent_map = HashMap::new();
108 for key in parent_keys {
109 match key {
110 StringKey::Exclusive(k) => {
111 parent_map.insert(k.as_str(), true);
112 }
113 StringKey::Share(k) => {
114 parent_map.entry(k.as_str()).or_insert(false);
115 }
116 }
117 }
118
119 child_keys
120 .filter_map(|child_key| match child_key {
121 StringKey::Exclusive(k) | StringKey::Share(k)
122 if parent_map.get(k.as_str()) == Some(&true) =>
123 {
124 Some(k.clone())
125 }
126 StringKey::Exclusive(k) if parent_map.get(k.as_str()) == Some(&false) => {
127 Some(k.clone())
128 }
129 _ => None,
130 })
131 .collect()
132}
133
/// Executes a single procedure: acquires its locks, drives it through
/// execution, retry and rollback, and persists its progress to the store.
pub(crate) struct Runner {
    /// Metadata (id, state, lock keys, parent, ...) of the procedure being run.
    pub(crate) meta: ProcedureMetaRef,
    /// The procedure to execute.
    pub(crate) procedure: BoxedProcedure,
    /// Shared manager context (running flag, key locks, poison manager, ...).
    pub(crate) manager_ctx: Arc<ManagerContext>,
    /// Step number passed to the store; incremented after each successful
    /// persist, commit, or rollback write.
    pub(crate) step: u32,
    /// Builder for the backoff schedules used between retries and rollbacks.
    pub(crate) exponential_builder: ExponentialBuilder,
    /// Store used to persist, commit, roll back and delete procedure records.
    pub(crate) store: Arc<ProcedureStore>,
    /// Reset to `false` before execution starts; not read elsewhere in this file's visible code.
    pub(crate) rolling_back: bool,
    /// Optional event recorder, forwarded to the metadata of submitted subprocedures.
    pub(crate) event_recorder: Option<EventRecorderRef>,
}
144
145impl Runner {
146 pub(crate) fn running(&self) -> bool {
148 self.manager_ctx.running()
149 }
150
151 pub(crate) async fn run(mut self) {
153 let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
155
156 info!(
157 "Runner {}-{} starts",
158 self.procedure.type_name(),
159 self.meta.id
160 );
161
162 for key in self.meta.lock_key.keys_to_lock() {
165 let key_guard = match key {
167 StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
168 StringKey::Exclusive(key) => {
169 self.manager_ctx.key_lock.write(key.clone()).await.into()
170 }
171 };
172
173 guard.key_guards.push(key_guard);
174 }
175
176 self.meta.set_start_time_ms();
179 self.execute_procedure_in_loop().await;
180 self.meta.set_end_time_ms();
181
182 guard.finish();
189
190 if self.meta.parent_id.is_none() {
192 let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
193 self.manager_ctx.on_procedures_finish(&procedure_ids);
195
196 if !self.running() {
198 return;
199 }
200
201 for id in procedure_ids {
202 if let Err(e) = self.store.delete_procedure(id).await {
203 error!(
204 e;
205 "Runner {}-{} failed to delete procedure {}",
206 self.procedure.type_name(),
207 self.meta.id,
208 id,
209 );
210 }
211 }
212 }
213
214 info!(
215 "Runner {}-{} exits",
216 self.procedure.type_name(),
217 self.meta.id
218 );
219 }
220
221 async fn execute_procedure_in_loop(&mut self) {
222 let ctx = Context {
223 procedure_id: self.meta.id,
224 provider: self.manager_ctx.clone(),
225 };
226
227 self.rolling_back = false;
228 self.execute_once_with_retry(&ctx).await;
229 }
230
    /// Drives the procedure until it reaches a terminal state, backing off
    /// between attempts.
    ///
    /// On every loop iteration the current state decides what happens before
    /// the next `execute_once` call:
    /// - `Running`: execute immediately.
    /// - `Retrying`: sleep for the next retry delay (plus random noise); when
    ///   the retry schedule is exhausted, switch to `PrepareRollback`.
    /// - `PrepareRollback` / `RollingBack`: sleep for the next rollback delay;
    ///   when the rollback schedule is exhausted, mark the procedure as failed
    ///   and return.
    /// - `Done` / `Failed` / `Poisoned`: return.
    ///
    /// If the manager is no longer running, the procedure is marked as failed
    /// and the loop ends.
    async fn execute_once_with_retry(&mut self, ctx: &Context) {
        // Separate backoff schedules and attempt counters for retries and rollbacks.
        let mut retry = self.exponential_builder.build();
        let mut retry_times = 0;

        let mut rollback = self.exponential_builder.build();
        let mut rollback_times = 0;

        loop {
            // Stop as soon as the manager is no longer running.
            if !self.running() {
                self.meta.set_state(ProcedureState::failed(Arc::new(
                    error::ManagerNotStartSnafu {}.build(),
                )));
                return;
            }
            let state = self.meta.state();
            match state {
                ProcedureState::Running => {}
                ProcedureState::Retrying { error } => {
                    retry_times += 1;
                    if let Some(d) = retry.next() {
                        let millis = d.as_millis() as u64;
                        // Add up to 25% random noise on top of the backoff delay.
                        let noise = rand::rng().random_range(0..(millis / 4) + 1);
                        let d = d.add(Duration::from_millis(noise));

                        self.wait_on_err(d, retry_times).await;
                    } else {
                        // Retry schedule exhausted: hand over to the rollback path.
                        self.meta
                            .set_state(ProcedureState::prepare_rollback(Arc::new(
                                Error::RetryTimesExceeded {
                                    source: error.clone(),
                                    procedure_id: self.meta.id,
                                },
                            )));
                    }
                }
                ProcedureState::PrepareRollback { error }
                | ProcedureState::RollingBack { error } => {
                    rollback_times += 1;
                    if let Some(d) = rollback.next() {
                        self.wait_on_err(d, rollback_times).await;
                    } else {
                        // Rollback schedule exhausted: wrap the last error and give up.
                        let err = Err::<(), Arc<Error>>(error)
                            .context(RollbackTimesExceededSnafu {
                                procedure_id: self.meta.id,
                            })
                            .unwrap_err();
                        self.meta.set_state(ProcedureState::failed(Arc::new(err)));
                        return;
                    }
                }
                // Terminal states.
                ProcedureState::Done { .. } => return,
                ProcedureState::Failed { .. } => return,
                ProcedureState::Poisoned { .. } => return,
            }
            self.execute_once(ctx).await;
        }
    }
290
291 async fn clean_poisons(&mut self) -> Result<()> {
292 let mut error = None;
293 for key in self.meta.poison_keys.iter() {
294 let key = key.to_string();
295 if let Err(e) = self
296 .manager_ctx
297 .poison_manager
298 .delete_poison(key, self.meta.id.to_string())
299 .await
300 {
301 error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
302 error = Some(e);
303 }
304 }
305
306 if let Some(e) = error {
308 return Err(e);
309 }
310 Ok(())
311 }
312
313 async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
314 if self.procedure.rollback_supported()
315 && let Err(e) = self.procedure.rollback(ctx).await
316 {
317 self.meta
318 .set_state(ProcedureState::rolling_back(Arc::new(e)));
319 return;
320 }
321 self.meta.set_state(ProcedureState::failed(err));
322 }
323
324 async fn prepare_rollback(&mut self, err: Arc<Error>) {
325 if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
326 self.meta
327 .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
328 return;
329 }
330 if self.procedure.rollback_supported() {
331 self.meta.set_state(ProcedureState::rolling_back(err));
332 } else {
333 self.meta.set_state(ProcedureState::failed(err));
334 }
335 }
336
    /// Performs one state-machine step for the procedure.
    ///
    /// - `Running` / `Retrying`: calls `Procedure::execute` and updates the
    ///   state from the returned status or error (see the inline comments).
    /// - `PrepareRollback`: delegates to `prepare_rollback`.
    /// - `RollingBack`: delegates to `rollback`.
    /// - `Failed` / `Done` / `Poisoned`: does nothing.
    async fn execute_once(&mut self, ctx: &Context) {
        match self.meta.state() {
            ProcedureState::Running | ProcedureState::Retrying { .. } => {
                match self.procedure.execute(ctx).await {
                    Ok(status) => {
                        debug!(
                            "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
                            self.procedure.type_name(),
                            self.meta.id,
                            status,
                            status.need_persist(),
                        );

                        // The manager stopped while executing: fail without touching the store.
                        if !self.running() {
                            self.meta.set_state(ProcedureState::failed(Arc::new(
                                error::ManagerNotStartSnafu {}.build(),
                            )));
                            return;
                        }

                        // Poisons are cleaned before the state is persisted; a failure is retried.
                        if status.need_clean_poisons()
                            && let Err(e) = self.clean_poisons().await
                        {
                            error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                            return;
                        }

                        // A failed persist is retried as well.
                        if status.need_persist()
                            && let Err(e) = self.persist_procedure().await
                        {
                            error!(e; "Failed to persist procedure: {}", self.meta.id);
                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                            return;
                        }

                        match status {
                            Status::Executing { .. } => {
                                // Leave a retrying state once execution succeeds again.
                                let prev_state = self.meta.state();
                                if !matches!(prev_state, ProcedureState::Running) {
                                    info!(
                                        "Set Procedure {}-{} state to running, prev_state: {:?}",
                                        self.procedure.type_name(),
                                        self.meta.id,
                                        prev_state
                                    );
                                    self.meta.set_state(ProcedureState::Running);
                                }
                            }
                            Status::Suspended { subprocedures, .. } => {
                                let prev_state = self.meta.state();
                                if !matches!(prev_state, ProcedureState::Running) {
                                    info!(
                                        "Set Procedure {}-{} state to running, prev_state: {:?}",
                                        self.procedure.type_name(),
                                        self.meta.id,
                                        prev_state
                                    );
                                    self.meta.set_state(ProcedureState::Running);
                                }
                                // Submit the subprocedures and wait for a notification from them.
                                self.on_suspended(subprocedures).await;
                            }
                            Status::Done { output } => {
                                // The procedure only becomes done after the commit succeeded.
                                if let Err(e) = self.commit_procedure().await {
                                    error!(e; "Failed to commit procedure: {}", self.meta.id);
                                    self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                                    return;
                                }

                                self.done(output);
                            }
                            Status::Poisoned { error, keys } => {
                                error!(
                                    error;
                                    "Procedure {}-{} is poisoned, keys: {:?}",
                                    self.procedure.type_name(),
                                    self.meta.id,
                                    keys,
                                );
                                self.meta
                                    .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
                            }
                        }
                    }
                    Err(e) => {
                        error!(
                            e;
                            "Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
                            self.procedure.type_name(),
                            self.meta.id,
                            e.is_retry_later(),
                            e.need_clean_poisons(),
                        );

                        // The manager stopped while executing: fail without touching the store.
                        if !self.running() {
                            self.meta.set_state(ProcedureState::failed(Arc::new(
                                error::ManagerNotStartSnafu {}.build(),
                            )));
                            return;
                        }

                        if e.need_clean_poisons() {
                            // The inner `e` shadows the execution error on purpose:
                            // a cleanup failure is what gets retried.
                            if let Err(e) = self.clean_poisons().await {
                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                                return;
                            }
                            debug!(
                                "Procedure {}-{} cleaned poisons",
                                self.procedure.type_name(),
                                self.meta.id,
                            );
                        }

                        if e.is_retry_later() {
                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                            return;
                        }

                        // Non-retryable error: roll back when supported, otherwise fail directly.
                        if self.procedure.rollback_supported() {
                            self.meta
                                .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
                        } else {
                            self.meta.set_state(ProcedureState::failed(Arc::new(e)));
                        }
                    }
                }
            }
            ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
            ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
            // Terminal states: nothing left to execute.
            ProcedureState::Failed { .. }
            | ProcedureState::Done { .. }
            | ProcedureState::Poisoned { .. } => (),
        }
    }
475
476 fn submit_subprocedure(
478 &self,
479 procedure_id: ProcedureId,
480 procedure_state: ProcedureState,
481 procedure: BoxedProcedure,
482 ) {
483 if self.manager_ctx.contains_procedure(procedure_id) {
484 return;
486 }
487
488 let step = 0;
489
490 let meta = Arc::new(ProcedureMeta::new(
491 procedure_id,
492 procedure_state,
493 Some(self.meta.id),
494 procedure.lock_key(),
495 procedure.poison_keys(),
496 procedure.type_name(),
497 self.event_recorder.clone(),
498 procedure.user_metadata(),
499 ));
500 let runner = Runner {
501 meta: meta.clone(),
502 procedure,
503 manager_ctx: self.manager_ctx.clone(),
504 step,
505 exponential_builder: self.exponential_builder,
506 store: self.store.clone(),
507 rolling_back: false,
508 event_recorder: self.event_recorder.clone(),
509 };
510
511 assert!(
515 self.manager_ctx.try_insert_procedure(meta),
516 "Procedure {}-{} submit an existing procedure {}-{}",
517 self.procedure.type_name(),
518 self.meta.id,
519 runner.procedure.type_name(),
520 procedure_id,
521 );
522
523 self.meta.push_child(procedure_id);
525 let parent_id = self.meta.id;
526
527 let tracing_context = TracingContext::from_current_span();
528 let _handle = common_runtime::spawn_global(async move {
529 let span = tracing_context.attach(tracing::info_span!(
530 "LocalManager::submit_subprocedure",
531 procedure_name = %runner.meta.type_name,
532 procedure_id = %runner.meta.id,
533 parent_id = %parent_id,
534 ));
535 runner.run().trace(span).await
539 });
540 }
541
542 async fn wait_on_err(&mut self, d: Duration, i: u64) {
544 info!(
545 "Procedure {}-{} retry for the {} times after {} millis",
546 self.procedure.type_name(),
547 self.meta.id,
548 i,
549 d.as_millis(),
550 );
551 time::sleep(d).await;
552 }
553
554 async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
555 let has_child = !subprocedures.is_empty();
556
557 for sub in &subprocedures {
562 let conflicting = find_lock_conflicts(
563 self.meta.lock_key.keys_to_lock(),
564 sub.procedure.lock_key().keys_to_lock(),
565 );
566 if !conflicting.is_empty() {
567 let err_msg = format!(
568 "Deadlock prevented: subprocedure {}-{} shares conflicting lock key(s) {:?} \
569 with parent {}-{}. Parent holds these locks and would wait for child \
570 completion, but child cannot acquire them.",
571 sub.procedure.type_name(),
572 sub.id,
573 conflicting,
574 self.procedure.type_name(),
575 self.meta.id,
576 );
577 error!("{}", err_msg);
578 let err = Arc::new(Error::external(PlainError::new(
579 err_msg,
580 StatusCode::Internal,
581 )));
582 if self.procedure.rollback_supported() {
583 self.meta.set_state(ProcedureState::prepare_rollback(err));
584 } else {
585 self.meta.set_state(ProcedureState::failed(err));
586 }
587 return;
588 }
589 }
590
591 for subprocedure in subprocedures {
592 info!(
593 "Procedure {}-{} submit subprocedure {}-{}",
594 self.procedure.type_name(),
595 self.meta.id,
596 subprocedure.procedure.type_name(),
597 subprocedure.id,
598 );
599
600 self.submit_subprocedure(
601 subprocedure.id,
602 ProcedureState::Running,
603 subprocedure.procedure,
604 );
605 }
606
607 info!(
608 "Procedure {}-{} is waiting for subprocedures",
609 self.procedure.type_name(),
610 self.meta.id,
611 );
612
613 if has_child {
615 self.meta.child_notify.notified().await;
616
617 info!(
618 "Procedure {}-{} is waked up",
619 self.procedure.type_name(),
620 self.meta.id,
621 );
622 }
623 }
624
625 async fn persist_procedure(&mut self) -> Result<()> {
626 let type_name = self.procedure.type_name().to_string();
627 let data = self.procedure.dump()?;
628
629 self.store
630 .store_procedure(
631 self.meta.id,
632 self.step,
633 type_name,
634 data,
635 self.meta.parent_id,
636 )
637 .await
638 .map_err(|e| {
639 error!(
640 e; "Failed to persist procedure {}-{}",
641 self.procedure.type_name(),
642 self.meta.id
643 );
644 e
645 })?;
646 self.step += 1;
647 Ok(())
648 }
649
650 async fn commit_procedure(&mut self) -> Result<()> {
651 self.store
652 .commit_procedure(self.meta.id, self.step)
653 .await
654 .map_err(|e| {
655 error!(
656 e; "Failed to commit procedure {}-{}",
657 self.procedure.type_name(),
658 self.meta.id
659 );
660 e
661 })?;
662 self.step += 1;
663 Ok(())
664 }
665
666 async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
667 let type_name = self.procedure.type_name().to_string();
669 let data = self.procedure.dump()?;
670 let message = ProcedureMessage {
671 type_name,
672 data,
673 parent_id: self.meta.parent_id,
674 step: self.step,
675 error: Some(error),
676 };
677 self.store
678 .rollback_procedure(self.meta.id, message)
679 .await
680 .map_err(|e| {
681 error!(
682 e; "Failed to write rollback key for procedure {}-{}",
683 self.procedure.type_name(),
684 self.meta.id
685 );
686 e
687 })?;
688 self.step += 1;
689 Ok(())
690 }
691
692 fn done(&self, output: Option<Output>) {
693 info!(
695 "Procedure {}-{} done",
696 self.procedure.type_name(),
697 self.meta.id,
698 );
699
700 self.meta.set_state(ProcedureState::Done { output });
702 }
703}
704
705#[cfg(test)]
706mod tests {
707 use std::assert_matches::assert_matches;
708 use std::sync::Arc;
709 use std::sync::atomic::{AtomicU64, Ordering};
710
711 use async_trait::async_trait;
712 use common_error::ext::{ErrorExt, PlainError};
713 use common_error::mock::MockError;
714 use common_error::status_code::StatusCode;
715 use common_test_util::temp_dir::create_temp_dir;
716 use futures::future::join_all;
717 use futures_util::FutureExt;
718 use futures_util::future::BoxFuture;
719 use object_store::{EntryMode, ObjectStore};
720 use tokio::sync::mpsc;
721 use tokio::sync::watch::Receiver;
722
723 use super::*;
724 use crate::local::{DynamicKeyLockGuard, test_util};
725 use crate::procedure::PoisonKeys;
726 use crate::store::proc_path;
727 use crate::test_util::InMemoryPoisonStore;
728 use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
729
    /// Fixed UUID string used as the id of the root procedure in these tests.
    const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
731
732 fn new_runner(
733 meta: ProcedureMetaRef,
734 procedure: BoxedProcedure,
735 store: Arc<ProcedureStore>,
736 ) -> Runner {
737 Runner {
738 meta,
739 procedure,
740 manager_ctx: Arc::new(ManagerContext::new(
741 Arc::new(InMemoryPoisonStore::default()),
742 )),
743 step: 0,
744 exponential_builder: ExponentialBuilder::default(),
745 store,
746 rolling_back: false,
747 event_recorder: None,
748 }
749 }
750
751 async fn check_files(
752 object_store: &ObjectStore,
753 procedure_store: &ProcedureStore,
754 procedure_id: ProcedureId,
755 files: &[&str],
756 ) {
757 let dir = proc_path!(procedure_store, "{procedure_id}/");
758 let lister = object_store.list(&dir).await.unwrap();
759 let mut files_in_dir: Vec<_> = lister
760 .into_iter()
761 .filter(|x| x.metadata().mode() == EntryMode::FILE)
762 .map(|de| de.name().to_string())
763 .collect();
764 files_in_dir.sort_unstable();
765 assert_eq!(files, files_in_dir);
766 }
767
768 fn context_with_provider(
769 procedure_id: ProcedureId,
770 provider: Arc<dyn ContextProvider>,
771 ) -> Context {
772 Context {
773 procedure_id,
774 provider,
775 }
776 }
777
    /// Builds a `Context` whose provider must never be called.
    ///
    /// Every method of the mock provider is `unimplemented!()`, so a test that
    /// reaches the provider through this context panics.
    fn context_without_provider(procedure_id: ProcedureId) -> Context {
        /// Provider stub; all methods panic when invoked.
        struct MockProvider;

        #[async_trait]
        impl ContextProvider for MockProvider {
            async fn procedure_state(
                &self,
                _procedure_id: ProcedureId,
            ) -> Result<Option<ProcedureState>> {
                unimplemented!()
            }

            async fn procedure_state_receiver(
                &self,
                _procedure_id: ProcedureId,
            ) -> Result<Option<Receiver<ProcedureState>>> {
                unimplemented!()
            }

            async fn try_put_poison(
                &self,
                _key: &PoisonKey,
                _procedure_id: ProcedureId,
            ) -> Result<()> {
                unimplemented!()
            }

            async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
                unimplemented!()
            }
        }

        Context {
            procedure_id,
            provider: Arc::new(MockProvider),
        }
    }
815
    /// Boxed rollback callback used by `ProcedureAdapter`: takes the execution
    /// context and returns a boxed future resolving to the rollback result.
    type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
817
    /// Test procedure whose behavior is supplied by closures.
    struct ProcedureAdapter<F> {
        /// String returned by `dump()`.
        data: String,
        /// Lock key reported by `lock_key()`.
        lock_key: LockKey,
        /// Poison keys reported by `poison_keys()`.
        poison_keys: PoisonKeys,
        /// Closure invoked on every `execute()` call.
        exec_fn: F,
        /// Optional rollback closure; rollback is reported as supported only when set.
        rollback_fn: Option<RollbackFn>,
    }
825
826 impl<F> ProcedureAdapter<F> {
827 fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
828 let mut meta = test_util::procedure_meta_for_test();
829 meta.id = ProcedureId::parse_str(uuid).unwrap();
830 meta.lock_key = self.lock_key.clone();
831 meta.poison_keys = self.poison_keys.clone();
832
833 Arc::new(meta)
834 }
835 }
836
837 #[async_trait]
838 impl<F> Procedure for ProcedureAdapter<F>
839 where
840 F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
841 {
842 fn type_name(&self) -> &str {
843 "ProcedureAdapter"
844 }
845
846 async fn execute(&mut self, ctx: &Context) -> Result<Status> {
847 let f = (self.exec_fn)(ctx.clone());
848 f.await
849 }
850
851 async fn rollback(&mut self, ctx: &Context) -> Result<()> {
852 if let Some(f) = &mut self.rollback_fn {
853 return (f)(ctx.clone()).await;
854 }
855 Ok(())
856 }
857
858 fn rollback_supported(&self) -> bool {
859 self.rollback_fn.is_some()
860 }
861
862 fn dump(&self) -> Result<String> {
863 Ok(self.data.clone())
864 }
865
866 fn lock_key(&self) -> LockKey {
867 self.lock_key.clone()
868 }
869
870 fn poison_keys(&self) -> PoisonKeys {
871 self.poison_keys.clone()
872 }
873 }
874
875 async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
876 let mut times = 0;
877 let exec_fn = move |_| {
878 times += 1;
879 async move {
880 if times == 1 {
881 Ok(Status::executing(persist))
882 } else {
883 Ok(Status::done())
884 }
885 }
886 .boxed()
887 };
888 let normal = ProcedureAdapter {
889 data: "normal".to_string(),
890 lock_key: LockKey::single_exclusive("catalog.schema.table"),
891 poison_keys: PoisonKeys::default(),
892 exec_fn,
893 rollback_fn: None,
894 };
895
896 let dir = create_temp_dir("normal");
897 let meta = normal.new_meta(ROOT_ID);
898 let ctx = context_without_provider(meta.id);
899 let object_store = test_util::new_object_store(&dir);
900 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
901 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
902 runner.manager_ctx.start();
903
904 runner.execute_once(&ctx).await;
905 let state = runner.meta.state();
906 assert!(state.is_running(), "{state:?}");
907 check_files(
908 &object_store,
909 &procedure_store,
910 ctx.procedure_id,
911 first_files,
912 )
913 .await;
914
915 runner.execute_once(&ctx).await;
916 let state = runner.meta.state();
917 assert!(state.is_done(), "{state:?}");
918 check_files(
919 &object_store,
920 &procedure_store,
921 ctx.procedure_id,
922 second_files,
923 )
924 .await;
925 }
926
927 #[tokio::test]
928 async fn test_execute_once_normal() {
929 execute_once_normal(
930 true,
931 &["0000000000.step"],
932 &["0000000000.step", "0000000001.commit"],
933 )
934 .await;
935 }
936
937 #[tokio::test]
938 async fn test_execute_once_normal_skip_persist() {
939 execute_once_normal(false, &[], &["0000000000.commit"]).await;
940 }
941
942 #[tokio::test]
943 async fn test_on_suspend_empty() {
944 let exec_fn = move |_| {
945 async move {
946 Ok(Status::Suspended {
947 subprocedures: Vec::new(),
948 persist: false,
949 })
950 }
951 .boxed()
952 };
953 let suspend = ProcedureAdapter {
954 data: "suspend".to_string(),
955 lock_key: LockKey::single_exclusive("catalog.schema.table"),
956 poison_keys: PoisonKeys::default(),
957 exec_fn,
958 rollback_fn: None,
959 };
960
961 let dir = create_temp_dir("suspend");
962 let meta = suspend.new_meta(ROOT_ID);
963 let ctx = context_without_provider(meta.id);
964 let object_store = test_util::new_object_store(&dir);
965 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
966 let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
967 runner.manager_ctx.start();
968
969 runner.execute_once(&ctx).await;
970 let state = runner.meta.state();
971 assert!(state.is_running(), "{state:?}");
972 }
973
974 fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
975 let mut times = 0;
976 let exec_fn = move |_| {
977 times += 1;
978 async move {
979 if times == 1 {
980 time::sleep(Duration::from_millis(200)).await;
981 Ok(Status::executing(true))
982 } else {
983 Ok(Status::done())
984 }
985 }
986 .boxed()
987 };
988 let child = ProcedureAdapter {
989 data: "child".to_string(),
990 lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
991 poison_keys: PoisonKeys::default(),
992 exec_fn,
993 rollback_fn: None,
994 };
995
996 ProcedureWithId {
997 id: procedure_id,
998 procedure: Box::new(child),
999 }
1000 }
1001
    /// A parent that suspends on two children finishes only after both children
    /// are done, and all metadata and locks are cleaned up afterwards.
    #[tokio::test]
    async fn test_on_suspend_by_subprocedures() {
        let mut times = 0;
        let children_ids = [ProcedureId::random(), ProcedureId::random()];
        // Each child locks its own pair of region keys.
        let keys = [
            &[
                "catalog.schema.table.region-0",
                "catalog.schema.table.region-1",
            ],
            &[
                "catalog.schema.table.region-2",
                "catalog.schema.table.region-3",
            ],
        ];

        let exec_fn = move |ctx: Context| {
            times += 1;
            async move {
                if times == 1 {
                    // First call: submit both children.
                    Ok(Status::Suspended {
                        subprocedures: children_ids
                            .into_iter()
                            .zip(keys)
                            .map(|(id, key_slice)| new_child_procedure(id, key_slice))
                            .collect(),
                        persist: true,
                    })
                } else {
                    // Later calls: finish only once every child is done.
                    let mut all_child_done = true;
                    for id in children_ids {
                        let is_not_done = ctx
                            .provider
                            .procedure_state(id)
                            .await
                            .unwrap()
                            .map(|s| !s.is_done())
                            .unwrap_or(true);
                        if is_not_done {
                            all_child_done = false;
                        }
                    }
                    if all_child_done {
                        Ok(Status::done())
                    } else {
                        // Suspend again without new children and wait for the next notification.
                        Ok(Status::Suspended {
                            subprocedures: Vec::new(),
                            persist: false,
                        })
                    }
                }
            }
            .boxed()
        };
        let parent = ProcedureAdapter {
            data: "parent".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("parent");
        let meta = parent.new_meta(ROOT_ID);
        let procedure_id = meta.id;

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
        manager_ctx.start();
        // Register the parent's metadata in the manager context.
        assert!(manager_ctx.try_insert_procedure(meta));
        // Replace the runner's context with the one the test inspects.
        runner.manager_ctx = manager_ctx.clone();

        runner.run().await;
        assert!(manager_ctx.key_lock.is_empty());

        // Both children and the parent must be done.
        for child_id in children_ids {
            let state = manager_ctx.state(child_id).unwrap();
            assert!(state.is_done(), "{state:?}");
        }
        let state = manager_ctx.state(procedure_id).unwrap();
        assert!(state.is_done(), "{state:?}");
        // The parent's stored files were removed.
        check_files(&object_store, &procedure_store, procedure_id, &[]).await;

        // Wait longer than the 1 ms TTL so the finished metadata counts as outdated.
        tokio::time::sleep(Duration::from_millis(5)).await;
        manager_ctx.remove_outdated_meta(Duration::from_millis(1));
        assert!(manager_ctx.state(procedure_id).is_none());
        assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
        for child_id in children_ids {
            assert!(manager_ctx.state(child_id).is_none());
        }
    }
1103
1104 #[tokio::test]
1105 async fn test_running_is_stopped() {
1106 let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
1107 let normal = ProcedureAdapter {
1108 data: "normal".to_string(),
1109 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1110 poison_keys: PoisonKeys::default(),
1111 exec_fn,
1112 rollback_fn: None,
1113 };
1114
1115 let dir = create_temp_dir("test_running_is_stopped");
1116 let meta = normal.new_meta(ROOT_ID);
1117 let ctx = context_without_provider(meta.id);
1118 let object_store = test_util::new_object_store(&dir);
1119 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1120 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1121 runner.manager_ctx.start();
1122
1123 runner.execute_once(&ctx).await;
1124 let state = runner.meta.state();
1125 assert!(state.is_running(), "{state:?}");
1126 check_files(
1127 &object_store,
1128 &procedure_store,
1129 ctx.procedure_id,
1130 &["0000000000.step"],
1131 )
1132 .await;
1133
1134 runner.manager_ctx.stop();
1135 runner.execute_once(&ctx).await;
1136 let state = runner.meta.state();
1137 assert!(state.is_failed(), "{state:?}");
1138 check_files(
1140 &object_store,
1141 &procedure_store,
1142 ctx.procedure_id,
1143 &["0000000000.step"],
1144 )
1145 .await;
1146 }
1147
1148 #[tokio::test]
1149 async fn test_running_is_stopped_on_error() {
1150 let exec_fn =
1151 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1152 let normal = ProcedureAdapter {
1153 data: "fail".to_string(),
1154 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1155 poison_keys: PoisonKeys::default(),
1156 exec_fn,
1157 rollback_fn: None,
1158 };
1159
1160 let dir = create_temp_dir("test_running_is_stopped_on_error");
1161 let meta = normal.new_meta(ROOT_ID);
1162 let ctx = context_without_provider(meta.id);
1163 let object_store = test_util::new_object_store(&dir);
1164 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1165 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1166 runner.manager_ctx.stop();
1167
1168 runner.execute_once(&ctx).await;
1169 let state = runner.meta.state();
1170 assert!(state.is_failed(), "{state:?}");
1171 check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1173 }
1174
1175 #[tokio::test]
1176 async fn test_execute_on_error() {
1177 let exec_fn =
1178 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1179 let fail = ProcedureAdapter {
1180 data: "fail".to_string(),
1181 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1182 poison_keys: PoisonKeys::default(),
1183 exec_fn,
1184 rollback_fn: None,
1185 };
1186
1187 let dir = create_temp_dir("fail");
1188 let meta = fail.new_meta(ROOT_ID);
1189 let ctx = context_without_provider(meta.id);
1190 let object_store = test_util::new_object_store(&dir);
1191 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1192 let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1193 runner.manager_ctx.start();
1194
1195 runner.execute_once(&ctx).await;
1196 let state = runner.meta.state();
1197 assert!(state.is_failed(), "{state:?}");
1198 check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1199 }
1200
1201 #[tokio::test]
1202 async fn test_execute_with_rollback_on_error() {
1203 let exec_fn =
1204 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1205 let rollback_fn = move |_| async move { Ok(()) }.boxed();
1206 let fail = ProcedureAdapter {
1207 data: "fail".to_string(),
1208 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1209 poison_keys: PoisonKeys::default(),
1210 exec_fn,
1211 rollback_fn: Some(Box::new(rollback_fn)),
1212 };
1213
1214 let dir = create_temp_dir("fail");
1215 let meta = fail.new_meta(ROOT_ID);
1216 let ctx = context_without_provider(meta.id);
1217 let object_store = test_util::new_object_store(&dir);
1218 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1219 let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1220 runner.manager_ctx.start();
1221
1222 runner.execute_once(&ctx).await;
1223 let state = runner.meta.state();
1224 assert!(state.is_prepare_rollback(), "{state:?}");
1225
1226 runner.execute_once(&ctx).await;
1227 let state = runner.meta.state();
1228 assert!(state.is_rolling_back(), "{state:?}");
1229
1230 runner.execute_once(&ctx).await;
1231 let state = runner.meta.state();
1232 assert!(state.is_failed(), "{state:?}");
1233 check_files(
1234 &object_store,
1235 &procedure_store,
1236 ctx.procedure_id,
1237 &["0000000000.rollback"],
1238 )
1239 .await;
1240 }
1241
1242 #[tokio::test]
1243 async fn test_execute_on_retry_later_error() {
1244 let mut times = 0;
1245
1246 let exec_fn = move |_| {
1247 times += 1;
1248 async move {
1249 if times == 1 {
1250 Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1251 } else if times == 2 {
1252 Ok(Status::executing(false))
1253 } else {
1254 Ok(Status::done())
1255 }
1256 }
1257 .boxed()
1258 };
1259
1260 let retry_later = ProcedureAdapter {
1261 data: "retry_later".to_string(),
1262 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1263 poison_keys: PoisonKeys::default(),
1264 exec_fn,
1265 rollback_fn: None,
1266 };
1267
1268 let dir = create_temp_dir("retry_later");
1269 let meta = retry_later.new_meta(ROOT_ID);
1270 let ctx = context_without_provider(meta.id);
1271 let object_store = test_util::new_object_store(&dir);
1272 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1273 let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1274 runner.manager_ctx.start();
1275 runner.execute_once(&ctx).await;
1276 let state = runner.meta.state();
1277 assert!(state.is_retrying(), "{state:?}");
1278
1279 runner.execute_once(&ctx).await;
1280 let state = runner.meta.state();
1281 assert!(state.is_running(), "{state:?}");
1282
1283 runner.execute_once(&ctx).await;
1284 let state = runner.meta.state();
1285 assert!(state.is_done(), "{state:?}");
1286 assert!(meta.state().is_done());
1287 check_files(
1288 &object_store,
1289 &procedure_store,
1290 ctx.procedure_id,
1291 &["0000000000.commit"],
1292 )
1293 .await;
1294 }
1295
/// Runs a parent procedure that first fails with a retryable error, then
/// suspends on a failing subprocedure, and finally reports done.
///
/// The test asserts the parent's state after each `execute_once` call
/// (retrying, running, done) and the file names passed to `check_files`.
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_on_retry_later_error_with_child() {
    common_telemetry::init_default_ut_logging();
    // Counts how many times the parent's exec function has been invoked.
    let mut times = 0;
    let child_id = ProcedureId::random();

    let exec_fn = move |_| {
        times += 1;
        async move {
            debug!("times: {}", times);
            if times == 1 {
                // First invocation: ask the runner to retry later.
                Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
            } else if times == 2 {
                // Second invocation: suspend on a child whose execution
                // always returns an external error.
                let exec_fn = |_| {
                    async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
                        .boxed()
                };
                let fail = ProcedureAdapter {
                    data: "fail".to_string(),
                    lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
                    poison_keys: PoisonKeys::default(),
                    exec_fn,
                    rollback_fn: None,
                };

                Ok(Status::Suspended {
                    subprocedures: vec![ProcedureWithId {
                        id: child_id,
                        procedure: Box::new(fail),
                    }],
                    persist: true,
                })
            } else {
                // Any later invocation: the parent is finished.
                Ok(Status::done())
            }
        }
        .boxed()
    };

    let retry_later = ProcedureAdapter {
        data: "retry_later".to_string(),
        lock_key: LockKey::single_exclusive("catalog.schema.table"),
        poison_keys: PoisonKeys::default(),
        exec_fn,
        rollback_fn: None,
    };

    let dir = create_temp_dir("retry_later");
    let meta = retry_later.new_meta(ROOT_ID);
    let ctx = context_without_provider(meta.id);
    let object_store = test_util::new_object_store(&dir);
    let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
    let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
    runner.manager_ctx.start();
    debug!("execute_once 1");
    // First execution returns the retryable error.
    runner.execute_once(&ctx).await;
    let state = runner.meta.state();
    assert!(state.is_retrying(), "{state:?}");

    // Fire the parent's `child_notify` from a separate task.
    // NOTE(review): presumably this wakes the runner while it waits on the
    // subprocedure during the next `execute_once` — confirm against the runner.
    let moved_meta = meta.clone();
    tokio::spawn(async move {
        moved_meta.child_notify.notify_one();
    });
    // Second execution returns `Status::Suspended` with the failing child.
    runner.execute_once(&ctx).await;
    let state = runner.meta.state();
    assert!(state.is_running(), "{state:?}");

    // Third execution returns `Status::done()`.
    runner.execute_once(&ctx).await;
    let state = runner.meta.state();
    assert!(state.is_done(), "{state:?}");
    assert!(meta.state().is_done());
    // One step file (the suspend used `persist: true`) followed by the commit file.
    check_files(
        &object_store,
        &procedure_store,
        ctx.procedure_id,
        &["0000000000.step", "0000000001.commit"],
    )
    .await;
}
1375
1376 #[tokio::test]
1377 async fn test_execute_exceed_max_retry_later() {
1378 let exec_fn =
1379 |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1380
1381 let exceed_max_retry_later = ProcedureAdapter {
1382 data: "exceed_max_retry_later".to_string(),
1383 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1384 poison_keys: PoisonKeys::default(),
1385 exec_fn,
1386 rollback_fn: None,
1387 };
1388
1389 let dir = create_temp_dir("exceed_max_retry_later");
1390 let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1391 let object_store = test_util::new_object_store(&dir);
1392 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1393 let mut runner = new_runner(
1394 meta.clone(),
1395 Box::new(exceed_max_retry_later),
1396 procedure_store,
1397 );
1398 runner.manager_ctx.start();
1399
1400 runner.exponential_builder = ExponentialBuilder::default()
1401 .with_min_delay(Duration::from_millis(1))
1402 .with_max_times(3);
1403
1404 runner.execute_procedure_in_loop().await;
1406 let err = meta.state().error().unwrap().to_string();
1407 assert!(err.contains("Procedure retry exceeded max times"));
1408 }
1409
1410 #[tokio::test]
1411 async fn test_rollback_exceed_max_retry_later() {
1412 let exec_fn =
1413 |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1414 let rollback_fn = move |_| {
1415 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1416 };
1417 let exceed_max_retry_later = ProcedureAdapter {
1418 data: "exceed_max_rollback".to_string(),
1419 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1420 poison_keys: PoisonKeys::default(),
1421 exec_fn,
1422 rollback_fn: Some(Box::new(rollback_fn)),
1423 };
1424
1425 let dir = create_temp_dir("exceed_max_rollback");
1426 let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1427 let object_store = test_util::new_object_store(&dir);
1428 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1429 let mut runner = new_runner(
1430 meta.clone(),
1431 Box::new(exceed_max_retry_later),
1432 procedure_store,
1433 );
1434 runner.manager_ctx.start();
1435 runner.exponential_builder = ExponentialBuilder::default()
1436 .with_min_delay(Duration::from_millis(1))
1437 .with_max_times(3);
1438
1439 runner.execute_procedure_in_loop().await;
1441 let err = meta.state().error().unwrap().to_string();
1442 assert!(err.contains("Procedure rollback exceeded max times"));
1443 }
1444
1445 #[tokio::test]
1446 async fn test_rollback_after_retry_fail() {
1447 let exec_fn = move |_| {
1448 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1449 };
1450
1451 let (tx, mut rx) = mpsc::channel(1);
1452 let rollback_fn = move |_| {
1453 let tx = tx.clone();
1454 async move {
1455 tx.send(()).await.unwrap();
1456 Ok(())
1457 }
1458 .boxed()
1459 };
1460 let retry_later = ProcedureAdapter {
1461 data: "rollback_after_retry_fail".to_string(),
1462 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1463 poison_keys: PoisonKeys::default(),
1464 exec_fn,
1465 rollback_fn: Some(Box::new(rollback_fn)),
1466 };
1467
1468 let dir = create_temp_dir("retry_later");
1469 let meta = retry_later.new_meta(ROOT_ID);
1470 let ctx = context_without_provider(meta.id);
1471 let object_store = test_util::new_object_store(&dir);
1472 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1473 let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1474 runner.manager_ctx.start();
1475 runner.exponential_builder = ExponentialBuilder::default()
1476 .with_min_delay(Duration::from_millis(1))
1477 .with_max_times(3);
1478 runner.execute_procedure_in_loop().await;
1480 rx.recv().await.unwrap();
1481 assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
1482 check_files(
1483 &object_store,
1484 &procedure_store,
1485 ctx.procedure_id,
1486 &["0000000000.rollback"],
1487 )
1488 .await;
1489 }
1490
1491 #[tokio::test]
1492 async fn test_child_error() {
1493 let mut times = 0;
1494 let child_id = ProcedureId::random();
1495 common_telemetry::init_default_ut_logging();
1496 let exec_fn = move |ctx: Context| {
1497 times += 1;
1498 async move {
1499 if times == 1 {
1500 let exec_fn = |_| {
1502 async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1503 .boxed()
1504 };
1505 let fail = ProcedureAdapter {
1506 data: "fail".to_string(),
1507 lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1508 poison_keys: PoisonKeys::default(),
1509 exec_fn,
1510 rollback_fn: None,
1511 };
1512
1513 Ok(Status::Suspended {
1514 subprocedures: vec![ProcedureWithId {
1515 id: child_id,
1516 procedure: Box::new(fail),
1517 }],
1518 persist: true,
1519 })
1520 } else {
1521 let state = ctx.provider.procedure_state(child_id).await.unwrap();
1523 let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
1524 if is_failed {
1525 Err(Error::from_error_ext(PlainError::new(
1527 "subprocedure failed".to_string(),
1528 StatusCode::Unexpected,
1529 )))
1530 } else {
1531 Ok(Status::Suspended {
1533 subprocedures: Vec::new(),
1534 persist: false,
1535 })
1536 }
1537 }
1538 }
1539 .boxed()
1540 };
1541 let parent = ProcedureAdapter {
1542 data: "parent".to_string(),
1543 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1544 poison_keys: PoisonKeys::default(),
1545 exec_fn,
1546 rollback_fn: None,
1547 };
1548
1549 let dir = create_temp_dir("child_err");
1550 let meta = parent.new_meta(ROOT_ID);
1551
1552 let object_store = test_util::new_object_store(&dir);
1553 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1554 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
1555 let poison_manager = Arc::new(InMemoryPoisonStore::default());
1556 let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
1557 manager_ctx.start();
1558 assert!(manager_ctx.try_insert_procedure(meta.clone()));
1560 runner.manager_ctx = manager_ctx.clone();
1562
1563 runner.run().await;
1565 assert!(manager_ctx.key_lock.is_empty());
1566 let err = meta.state().error().unwrap().output_msg();
1567 assert!(err.contains("subprocedure failed"), "{err}");
1568 }
1569
1570 #[tokio::test]
1571 async fn test_execute_with_clean_poisons() {
1572 common_telemetry::init_default_ut_logging();
1573 let mut times = 0;
1574 let poison_key = PoisonKey::new("table/1024");
1575 let moved_poison_key = poison_key.clone();
1576 let exec_fn = move |ctx: Context| {
1577 times += 1;
1578 let poison_key = moved_poison_key.clone();
1579 async move {
1580 if times == 1 {
1581 ctx.provider
1583 .try_put_poison(&poison_key, ctx.procedure_id)
1584 .await
1585 .unwrap();
1586
1587 Ok(Status::executing(true))
1588 } else {
1589 Ok(Status::executing_with_clean_poisons(true))
1590 }
1591 }
1592 .boxed()
1593 };
1594 let poison = ProcedureAdapter {
1595 data: "poison".to_string(),
1596 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1597 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1598 exec_fn,
1599 rollback_fn: None,
1600 };
1601
1602 let dir = create_temp_dir("clean_poisons");
1603 let meta = poison.new_meta(ROOT_ID);
1604
1605 let object_store = test_util::new_object_store(&dir);
1606 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1607 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1608
1609 let ctx = context_with_provider(
1611 meta.id,
1612 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1613 );
1614 runner
1616 .manager_ctx
1617 .procedures
1618 .write()
1619 .unwrap()
1620 .insert(meta.id, runner.meta.clone());
1621
1622 runner.manager_ctx.start();
1623 runner.execute_once(&ctx).await;
1624 let state = runner.meta.state();
1625 assert!(state.is_running(), "{state:?}");
1626
1627 let procedure_id = runner
1628 .manager_ctx
1629 .poison_manager
1630 .get_poison(&poison_key.to_string())
1631 .await
1632 .unwrap();
1633 assert!(procedure_id.is_some());
1635
1636 runner.execute_once(&ctx).await;
1637 let state = runner.meta.state();
1638 assert!(state.is_running(), "{state:?}");
1639
1640 let procedure_id = runner
1641 .manager_ctx
1642 .poison_manager
1643 .get_poison(&poison_key.to_string())
1644 .await
1645 .unwrap();
1646 assert!(procedure_id.is_none());
1648 }
1649
1650 #[tokio::test]
1651 async fn test_execute_error_with_clean_poisons() {
1652 common_telemetry::init_default_ut_logging();
1653 let mut times = 0;
1654 let poison_key = PoisonKey::new("table/1024");
1655 let moved_poison_key = poison_key.clone();
1656 let exec_fn = move |ctx: Context| {
1657 times += 1;
1658 let poison_key = moved_poison_key.clone();
1659 async move {
1660 if times == 1 {
1661 ctx.provider
1663 .try_put_poison(&poison_key, ctx.procedure_id)
1664 .await
1665 .unwrap();
1666
1667 Ok(Status::executing(true))
1668 } else {
1669 Err(Error::external_and_clean_poisons(MockError::new(
1670 StatusCode::Unexpected,
1671 )))
1672 }
1673 }
1674 .boxed()
1675 };
1676 let poison = ProcedureAdapter {
1677 data: "poison".to_string(),
1678 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1679 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1680 exec_fn,
1681 rollback_fn: None,
1682 };
1683
1684 let dir = create_temp_dir("error_with_clean_poisons");
1685 let meta = poison.new_meta(ROOT_ID);
1686
1687 let object_store = test_util::new_object_store(&dir);
1688 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1689 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1690
1691 let ctx = context_with_provider(
1693 meta.id,
1694 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1695 );
1696 runner
1698 .manager_ctx
1699 .procedures
1700 .write()
1701 .unwrap()
1702 .insert(meta.id, runner.meta.clone());
1703
1704 runner.manager_ctx.start();
1705 runner.execute_once(&ctx).await;
1706 let state = runner.meta.state();
1707 assert!(state.is_running(), "{state:?}");
1708
1709 let procedure_id = runner
1710 .manager_ctx
1711 .poison_manager
1712 .get_poison(&poison_key.to_string())
1713 .await
1714 .unwrap();
1715 assert!(procedure_id.is_some());
1717
1718 runner.execute_once(&ctx).await;
1719 let state = runner.meta.state();
1720 assert!(state.is_failed(), "{state:?}");
1721
1722 let procedure_id = runner
1723 .manager_ctx
1724 .poison_manager
1725 .get_poison(&poison_key.to_string())
1726 .await
1727 .unwrap();
1728 assert!(procedure_id.is_none());
1730 }
1731
1732 #[tokio::test]
1733 async fn test_execute_failed_after_set_poison() {
1734 let mut times = 0;
1735 let poison_key = PoisonKey::new("table/1024");
1736 let moved_poison_key = poison_key.clone();
1737 let exec_fn = move |ctx: Context| {
1738 times += 1;
1739 let poison_key = moved_poison_key.clone();
1740 async move {
1741 if times == 1 {
1742 Ok(Status::executing(true))
1743 } else {
1744 ctx.provider
1746 .try_put_poison(&poison_key, ctx.procedure_id)
1747 .await
1748 .unwrap();
1749 Err(Error::external(MockError::new(StatusCode::Unexpected)))
1750 }
1751 }
1752 .boxed()
1753 };
1754 let poison = ProcedureAdapter {
1755 data: "poison".to_string(),
1756 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1757 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1758 exec_fn,
1759 rollback_fn: None,
1760 };
1761
1762 let dir = create_temp_dir("poison");
1763 let meta = poison.new_meta(ROOT_ID);
1764
1765 let object_store = test_util::new_object_store(&dir);
1766 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1767 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1768
1769 let ctx = context_with_provider(
1771 meta.id,
1772 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1773 );
1774 runner
1776 .manager_ctx
1777 .procedures
1778 .write()
1779 .unwrap()
1780 .insert(meta.id, runner.meta.clone());
1781
1782 runner.manager_ctx.start();
1783 runner.execute_once(&ctx).await;
1784 let state = runner.meta.state();
1785 assert!(state.is_running(), "{state:?}");
1786
1787 runner.execute_once(&ctx).await;
1788 let state = runner.meta.state();
1789 assert!(state.is_failed(), "{state:?}");
1790 assert!(meta.state().is_failed());
1791
1792 let procedure_id = runner
1794 .manager_ctx
1795 .poison_manager
1796 .get_poison(&poison_key.to_string())
1797 .await
1798 .unwrap()
1799 .unwrap();
1800
1801 assert_eq!(&procedure_id.clone(), ROOT_ID);
1803 }
1804
1805 #[tokio::test]
1806 async fn test_execute_exceed_max_retry_after_set_poison() {
1807 common_telemetry::init_default_ut_logging();
1808 let mut times = 0;
1809 let poison_key = PoisonKey::new("table/1024");
1810 let moved_poison_key = poison_key.clone();
1811 let exec_fn = move |ctx: Context| {
1812 times += 1;
1813 let poison_key = moved_poison_key.clone();
1814 async move {
1815 if times == 1 {
1816 Ok(Status::executing(true))
1817 } else {
1818 ctx.provider
1820 .try_put_poison(&poison_key, ctx.procedure_id)
1821 .await
1822 .unwrap();
1823 Err(Error::retry_later_and_clean_poisons(MockError::new(
1824 StatusCode::Unexpected,
1825 )))
1826 }
1827 }
1828 .boxed()
1829 };
1830 let poison = ProcedureAdapter {
1831 data: "poison".to_string(),
1832 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1833 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1834 exec_fn,
1835 rollback_fn: None,
1836 };
1837
1838 let dir = create_temp_dir("exceed_max_after_set_poison");
1839 let meta = poison.new_meta(ROOT_ID);
1840 let object_store = test_util::new_object_store(&dir);
1841 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1842 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
1843 runner.manager_ctx.start();
1844 runner.exponential_builder = ExponentialBuilder::default()
1845 .with_min_delay(Duration::from_millis(1))
1846 .with_max_times(3);
1847 let ctx = context_with_provider(
1849 meta.id,
1850 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1851 );
1852 runner
1854 .manager_ctx
1855 .procedures
1856 .write()
1857 .unwrap()
1858 .insert(meta.id, runner.meta.clone());
1859 runner.execute_once_with_retry(&ctx).await;
1861 let err = meta.state().error().unwrap().clone();
1862 assert_matches!(&*err, Error::RetryTimesExceeded { .. });
1863
1864 let procedure_id = runner
1866 .manager_ctx
1867 .poison_manager
1868 .get_poison(&poison_key.to_string())
1869 .await
1870 .unwrap();
1871 assert_eq!(procedure_id, None);
1872 }
1873
1874 #[tokio::test]
1875 async fn test_execute_poisoned() {
1876 let mut times = 0;
1877 let poison_key = PoisonKey::new("table/1024");
1878 let moved_poison_key = poison_key.clone();
1879 let exec_fn = move |ctx: Context| {
1880 times += 1;
1881 let poison_key = moved_poison_key.clone();
1882 async move {
1883 if times == 1 {
1884 Ok(Status::executing(true))
1885 } else {
1886 ctx.provider
1888 .try_put_poison(&poison_key, ctx.procedure_id)
1889 .await
1890 .unwrap();
1891 Ok(Status::Poisoned {
1892 keys: PoisonKeys::new(vec![poison_key.clone()]),
1893 error: Error::external(MockError::new(StatusCode::Unexpected)),
1894 })
1895 }
1896 }
1897 .boxed()
1898 };
1899 let poison = ProcedureAdapter {
1900 data: "poison".to_string(),
1901 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1902 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1903 exec_fn,
1904 rollback_fn: None,
1905 };
1906
1907 let dir = create_temp_dir("poison");
1908 let meta = poison.new_meta(ROOT_ID);
1909
1910 let object_store = test_util::new_object_store(&dir);
1911 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1912 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1913
1914 let ctx = context_with_provider(
1916 meta.id,
1917 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1918 );
1919 runner
1921 .manager_ctx
1922 .procedures
1923 .write()
1924 .unwrap()
1925 .insert(meta.id, runner.meta.clone());
1926
1927 runner.manager_ctx.start();
1928 runner.execute_once(&ctx).await;
1929 let state = runner.meta.state();
1930 assert!(state.is_running(), "{state:?}");
1931
1932 runner.execute_once(&ctx).await;
1933 let state = runner.meta.state();
1934 assert!(state.is_poisoned(), "{state:?}");
1935 assert!(meta.state().is_poisoned());
1936 check_files(
1937 &object_store,
1938 &procedure_store,
1939 ctx.procedure_id,
1940 &["0000000000.step"],
1941 )
1942 .await;
1943
1944 let procedure_id = runner
1946 .manager_ctx
1947 .poison_manager
1948 .get_poison(&poison_key.to_string())
1949 .await
1950 .unwrap()
1951 .unwrap();
1952
1953 assert_eq!(procedure_id, ROOT_ID);
1955 }
1956
1957 fn test_procedure_with_dynamic_lock(
1958 shared_atomic_value: Arc<AtomicU64>,
1959 id: u64,
1960 ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
1961 let exec_fn = move |ctx: Context| {
1962 let moved_shared_atomic_value = shared_atomic_value.clone();
1963 let moved_ctx = ctx.clone();
1964 async move {
1965 debug!("Acquiring write lock, id: {}", id);
1966 let key = StringKey::Exclusive("test_lock".to_string());
1967 let guard = moved_ctx.provider.acquire_lock(&key).await;
1968 debug!("Acquired write lock, id: {}", id);
1969 let millis = rand::rng().random_range(10..=50);
1970 tokio::time::sleep(Duration::from_millis(millis)).await;
1971 let value = moved_shared_atomic_value.load(Ordering::Relaxed);
1972 moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
1973 debug!("Dropping write lock, id: {}", id);
1974 drop(guard);
1975
1976 Ok(Status::done())
1977 }
1978 .boxed()
1979 };
1980
1981 let adapter = ProcedureAdapter {
1982 data: "dynamic_lock".to_string(),
1983 lock_key: LockKey::new_exclusive([]),
1984 poison_keys: PoisonKeys::new([]),
1985 exec_fn,
1986 rollback_fn: None,
1987 };
1988 let meta = adapter.new_meta(ROOT_ID);
1989
1990 (Box::new(adapter), meta)
1991 }
1992
1993 #[tokio::test(flavor = "multi_thread")]
1994 async fn test_execute_with_dynamic_lock() {
1995 common_telemetry::init_default_ut_logging();
1996 let shared_atomic_value = Arc::new(AtomicU64::new(0));
1997 let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
1998 let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
1999
2000 let dir = create_temp_dir("dynamic_lock");
2001 let object_store = test_util::new_object_store(&dir);
2002 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
2003 let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
2004 let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
2005 let ctx1 = context_with_provider(
2006 meta1.id,
2007 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
2008 );
2009 let ctx2 = context_with_provider(
2010 meta2.id,
2011 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
2013 );
2014 let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
2015 join_all(tasks).await;
2016 assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
2017 }
2018 #[tokio::test]
2019 async fn test_on_suspend_deadlock_detected_no_rollback() {
2020 let child_id = ProcedureId::random();
2023 let exec_fn = move |_| {
2024 async move {
2025 let child_exec_fn = |_| async { Ok(Status::done()) }.boxed();
2026 let child = ProcedureAdapter {
2027 data: "child".to_string(),
2028 lock_key: LockKey::single_exclusive("catalog.schema.table"),
2029 poison_keys: PoisonKeys::default(),
2030 exec_fn: child_exec_fn,
2031 rollback_fn: None,
2032 };
2033 Ok(Status::Suspended {
2034 subprocedures: vec![ProcedureWithId {
2035 id: child_id,
2036 procedure: Box::new(child),
2037 }],
2038 persist: false,
2039 })
2040 }
2041 .boxed()
2042 };
2043 let parent = ProcedureAdapter {
2044 data: "parent".to_string(),
2045 lock_key: LockKey::single_exclusive("catalog.schema.table"),
2046 poison_keys: PoisonKeys::default(),
2047 exec_fn,
2048 rollback_fn: None, };
2050
2051 let dir = create_temp_dir("deadlock_no_rollback");
2052 let meta = parent.new_meta(ROOT_ID);
2053 let ctx = context_without_provider(meta.id);
2054 let object_store = test_util::new_object_store(&dir);
2055 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
2056 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
2057 runner.manager_ctx.start();
2058
2059 runner.execute_once(&ctx).await;
2060 let state = runner.meta.state();
2061 assert!(state.is_failed(), "Expected Failed, got {state:?}");
2062 assert!(
2064 state.error().is_some(),
2065 "Failed state should contain an error"
2066 );
2067 assert!(
2069 !runner.manager_ctx.contains_procedure(child_id),
2070 "Child procedure should not be submitted when deadlock is detected"
2071 );
2072 }
2073
2074 #[tokio::test]
2075 async fn test_on_suspend_deadlock_detected_with_rollback() {
2076 let child_id = ProcedureId::random();
2079 let exec_fn = move |_| {
2080 async move {
2081 let child_exec_fn = |_| async { Ok(Status::done()) }.boxed();
2082 let child = ProcedureAdapter {
2083 data: "child".to_string(),
2084 lock_key: LockKey::single_exclusive("catalog.schema.table"),
2085 poison_keys: PoisonKeys::default(),
2086 exec_fn: child_exec_fn,
2087 rollback_fn: None,
2088 };
2089 Ok(Status::Suspended {
2090 subprocedures: vec![ProcedureWithId {
2091 id: child_id,
2092 procedure: Box::new(child),
2093 }],
2094 persist: false,
2095 })
2096 }
2097 .boxed()
2098 };
2099 let rollback_fn = move |_| async move { Ok(()) }.boxed();
2100 let parent = ProcedureAdapter {
2101 data: "parent".to_string(),
2102 lock_key: LockKey::single_exclusive("catalog.schema.table"),
2103 poison_keys: PoisonKeys::default(),
2104 exec_fn,
2105 rollback_fn: Some(Box::new(rollback_fn)), };
2107
2108 let dir = create_temp_dir("deadlock_with_rollback");
2109 let meta = parent.new_meta(ROOT_ID);
2110 let ctx = context_without_provider(meta.id);
2111 let object_store = test_util::new_object_store(&dir);
2112 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
2113 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
2114 runner.manager_ctx.start();
2115
2116 runner.execute_once(&ctx).await;
2117 let state = runner.meta.state();
2118 assert!(
2119 state.is_prepare_rollback(),
2120 "Expected PrepareRollback, got {state:?}"
2121 );
2122 match &state {
2124 ProcedureState::PrepareRollback { error } => {
2125 assert!(!error.to_string().is_empty(), "Error should not be empty");
2126 }
2127 _ => panic!("Expected PrepareRollback, got {state:?}"),
2128 }
2129 assert!(
2131 !runner.manager_ctx.contains_procedure(child_id),
2132 "Child procedure should not be submitted when deadlock is detected"
2133 );
2134 }
2135
/// Checks `find_lock_conflicts` for every share/exclusive pairing on a single
/// key, plus a mixed case that includes a key only the child requests.
#[test]
fn test_find_lock_conflicts() {
    use crate::procedure::StringKey;

    let share = |key: &str| StringKey::Share(key.to_string());
    let exclusive = |key: &str| StringKey::Exclusive(key.to_string());

    // (parent keys, child keys, expected conflicting keys in sorted order)
    let cases: Vec<(Vec<StringKey>, Vec<StringKey>, Vec<String>)> = vec![
        // Share vs. share on the same key: no conflict.
        (vec![share("A")], vec![share("A")], vec![]),
        // Share vs. exclusive: conflict.
        (vec![share("A")], vec![exclusive("A")], vec!["A".to_string()]),
        // Exclusive vs. share: conflict.
        (vec![exclusive("A")], vec![share("A")], vec!["A".to_string()]),
        // Exclusive vs. exclusive: conflict.
        (
            vec![exclusive("A")],
            vec![exclusive("A")],
            vec!["A".to_string()],
        ),
        // Mixed keys: "A" and "B" conflict, "C" is requested by the child only.
        (
            vec![share("A"), exclusive("B")],
            vec![exclusive("A"), share("B"), exclusive("C")],
            vec!["A".to_string(), "B".to_string()],
        ),
    ];

    for (parent, child, expected) in cases {
        let mut conflicts = super::find_lock_conflicts(parent.iter(), child.iter());
        // Sort so the comparison does not depend on the order of the result.
        conflicts.sort();
        assert_eq!(conflicts, expected);
    }
}
2183}