1use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_error::ext::PlainError;
21use common_error::status_code::StatusCode;
22use common_event_recorder::EventRecorderRef;
23use common_telemetry::tracing_context::{FutureExt, TracingContext};
24use common_telemetry::{debug, error, info, tracing};
25use rand::Rng;
26use snafu::ResultExt;
27use tokio::time;
28
29use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
30use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
31use crate::procedure::{Output, StringKey};
32use crate::rwlock::OwnedKeyRwLockGuard;
33use crate::store::{ProcedureMessage, ProcedureStore};
34use crate::{
35 BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
36};
37
/// Drop guard tied to a single procedure run.
///
/// Its `Drop` implementation marks the procedure as failed if `finish()` was
/// never called, notifies the parent procedure (if any), releases the key
/// guards and asks the key lock to clean the procedure's lock keys.
struct ProcedureGuard {
    /// Metadata of the guarded procedure.
    meta: ProcedureMetaRef,
    /// Manager context used on drop to notify the parent and clean lock keys.
    manager_ctx: Arc<ManagerContext>,
    /// Key lock guards held for the procedure; released in reverse order on drop.
    key_guards: Vec<OwnedKeyRwLockGuard>,
    /// `true` once `finish()` has been called, i.e. the run ended normally.
    finish: bool,
}
45
46impl ProcedureGuard {
47 fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
49 ProcedureGuard {
50 meta,
51 manager_ctx,
52 key_guards: vec![],
53 finish: false,
54 }
55 }
56
57 fn finish(mut self) {
59 self.finish = true;
60 }
61}
62
63impl Drop for ProcedureGuard {
64 fn drop(&mut self) {
65 if !self.finish {
66 error!("Procedure {} exits unexpectedly", self.meta.id);
67
68 let err = ProcedurePanicSnafu {
72 procedure_id: self.meta.id,
73 }
74 .build();
75 self.meta.set_state(ProcedureState::failed(Arc::new(err)));
76 }
77
78 if let Some(parent_id) = self.meta.parent_id {
80 self.manager_ctx.notify_by_subprocedure(parent_id);
81 }
82
83 while !self.key_guards.is_empty() {
85 self.key_guards.pop();
86 }
87
88 self.manager_ctx
90 .key_lock
91 .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
92 }
93}
94
95fn find_lock_conflicts<'a>(
101 parent_keys: impl Iterator<Item = &'a StringKey>,
102 child_keys: impl Iterator<Item = &'a StringKey>,
103) -> Vec<String> {
104 use std::collections::HashMap;
105
106 let mut parent_map = HashMap::new();
108 for key in parent_keys {
109 match key {
110 StringKey::Exclusive(k) => {
111 parent_map.insert(k.as_str(), true);
112 }
113 StringKey::Share(k) => {
114 parent_map.entry(k.as_str()).or_insert(false);
115 }
116 }
117 }
118
119 child_keys
120 .filter_map(|child_key| match child_key {
121 StringKey::Exclusive(k) | StringKey::Share(k)
122 if parent_map.get(k.as_str()) == Some(&true) =>
123 {
124 Some(k.clone())
125 }
126 StringKey::Exclusive(k) if parent_map.get(k.as_str()) == Some(&false) => {
127 Some(k.clone())
128 }
129 _ => None,
130 })
131 .collect()
132}
133
/// Executes a single procedure: acquires its lock keys, drives it step by step
/// (with retry and rollback handling) and persists its progress to the store.
pub(crate) struct Runner {
    /// Metadata (id, state, lock keys, parent, ...) of the procedure being run.
    pub(crate) meta: ProcedureMetaRef,
    /// The procedure to execute.
    pub(crate) procedure: BoxedProcedure,
    /// Shared manager context (key locks, poison manager, procedure registry).
    pub(crate) manager_ctx: Arc<ManagerContext>,
    /// Step counter passed to the store; incremented after every successful
    /// persist, commit or rollback write.
    pub(crate) step: u32,
    /// Builder for the backoff schedules used between retries and rollback attempts.
    pub(crate) exponential_builder: ExponentialBuilder,
    /// Store used to persist, commit, roll back and delete procedure records.
    pub(crate) store: Arc<ProcedureStore>,
    /// Reset to `false` when the execution loop starts.
    // NOTE(review): no other reads or writes are visible in this file section — confirm usage.
    pub(crate) rolling_back: bool,
    /// Optional event recorder forwarded to the metadata of submitted subprocedures.
    pub(crate) event_recorder: Option<EventRecorderRef>,
}
144
145impl Runner {
    /// Returns whether the manager context reports itself as running.
    pub(crate) fn running(&self) -> bool {
        self.manager_ctx.running()
    }
150
    /// Runs the procedure to completion, consuming the runner.
    ///
    /// Acquires every lock key of the procedure, executes it in a loop until it
    /// reaches a terminal state, and then — for a root procedure only — reports
    /// the whole procedure tree as finished and deletes its records from the store.
    pub(crate) async fn run(mut self) {
        // The guard marks the procedure as failed if this future is dropped
        // before `guard.finish()` is reached.
        let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());

        info!(
            "Runner {}-{} starts",
            self.procedure.type_name(),
            self.meta.id
        );

        // Acquire the lock keys one by one; shared keys take a read lock,
        // exclusive keys take a write lock.
        for key in self.meta.lock_key.keys_to_lock() {
            let key_guard = match key {
                StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
                StringKey::Exclusive(key) => {
                    self.manager_ctx.key_lock.write(key.clone()).await.into()
                }
            };

            guard.key_guards.push(key_guard);
        }

        self.meta.set_start_time_ms();
        self.execute_procedure_in_loop().await;
        self.meta.set_end_time_ms();

        // Consumes (and therefore drops) the guard: the parent is notified and
        // the key guards are released at this point.
        guard.finish();

        // Only a root procedure cleans up the records of its procedure tree.
        if self.meta.parent_id.is_none() {
            let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
            self.manager_ctx.on_procedures_finish(&procedure_ids);

            // Keep the stored records if the manager is no longer running.
            if !self.running() {
                return;
            }

            for id in procedure_ids {
                // Deletion is best effort: failures are logged and skipped.
                if let Err(e) = self.store.delete_procedure(id).await {
                    error!(
                        e;
                        "Runner {}-{} failed to delete procedure {}",
                        self.procedure.type_name(),
                        self.meta.id,
                        id,
                    );
                }
            }
        }

        info!(
            "Runner {}-{} exits",
            self.procedure.type_name(),
            self.meta.id
        );
    }
220
221 async fn execute_procedure_in_loop(&mut self) {
222 let ctx = Context {
223 procedure_id: self.meta.id,
224 provider: self.manager_ctx.clone(),
225 };
226
227 self.rolling_back = false;
228 self.execute_once_with_retry(&ctx).await;
229 }
230
    /// Repeatedly calls `execute_once` until the procedure reaches a terminal
    /// state (`Done`, `Failed` or `Poisoned`), the manager stops running, or
    /// the rollback backoff schedule is exhausted.
    ///
    /// Before each step the current state decides whether to proceed
    /// immediately, to wait on the retry or rollback backoff schedule, or to
    /// give up.
    async fn execute_once_with_retry(&mut self, ctx: &Context) {
        // Separate backoff schedules and attempt counters for execution retries
        // and for rollback attempts.
        let mut retry = self.exponential_builder.build();
        let mut retry_times = 0;

        let mut rollback = self.exponential_builder.build();
        let mut rollback_times = 0;

        loop {
            // Stop immediately once the manager is no longer running.
            if !self.running() {
                self.meta.set_state(ProcedureState::failed(Arc::new(
                    error::ManagerNotStartSnafu {}.build(),
                )));
                return;
            }
            let state = self.meta.state();
            match state {
                ProcedureState::Running => {}
                ProcedureState::Retrying { error } => {
                    retry_times += 1;
                    if let Some(d) = retry.next() {
                        let millis = d.as_millis() as u64;
                        // Random jitter in `0..=millis / 4` on top of the backoff delay.
                        let noise = rand::rng().random_range(0..(millis / 4) + 1);
                        let d = d.add(Duration::from_millis(noise));

                        self.wait_on_err(d, retry_times).await;
                    } else {
                        // Retry schedule exhausted: move on to preparing the rollback.
                        self.meta
                            .set_state(ProcedureState::prepare_rollback(Arc::new(
                                Error::RetryTimesExceeded {
                                    source: error.clone(),
                                    procedure_id: self.meta.id,
                                },
                            )));
                    }
                }
                ProcedureState::PrepareRollback { error }
                | ProcedureState::RollingBack { error } => {
                    rollback_times += 1;
                    if let Some(d) = rollback.next() {
                        self.wait_on_err(d, rollback_times).await;
                    } else {
                        // Rollback schedule exhausted: wrap the last error and fail.
                        let err = Err::<(), Arc<Error>>(error)
                            .context(RollbackTimesExceededSnafu {
                                procedure_id: self.meta.id,
                            })
                            .unwrap_err();
                        self.meta.set_state(ProcedureState::failed(Arc::new(err)));
                        return;
                    }
                }
                ProcedureState::Done { .. } => return,
                ProcedureState::Failed { .. } => return,
                ProcedureState::Poisoned { .. } => return,
            }
            self.execute_once(ctx).await;
        }
    }
290
291 async fn clean_poisons(&mut self) -> Result<()> {
292 let mut error = None;
293 for key in self.meta.poison_keys.iter() {
294 let key = key.to_string();
295 if let Err(e) = self
296 .manager_ctx
297 .poison_manager
298 .delete_poison(key, self.meta.id.to_string())
299 .await
300 {
301 error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
302 error = Some(e);
303 }
304 }
305
306 if let Some(e) = error {
308 return Err(e);
309 }
310 Ok(())
311 }
312
313 async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
314 if self.procedure.rollback_supported()
315 && let Err(e) = self.procedure.rollback(ctx).await
316 {
317 self.meta
318 .set_state(ProcedureState::rolling_back(Arc::new(e)));
319 return;
320 }
321 self.meta.set_state(ProcedureState::failed(err));
322 }
323
324 async fn prepare_rollback(&mut self, err: Arc<Error>) {
325 if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
326 self.meta
327 .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
328 return;
329 }
330 if self.procedure.rollback_supported() {
331 self.meta.set_state(ProcedureState::rolling_back(err));
332 } else {
333 self.meta.set_state(ProcedureState::failed(err));
334 }
335 }
336
337 async fn execute_once(&mut self, ctx: &Context) {
338 match self.meta.state() {
339 ProcedureState::Running | ProcedureState::Retrying { .. } => {
340 match self.procedure.execute(ctx).await {
341 Ok(status) => {
342 debug!(
343 "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
344 self.procedure.type_name(),
345 self.meta.id,
346 status,
347 status.need_persist(),
348 );
349
350 if !self.running() {
352 self.meta.set_state(ProcedureState::failed(Arc::new(
353 error::ManagerNotStartSnafu {}.build(),
354 )));
355 return;
356 }
357
358 if status.need_clean_poisons()
360 && let Err(e) = self.clean_poisons().await
361 {
362 error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
363 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
364 return;
365 }
366
367 if status.need_persist()
368 && let Err(e) = self.persist_procedure().await
369 {
370 error!(e; "Failed to persist procedure: {}", self.meta.id);
371 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
372 return;
373 }
374
375 match status {
376 Status::Executing { .. } => {
377 let prev_state = self.meta.state();
378 if !matches!(prev_state, ProcedureState::Running) {
379 info!(
380 "Set Procedure {}-{} state to running, prev_state: {:?}",
381 self.procedure.type_name(),
382 self.meta.id,
383 prev_state
384 );
385 self.meta.set_state(ProcedureState::Running);
386 }
387 }
388 Status::Suspended { subprocedures, .. } => {
389 let prev_state = self.meta.state();
390 if !matches!(prev_state, ProcedureState::Running) {
391 info!(
392 "Set Procedure {}-{} state to running, prev_state: {:?}",
393 self.procedure.type_name(),
394 self.meta.id,
395 prev_state
396 );
397 self.meta.set_state(ProcedureState::Running);
398 }
399 self.on_suspended(subprocedures).await;
400 }
401 Status::Done { output } => {
402 if let Err(e) = self.commit_procedure().await {
403 error!(e; "Failed to commit procedure: {}", self.meta.id);
404 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
405 return;
406 }
407
408 self.done(output);
409 }
410 Status::Poisoned { error, keys } => {
411 error!(
412 error;
413 "Procedure {}-{} is poisoned, keys: {:?}",
414 self.procedure.type_name(),
415 self.meta.id,
416 keys,
417 );
418 self.meta
419 .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
420 }
421 }
422 }
423 Err(e) => {
424 error!(
425 e;
426 "Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
427 self.procedure.type_name(),
428 self.meta.id,
429 e.is_retry_later(),
430 e.need_clean_poisons(),
431 );
432
433 if !self.running() {
435 self.meta.set_state(ProcedureState::failed(Arc::new(
436 error::ManagerNotStartSnafu {}.build(),
437 )));
438 return;
439 }
440
441 if e.need_clean_poisons() {
442 if let Err(e) = self.clean_poisons().await {
443 error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
444 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
445 return;
446 }
447 debug!(
448 "Procedure {}-{} cleaned poisons",
449 self.procedure.type_name(),
450 self.meta.id,
451 );
452 }
453
454 if e.is_retry_later() {
455 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
456 return;
457 }
458
459 if self.procedure.rollback_supported() {
460 self.meta
461 .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
462 } else {
463 self.meta.set_state(ProcedureState::failed(Arc::new(e)));
464 }
465 }
466 }
467 }
468 ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
469 ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
470 ProcedureState::Failed { .. }
471 | ProcedureState::Done { .. }
472 | ProcedureState::Poisoned { .. } => (),
473 }
474 }
475
    /// Registers `procedure` as a child of this procedure and spawns a new
    /// [`Runner`] for it on the global runtime.
    ///
    /// Does nothing if the manager context already contains `procedure_id`.
    ///
    /// # Panics
    ///
    /// Panics if inserting the new metadata into the manager context fails.
    fn submit_subprocedure(
        &self,
        procedure_id: ProcedureId,
        procedure_state: ProcedureState,
        procedure: BoxedProcedure,
    ) {
        // Already known to the manager: do not submit it twice.
        if self.manager_ctx.contains_procedure(procedure_id) {
            return;
        }

        // The child runner starts counting its persisted steps from zero.
        let step = 0;

        let meta = Arc::new(ProcedureMeta::new(
            procedure_id,
            procedure_state,
            Some(self.meta.id),
            procedure.lock_key(),
            procedure.poison_keys(),
            procedure.type_name(),
            self.event_recorder.clone(),
            procedure.user_metadata(),
        ));
        let runner = Runner {
            meta: meta.clone(),
            procedure,
            manager_ctx: self.manager_ctx.clone(),
            step,
            exponential_builder: self.exponential_builder,
            store: self.store.clone(),
            rolling_back: false,
            event_recorder: self.event_recorder.clone(),
        };

        // Insertion is expected to succeed because existence was checked above.
        assert!(
            self.manager_ctx.try_insert_procedure(meta),
            "Procedure {}-{} submit an existing procedure {}-{}",
            self.procedure.type_name(),
            self.meta.id,
            runner.procedure.type_name(),
            procedure_id,
        );

        // Record the child on the parent's metadata before spawning it.
        self.meta.push_child(procedure_id);
        let parent_id = self.meta.id;

        // Carry the current tracing context into the spawned task.
        let tracing_context = TracingContext::from_current_span();
        let _handle = common_runtime::spawn_global(async move {
            let span = tracing_context.attach(tracing::info_span!(
                "LocalManager::submit_subprocedure",
                procedure_name = %runner.meta.type_name,
                procedure_id = %runner.meta.id,
                parent_id = %parent_id,
            ));
            runner.run().trace(span).await
        });
    }
541
542 async fn wait_on_err(&mut self, d: Duration, i: u64) {
544 info!(
545 "Procedure {}-{} retry for the {} times after {} millis",
546 self.procedure.type_name(),
547 self.meta.id,
548 i,
549 d.as_millis(),
550 );
551 time::sleep(d).await;
552 }
553
    /// Handles a `Suspended` status: validates and submits the subprocedures,
    /// then waits for a child notification if any subprocedure was given.
    ///
    /// If a subprocedure's lock keys conflict with this procedure's lock keys,
    /// nothing is submitted and this procedure moves to `PrepareRollback` (when
    /// rollback is supported) or `Failed`.
    async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
        let has_child = !subprocedures.is_empty();

        // Validate every subprocedure before submitting any of them.
        for sub in &subprocedures {
            let conflicting = find_lock_conflicts(
                self.meta.lock_key.keys_to_lock(),
                sub.procedure.lock_key().keys_to_lock(),
            );
            if !conflicting.is_empty() {
                let err_msg = format!(
                    "Deadlock prevented: subprocedure {}-{} shares conflicting lock key(s) {:?} \
                     with parent {}-{}. Parent holds these locks and would wait for child \
                     completion, but child cannot acquire them.",
                    sub.procedure.type_name(),
                    sub.id,
                    conflicting,
                    self.procedure.type_name(),
                    self.meta.id,
                );
                error!("{}", err_msg);
                let err = Arc::new(Error::external(PlainError::new(
                    err_msg,
                    StatusCode::Internal,
                )));
                if self.procedure.rollback_supported() {
                    self.meta.set_state(ProcedureState::prepare_rollback(err));
                } else {
                    self.meta.set_state(ProcedureState::failed(err));
                }
                return;
            }
        }

        for subprocedure in subprocedures {
            info!(
                "Procedure {}-{} submit subprocedure {}-{}",
                self.procedure.type_name(),
                self.meta.id,
                subprocedure.procedure.type_name(),
                subprocedure.id,
            );

            self.submit_subprocedure(
                subprocedure.id,
                ProcedureState::Running,
                subprocedure.procedure,
            );
        }

        info!(
            "Procedure {}-{} is waiting for subprocedures",
            self.procedure.type_name(),
            self.meta.id,
        );

        // Only wait when at least one subprocedure was passed in.
        if has_child {
            self.meta.child_notify.notified().await;

            info!(
                "Procedure {}-{} is waked up",
                self.procedure.type_name(),
                self.meta.id,
            );
        }
    }
624
625 async fn persist_procedure(&mut self) -> Result<()> {
626 let type_name = self.procedure.type_name().to_string();
627 let data = self.procedure.dump()?;
628
629 self.store
630 .store_procedure(
631 self.meta.id,
632 self.step,
633 type_name,
634 data,
635 self.meta.parent_id,
636 )
637 .await
638 .map_err(|e| {
639 error!(
640 e; "Failed to persist procedure {}-{}",
641 self.procedure.type_name(),
642 self.meta.id
643 );
644 e
645 })?;
646 self.step += 1;
647 Ok(())
648 }
649
650 async fn commit_procedure(&mut self) -> Result<()> {
651 self.store
652 .commit_procedure(self.meta.id, self.step)
653 .await
654 .map_err(|e| {
655 error!(
656 e; "Failed to commit procedure {}-{}",
657 self.procedure.type_name(),
658 self.meta.id
659 );
660 e
661 })?;
662 self.step += 1;
663 Ok(())
664 }
665
666 async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
667 let type_name = self.procedure.type_name().to_string();
669 let data = self.procedure.dump()?;
670 let message = ProcedureMessage {
671 type_name,
672 data,
673 parent_id: self.meta.parent_id,
674 step: self.step,
675 error: Some(error),
676 };
677 self.store
678 .rollback_procedure(self.meta.id, message)
679 .await
680 .map_err(|e| {
681 error!(
682 e; "Failed to write rollback key for procedure {}-{}",
683 self.procedure.type_name(),
684 self.meta.id
685 );
686 e
687 })?;
688 self.step += 1;
689 Ok(())
690 }
691
    /// Logs completion and sets the procedure state to `Done` with `output`.
    fn done(&self, output: Option<Output>) {
        info!(
            "Procedure {}-{} done",
            self.procedure.type_name(),
            self.meta.id,
        );

        self.meta.set_state(ProcedureState::Done { output });
    }
703}
704
705#[cfg(test)]
706mod tests {
707 use std::assert_matches;
708 use std::sync::Arc;
709 use std::sync::atomic::{AtomicU64, Ordering};
710
711 use async_trait::async_trait;
712 use common_error::ext::{ErrorExt, PlainError};
713 use common_error::mock::MockError;
714 use common_error::status_code::StatusCode;
715 use common_test_util::temp_dir::create_temp_dir;
716 use futures::future::join_all;
717 use futures_util::FutureExt;
718 use futures_util::future::BoxFuture;
719 use object_store::{EntryMode, ObjectStore};
720 use tokio::sync::mpsc;
721 use tokio::sync::watch::Receiver;
722
723 use super::*;
724 use crate::local::{DynamicKeyLockGuard, test_util};
725 use crate::procedure::PoisonKeys;
726 use crate::store::proc_path;
727 use crate::test_util::InMemoryPoisonStore;
728 use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
729
    /// Fixed UUID used as the id of the procedure under test.
    const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
731
732 fn new_runner(
733 meta: ProcedureMetaRef,
734 procedure: BoxedProcedure,
735 store: Arc<ProcedureStore>,
736 ) -> Runner {
737 Runner {
738 meta,
739 procedure,
740 manager_ctx: Arc::new(ManagerContext::new(
741 Arc::new(InMemoryPoisonStore::default()),
742 )),
743 step: 0,
744 exponential_builder: ExponentialBuilder::default(),
745 store,
746 rolling_back: false,
747 event_recorder: None,
748 }
749 }
750
751 async fn check_files(
752 object_store: &ObjectStore,
753 procedure_store: &ProcedureStore,
754 procedure_id: ProcedureId,
755 files: &[&str],
756 ) {
757 let dir = proc_path!(procedure_store, "{procedure_id}/");
758 let lister = object_store.list(&dir).await.unwrap();
759 let mut files_in_dir: Vec<_> = lister
760 .into_iter()
761 .filter(|x| x.metadata().mode() == EntryMode::FILE)
762 .map(|de| de.name().to_string())
763 .collect();
764 files_in_dir.sort_unstable();
765 assert_eq!(files, files_in_dir);
766 }
767
    /// Builds a [`Context`] for `procedure_id` backed by the given provider.
    fn context_with_provider(
        procedure_id: ProcedureId,
        provider: Arc<dyn ContextProvider>,
    ) -> Context {
        Context {
            procedure_id,
            provider,
        }
    }
777
    /// Builds a [`Context`] whose provider panics (`unimplemented!`) on every
    /// call, for tests in which the procedure never touches the provider.
    fn context_without_provider(procedure_id: ProcedureId) -> Context {
        /// Provider stub: every method is unimplemented.
        struct MockProvider;

        #[async_trait]
        impl ContextProvider for MockProvider {
            async fn procedure_state(
                &self,
                _procedure_id: ProcedureId,
            ) -> Result<Option<ProcedureState>> {
                unimplemented!()
            }

            async fn procedure_state_receiver(
                &self,
                _procedure_id: ProcedureId,
            ) -> Result<Option<Receiver<ProcedureState>>> {
                unimplemented!()
            }

            async fn try_put_poison(
                &self,
                _key: &PoisonKey,
                _procedure_id: ProcedureId,
            ) -> Result<()> {
                unimplemented!()
            }

            async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
                unimplemented!()
            }
        }

        Context {
            procedure_id,
            provider: Arc::new(MockProvider),
        }
    }
815
    /// Boxed rollback callback used by [`ProcedureAdapter`]: takes the
    /// procedure context and returns a boxed future with the rollback result.
    type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
817
    /// Test procedure whose behavior is supplied by closures.
    struct ProcedureAdapter<F> {
        /// Value returned by `dump()`.
        data: String,
        /// Lock key reported by `lock_key()`.
        lock_key: LockKey,
        /// Poison keys reported by `poison_keys()`.
        poison_keys: PoisonKeys,
        /// Closure invoked on every `execute()` call.
        exec_fn: F,
        /// Optional rollback closure; rollback is reported as supported only
        /// when this is `Some`.
        rollback_fn: Option<RollbackFn>,
    }
825
    impl<F> ProcedureAdapter<F> {
        /// Creates test metadata for this adapter: starts from
        /// `test_util::procedure_meta_for_test()` and overrides the id (parsed
        /// from `uuid`), the lock key and the poison keys.
        ///
        /// Panics if `uuid` cannot be parsed as a procedure id.
        fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
            let mut meta = test_util::procedure_meta_for_test();
            meta.id = ProcedureId::parse_str(uuid).unwrap();
            meta.lock_key = self.lock_key.clone();
            meta.poison_keys = self.poison_keys.clone();

            Arc::new(meta)
        }
    }
836
837 #[async_trait]
838 impl<F> Procedure for ProcedureAdapter<F>
839 where
840 F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
841 {
842 fn type_name(&self) -> &str {
843 "ProcedureAdapter"
844 }
845
846 async fn execute(&mut self, ctx: &Context) -> Result<Status> {
847 let f = (self.exec_fn)(ctx.clone());
848 f.await
849 }
850
851 async fn rollback(&mut self, ctx: &Context) -> Result<()> {
852 if let Some(f) = &mut self.rollback_fn {
853 return (f)(ctx.clone()).await;
854 }
855 Ok(())
856 }
857
858 fn rollback_supported(&self) -> bool {
859 self.rollback_fn.is_some()
860 }
861
862 fn dump(&self) -> Result<String> {
863 Ok(self.data.clone())
864 }
865
866 fn lock_key(&self) -> LockKey {
867 self.lock_key.clone()
868 }
869
870 fn poison_keys(&self) -> PoisonKeys {
871 self.poison_keys.clone()
872 }
873 }
874
    /// Runs a two-step procedure (`executing(persist)` first, then `done`)
    /// through `execute_once` twice.
    ///
    /// Checks that the state is `Running` after the first call and `Done`
    /// after the second, and that the stored files match `first_files` and
    /// `second_files` respectively.
    async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
        // Counts how many times the procedure has been executed.
        let mut times = 0;
        let exec_fn = move |_| {
            times += 1;
            async move {
                if times == 1 {
                    Ok(Status::executing(persist))
                } else {
                    Ok(Status::done())
                }
            }
            .boxed()
        };
        let normal = ProcedureAdapter {
            data: "normal".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("normal");
        let meta = normal.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
        runner.manager_ctx.start();

        // First step: the procedure is still executing.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            first_files,
        )
        .await;

        // Second step: the procedure completes.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_done(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            second_files,
        )
        .await;
    }
926
    /// With persistence enabled, the first step writes a step file and the
    /// second step adds a commit file.
    #[tokio::test]
    async fn test_execute_once_normal() {
        execute_once_normal(
            true,
            &["0000000000.step"],
            &["0000000000.step", "0000000001.commit"],
        )
        .await;
    }
936
    /// With persistence disabled, no step file is written and only the commit
    /// file exists after completion.
    #[tokio::test]
    async fn test_execute_once_normal_skip_persist() {
        execute_once_normal(false, &[], &["0000000000.commit"]).await;
    }
941
    /// A `Suspended` status without subprocedures leaves the procedure in the
    /// `Running` state after one step.
    #[tokio::test]
    async fn test_on_suspend_empty() {
        let exec_fn = move |_| {
            async move {
                Ok(Status::Suspended {
                    subprocedures: Vec::new(),
                    persist: false,
                })
            }
            .boxed()
        };
        let suspend = ProcedureAdapter {
            data: "suspend".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("suspend");
        let meta = suspend.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
        runner.manager_ctx.start();

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");
    }
973
    /// Creates a child procedure that holds `keys` exclusively.
    ///
    /// Its first execution sleeps for 200 ms and returns `executing(true)`;
    /// every later execution returns `done`.
    fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
        // Counts how many times the child has been executed.
        let mut times = 0;
        let exec_fn = move |_| {
            times += 1;
            async move {
                if times == 1 {
                    time::sleep(Duration::from_millis(200)).await;
                    Ok(Status::executing(true))
                } else {
                    Ok(Status::done())
                }
            }
            .boxed()
        };
        let child = ProcedureAdapter {
            data: "child".to_string(),
            lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        ProcedureWithId {
            id: procedure_id,
            procedure: Box::new(child),
        }
    }
1001
    /// A parent that suspends on two children runs to completion: both
    /// children and the parent end up `Done`, all key locks are released, the
    /// parent's stored files are removed, and outdated metadata can be purged.
    #[tokio::test]
    async fn test_on_suspend_by_subprocedures() {
        let mut times = 0;
        let children_ids = [ProcedureId::random(), ProcedureId::random()];
        // Each child locks its own pair of region keys, distinct from the
        // parent's table key.
        let keys = [
            &[
                "catalog.schema.table.region-0",
                "catalog.schema.table.region-1",
            ],
            &[
                "catalog.schema.table.region-2",
                "catalog.schema.table.region-3",
            ],
        ];

        let exec_fn = move |ctx: Context| {
            times += 1;
            async move {
                if times == 1 {
                    // First execution: submit both children.
                    Ok(Status::Suspended {
                        subprocedures: children_ids
                            .into_iter()
                            .zip(keys)
                            .map(|(id, key_slice)| new_child_procedure(id, key_slice))
                            .collect(),
                        persist: true,
                    })
                } else {
                    // Later executions: finish only once every child is done.
                    let mut all_child_done = true;
                    for id in children_ids {
                        let is_not_done = ctx
                            .provider
                            .procedure_state(id)
                            .await
                            .unwrap()
                            .map(|s| !s.is_done())
                            .unwrap_or(true);
                        if is_not_done {
                            all_child_done = false;
                        }
                    }
                    if all_child_done {
                        Ok(Status::done())
                    } else {
                        // Suspend again without submitting new subprocedures.
                        Ok(Status::Suspended {
                            subprocedures: Vec::new(),
                            persist: false,
                        })
                    }
                }
            }
            .boxed()
        };
        let parent = ProcedureAdapter {
            data: "parent".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("parent");
        let meta = parent.new_meta(ROOT_ID);
        let procedure_id = meta.id;

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
        manager_ctx.start();
        assert!(manager_ctx.try_insert_procedure(meta));
        // Replace the runner's context with the one the parent is registered in.
        runner.manager_ctx = manager_ctx.clone();

        runner.run().await;
        assert!(manager_ctx.key_lock.is_empty());

        for child_id in children_ids {
            let state = manager_ctx.state(child_id).unwrap();
            assert!(state.is_done(), "{state:?}");
        }
        let state = manager_ctx.state(procedure_id).unwrap();
        assert!(state.is_done(), "{state:?}");
        // The root runner deletes the stored files of the finished procedure.
        check_files(&object_store, &procedure_store, procedure_id, &[]).await;

        // Wait longer than the 1 ms TTL so the finished metadata counts as outdated.
        tokio::time::sleep(Duration::from_millis(5)).await;
        manager_ctx.remove_outdated_meta(Duration::from_millis(1));
        assert!(manager_ctx.state(procedure_id).is_none());
        assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
        for child_id in children_ids {
            assert!(manager_ctx.state(child_id).is_none());
        }
    }
1103
    /// After the manager is stopped, a successful execution marks the
    /// procedure as failed and writes no additional step file.
    #[tokio::test]
    async fn test_running_is_stopped() {
        let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
        let normal = ProcedureAdapter {
            data: "normal".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("test_running_is_stopped");
        let meta = normal.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
        runner.manager_ctx.start();

        // While the manager is running, the step is persisted as usual.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.step"],
        )
        .await;

        // Once stopped, the next step fails and nothing new is persisted.
        runner.manager_ctx.stop();
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.step"],
        )
        .await;
    }
1147
    /// With the manager stopped, an execution error marks the procedure as
    /// failed and leaves no files in the store.
    #[tokio::test]
    async fn test_running_is_stopped_on_error() {
        let exec_fn =
            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
        let normal = ProcedureAdapter {
            data: "fail".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("test_running_is_stopped_on_error");
        let meta = normal.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
        runner.manager_ctx.stop();

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
    }
1174
    /// A non-retryable error from a procedure without rollback support moves
    /// it straight to `Failed` and leaves no files in the store.
    #[tokio::test]
    async fn test_execute_on_error() {
        let exec_fn =
            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
        let fail = ProcedureAdapter {
            data: "fail".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("fail");
        let meta = fail.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
        runner.manager_ctx.start();

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
    }
1200
    /// A non-retryable error from a procedure with rollback support walks
    /// through `PrepareRollback` -> `RollingBack` -> `Failed`, leaving a single
    /// rollback file in the store.
    #[tokio::test]
    async fn test_execute_with_rollback_on_error() {
        let exec_fn =
            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
        let rollback_fn = move |_| async move { Ok(()) }.boxed();
        let fail = ProcedureAdapter {
            data: "fail".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: Some(Box::new(rollback_fn)),
        };

        let dir = create_temp_dir("fail");
        let meta = fail.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
        runner.manager_ctx.start();

        // Step 1: execution fails, so the rollback is prepared.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_prepare_rollback(), "{state:?}");

        // Step 2: the rollback record is written and the rollback begins.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_rolling_back(), "{state:?}");

        // Step 3: the rollback succeeds and the procedure ends up failed.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.rollback"],
        )
        .await;
    }
1241
    /// A retry-later error moves the procedure to `Retrying`; subsequent
    /// successful executions bring it back to `Running` and then to `Done`,
    /// leaving only a commit file in the store.
    #[tokio::test]
    async fn test_execute_on_retry_later_error() {
        // Counts how many times the procedure has been executed.
        let mut times = 0;

        let exec_fn = move |_| {
            times += 1;
            async move {
                if times == 1 {
                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
                } else if times == 2 {
                    Ok(Status::executing(false))
                } else {
                    Ok(Status::done())
                }
            }
            .boxed()
        };

        let retry_later = ProcedureAdapter {
            data: "retry_later".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("retry_later");
        let meta = retry_later.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
        runner.manager_ctx.start();
        // First execution returns a retry-later error.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_retrying(), "{state:?}");

        // Second execution succeeds without persisting.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        // Third execution completes the procedure.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_done(), "{state:?}");
        assert!(meta.state().is_done());
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.commit"],
        )
        .await;
    }
1295
1296 #[tokio::test]
1297 async fn test_retrying_state_visible_in_context_on_retry() {
1298 let retrying_states = Arc::new(std::sync::Mutex::new(Vec::new()));
1299 let captured = retrying_states.clone();
1300 let mut times = 0;
1301
1302 let exec_fn = move |ctx: Context| {
1303 times += 1;
1304 let captured = captured.clone();
1305 async move {
1306 let is_retrying = ctx.is_retrying().await;
1307 captured.lock().unwrap().push(is_retrying);
1308 if times == 1 {
1309 Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1310 } else {
1311 Ok(Status::done())
1312 }
1313 }
1314 .boxed()
1315 };
1316
1317 let procedure = ProcedureAdapter {
1318 data: "retrying_state".to_string(),
1319 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1320 poison_keys: PoisonKeys::default(),
1321 exec_fn,
1322 rollback_fn: None,
1323 };
1324
1325 let dir = create_temp_dir("retrying_state");
1326 let meta = procedure.new_meta(ROOT_ID);
1327 let object_store = test_util::new_object_store(&dir);
1328 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store));
1329 let mut runner = new_runner(meta.clone(), Box::new(procedure), procedure_store);
1330 let ctx = context_with_provider(
1331 meta.id,
1332 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1333 );
1334
1335 runner
1336 .manager_ctx
1337 .procedures
1338 .write()
1339 .unwrap()
1340 .insert(meta.id, runner.meta.clone());
1341 runner.manager_ctx.start();
1342
1343 runner.execute_once(&ctx).await;
1344 runner.execute_once(&ctx).await;
1345
1346 let states = retrying_states.lock().unwrap().clone();
1347 assert_eq!(states, vec![Some(false), Some(true)]);
1348 }
1349
1350 #[tokio::test(flavor = "multi_thread")]
1351 async fn test_execute_on_retry_later_error_with_child() {
1352 common_telemetry::init_default_ut_logging();
1353 let mut times = 0;
1354 let child_id = ProcedureId::random();
1355
1356 let exec_fn = move |_| {
1357 times += 1;
1358 async move {
1359 debug!("times: {}", times);
1360 if times == 1 {
1361 Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1362 } else if times == 2 {
1363 let exec_fn = |_| {
1364 async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1365 .boxed()
1366 };
1367 let fail = ProcedureAdapter {
1368 data: "fail".to_string(),
1369 lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1370 poison_keys: PoisonKeys::default(),
1371 exec_fn,
1372 rollback_fn: None,
1373 };
1374
1375 Ok(Status::Suspended {
1376 subprocedures: vec![ProcedureWithId {
1377 id: child_id,
1378 procedure: Box::new(fail),
1379 }],
1380 persist: true,
1381 })
1382 } else {
1383 Ok(Status::done())
1384 }
1385 }
1386 .boxed()
1387 };
1388
1389 let retry_later = ProcedureAdapter {
1390 data: "retry_later".to_string(),
1391 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1392 poison_keys: PoisonKeys::default(),
1393 exec_fn,
1394 rollback_fn: None,
1395 };
1396
1397 let dir = create_temp_dir("retry_later");
1398 let meta = retry_later.new_meta(ROOT_ID);
1399 let ctx = context_without_provider(meta.id);
1400 let object_store = test_util::new_object_store(&dir);
1401 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1402 let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1403 runner.manager_ctx.start();
1404 debug!("execute_once 1");
1405 runner.execute_once(&ctx).await;
1406 let state = runner.meta.state();
1407 assert!(state.is_retrying(), "{state:?}");
1408
1409 let moved_meta = meta.clone();
1410 tokio::spawn(async move {
1411 moved_meta.child_notify.notify_one();
1412 });
1413 runner.execute_once(&ctx).await;
1414 let state = runner.meta.state();
1415 assert!(state.is_running(), "{state:?}");
1416
1417 runner.execute_once(&ctx).await;
1418 let state = runner.meta.state();
1419 assert!(state.is_done(), "{state:?}");
1420 assert!(meta.state().is_done());
1421 check_files(
1422 &object_store,
1423 &procedure_store,
1424 ctx.procedure_id,
1425 &["0000000000.step", "0000000001.commit"],
1426 )
1427 .await;
1428 }
1429
1430 #[tokio::test]
1431 async fn test_execute_exceed_max_retry_later() {
1432 let exec_fn =
1433 |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1434
1435 let exceed_max_retry_later = ProcedureAdapter {
1436 data: "exceed_max_retry_later".to_string(),
1437 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1438 poison_keys: PoisonKeys::default(),
1439 exec_fn,
1440 rollback_fn: None,
1441 };
1442
1443 let dir = create_temp_dir("exceed_max_retry_later");
1444 let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1445 let object_store = test_util::new_object_store(&dir);
1446 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1447 let mut runner = new_runner(
1448 meta.clone(),
1449 Box::new(exceed_max_retry_later),
1450 procedure_store,
1451 );
1452 runner.manager_ctx.start();
1453
1454 runner.exponential_builder = ExponentialBuilder::default()
1455 .with_min_delay(Duration::from_millis(1))
1456 .with_max_times(3);
1457
1458 runner.execute_procedure_in_loop().await;
1460 let err = meta.state().error().unwrap().to_string();
1461 assert!(err.contains("Procedure retry exceeded max times"));
1462 }
1463
1464 #[tokio::test]
1465 async fn test_rollback_exceed_max_retry_later() {
1466 let exec_fn =
1467 |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1468 let rollback_fn = move |_| {
1469 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1470 };
1471 let exceed_max_retry_later = ProcedureAdapter {
1472 data: "exceed_max_rollback".to_string(),
1473 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1474 poison_keys: PoisonKeys::default(),
1475 exec_fn,
1476 rollback_fn: Some(Box::new(rollback_fn)),
1477 };
1478
1479 let dir = create_temp_dir("exceed_max_rollback");
1480 let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1481 let object_store = test_util::new_object_store(&dir);
1482 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1483 let mut runner = new_runner(
1484 meta.clone(),
1485 Box::new(exceed_max_retry_later),
1486 procedure_store,
1487 );
1488 runner.manager_ctx.start();
1489 runner.exponential_builder = ExponentialBuilder::default()
1490 .with_min_delay(Duration::from_millis(1))
1491 .with_max_times(3);
1492
1493 runner.execute_procedure_in_loop().await;
1495 let err = meta.state().error().unwrap().to_string();
1496 assert!(err.contains("Procedure rollback exceeded max times"));
1497 }
1498
1499 #[tokio::test]
1500 async fn test_rollback_after_retry_fail() {
1501 let exec_fn = move |_| {
1502 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1503 };
1504
1505 let (tx, mut rx) = mpsc::channel(1);
1506 let rollback_fn = move |_| {
1507 let tx = tx.clone();
1508 async move {
1509 tx.send(()).await.unwrap();
1510 Ok(())
1511 }
1512 .boxed()
1513 };
1514 let retry_later = ProcedureAdapter {
1515 data: "rollback_after_retry_fail".to_string(),
1516 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1517 poison_keys: PoisonKeys::default(),
1518 exec_fn,
1519 rollback_fn: Some(Box::new(rollback_fn)),
1520 };
1521
1522 let dir = create_temp_dir("retry_later");
1523 let meta = retry_later.new_meta(ROOT_ID);
1524 let ctx = context_without_provider(meta.id);
1525 let object_store = test_util::new_object_store(&dir);
1526 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1527 let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1528 runner.manager_ctx.start();
1529 runner.exponential_builder = ExponentialBuilder::default()
1530 .with_min_delay(Duration::from_millis(1))
1531 .with_max_times(3);
1532 runner.execute_procedure_in_loop().await;
1534 rx.recv().await.unwrap();
1535 assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
1536 check_files(
1537 &object_store,
1538 &procedure_store,
1539 ctx.procedure_id,
1540 &["0000000000.rollback"],
1541 )
1542 .await;
1543 }
1544
1545 #[tokio::test]
1546 async fn test_child_error() {
1547 let mut times = 0;
1548 let child_id = ProcedureId::random();
1549 common_telemetry::init_default_ut_logging();
1550 let exec_fn = move |ctx: Context| {
1551 times += 1;
1552 async move {
1553 if times == 1 {
1554 let exec_fn = |_| {
1556 async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1557 .boxed()
1558 };
1559 let fail = ProcedureAdapter {
1560 data: "fail".to_string(),
1561 lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1562 poison_keys: PoisonKeys::default(),
1563 exec_fn,
1564 rollback_fn: None,
1565 };
1566
1567 Ok(Status::Suspended {
1568 subprocedures: vec![ProcedureWithId {
1569 id: child_id,
1570 procedure: Box::new(fail),
1571 }],
1572 persist: true,
1573 })
1574 } else {
1575 let state = ctx.provider.procedure_state(child_id).await.unwrap();
1577 let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
1578 if is_failed {
1579 Err(Error::from_error_ext(PlainError::new(
1581 "subprocedure failed".to_string(),
1582 StatusCode::Unexpected,
1583 )))
1584 } else {
1585 Ok(Status::Suspended {
1587 subprocedures: Vec::new(),
1588 persist: false,
1589 })
1590 }
1591 }
1592 }
1593 .boxed()
1594 };
1595 let parent = ProcedureAdapter {
1596 data: "parent".to_string(),
1597 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1598 poison_keys: PoisonKeys::default(),
1599 exec_fn,
1600 rollback_fn: None,
1601 };
1602
1603 let dir = create_temp_dir("child_err");
1604 let meta = parent.new_meta(ROOT_ID);
1605
1606 let object_store = test_util::new_object_store(&dir);
1607 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1608 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
1609 let poison_manager = Arc::new(InMemoryPoisonStore::default());
1610 let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
1611 manager_ctx.start();
1612 assert!(manager_ctx.try_insert_procedure(meta.clone()));
1614 runner.manager_ctx = manager_ctx.clone();
1616
1617 runner.run().await;
1619 assert!(manager_ctx.key_lock.is_empty());
1620 let err = meta.state().error().unwrap().output_msg();
1621 assert!(err.contains("subprocedure failed"), "{err}");
1622 }
1623
1624 #[tokio::test]
1625 async fn test_execute_with_clean_poisons() {
1626 common_telemetry::init_default_ut_logging();
1627 let mut times = 0;
1628 let poison_key = PoisonKey::new("table/1024");
1629 let moved_poison_key = poison_key.clone();
1630 let exec_fn = move |ctx: Context| {
1631 times += 1;
1632 let poison_key = moved_poison_key.clone();
1633 async move {
1634 if times == 1 {
1635 ctx.provider
1637 .try_put_poison(&poison_key, ctx.procedure_id)
1638 .await
1639 .unwrap();
1640
1641 Ok(Status::executing(true))
1642 } else {
1643 Ok(Status::executing_with_clean_poisons(true))
1644 }
1645 }
1646 .boxed()
1647 };
1648 let poison = ProcedureAdapter {
1649 data: "poison".to_string(),
1650 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1651 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1652 exec_fn,
1653 rollback_fn: None,
1654 };
1655
1656 let dir = create_temp_dir("clean_poisons");
1657 let meta = poison.new_meta(ROOT_ID);
1658
1659 let object_store = test_util::new_object_store(&dir);
1660 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1661 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1662
1663 let ctx = context_with_provider(
1665 meta.id,
1666 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1667 );
1668 runner
1670 .manager_ctx
1671 .procedures
1672 .write()
1673 .unwrap()
1674 .insert(meta.id, runner.meta.clone());
1675
1676 runner.manager_ctx.start();
1677 runner.execute_once(&ctx).await;
1678 let state = runner.meta.state();
1679 assert!(state.is_running(), "{state:?}");
1680
1681 let procedure_id = runner
1682 .manager_ctx
1683 .poison_manager
1684 .get_poison(&poison_key.to_string())
1685 .await
1686 .unwrap();
1687 assert!(procedure_id.is_some());
1689
1690 runner.execute_once(&ctx).await;
1691 let state = runner.meta.state();
1692 assert!(state.is_running(), "{state:?}");
1693
1694 let procedure_id = runner
1695 .manager_ctx
1696 .poison_manager
1697 .get_poison(&poison_key.to_string())
1698 .await
1699 .unwrap();
1700 assert!(procedure_id.is_none());
1702 }
1703
1704 #[tokio::test]
1705 async fn test_execute_error_with_clean_poisons() {
1706 common_telemetry::init_default_ut_logging();
1707 let mut times = 0;
1708 let poison_key = PoisonKey::new("table/1024");
1709 let moved_poison_key = poison_key.clone();
1710 let exec_fn = move |ctx: Context| {
1711 times += 1;
1712 let poison_key = moved_poison_key.clone();
1713 async move {
1714 if times == 1 {
1715 ctx.provider
1717 .try_put_poison(&poison_key, ctx.procedure_id)
1718 .await
1719 .unwrap();
1720
1721 Ok(Status::executing(true))
1722 } else {
1723 Err(Error::external_and_clean_poisons(MockError::new(
1724 StatusCode::Unexpected,
1725 )))
1726 }
1727 }
1728 .boxed()
1729 };
1730 let poison = ProcedureAdapter {
1731 data: "poison".to_string(),
1732 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1733 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1734 exec_fn,
1735 rollback_fn: None,
1736 };
1737
1738 let dir = create_temp_dir("error_with_clean_poisons");
1739 let meta = poison.new_meta(ROOT_ID);
1740
1741 let object_store = test_util::new_object_store(&dir);
1742 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1743 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1744
1745 let ctx = context_with_provider(
1747 meta.id,
1748 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1749 );
1750 runner
1752 .manager_ctx
1753 .procedures
1754 .write()
1755 .unwrap()
1756 .insert(meta.id, runner.meta.clone());
1757
1758 runner.manager_ctx.start();
1759 runner.execute_once(&ctx).await;
1760 let state = runner.meta.state();
1761 assert!(state.is_running(), "{state:?}");
1762
1763 let procedure_id = runner
1764 .manager_ctx
1765 .poison_manager
1766 .get_poison(&poison_key.to_string())
1767 .await
1768 .unwrap();
1769 assert!(procedure_id.is_some());
1771
1772 runner.execute_once(&ctx).await;
1773 let state = runner.meta.state();
1774 assert!(state.is_failed(), "{state:?}");
1775
1776 let procedure_id = runner
1777 .manager_ctx
1778 .poison_manager
1779 .get_poison(&poison_key.to_string())
1780 .await
1781 .unwrap();
1782 assert!(procedure_id.is_none());
1784 }
1785
1786 #[tokio::test]
1787 async fn test_execute_failed_after_set_poison() {
1788 let mut times = 0;
1789 let poison_key = PoisonKey::new("table/1024");
1790 let moved_poison_key = poison_key.clone();
1791 let exec_fn = move |ctx: Context| {
1792 times += 1;
1793 let poison_key = moved_poison_key.clone();
1794 async move {
1795 if times == 1 {
1796 Ok(Status::executing(true))
1797 } else {
1798 ctx.provider
1800 .try_put_poison(&poison_key, ctx.procedure_id)
1801 .await
1802 .unwrap();
1803 Err(Error::external(MockError::new(StatusCode::Unexpected)))
1804 }
1805 }
1806 .boxed()
1807 };
1808 let poison = ProcedureAdapter {
1809 data: "poison".to_string(),
1810 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1811 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1812 exec_fn,
1813 rollback_fn: None,
1814 };
1815
1816 let dir = create_temp_dir("poison");
1817 let meta = poison.new_meta(ROOT_ID);
1818
1819 let object_store = test_util::new_object_store(&dir);
1820 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1821 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1822
1823 let ctx = context_with_provider(
1825 meta.id,
1826 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1827 );
1828 runner
1830 .manager_ctx
1831 .procedures
1832 .write()
1833 .unwrap()
1834 .insert(meta.id, runner.meta.clone());
1835
1836 runner.manager_ctx.start();
1837 runner.execute_once(&ctx).await;
1838 let state = runner.meta.state();
1839 assert!(state.is_running(), "{state:?}");
1840
1841 runner.execute_once(&ctx).await;
1842 let state = runner.meta.state();
1843 assert!(state.is_failed(), "{state:?}");
1844 assert!(meta.state().is_failed());
1845
1846 let procedure_id = runner
1848 .manager_ctx
1849 .poison_manager
1850 .get_poison(&poison_key.to_string())
1851 .await
1852 .unwrap()
1853 .unwrap();
1854
1855 assert_eq!(&procedure_id.clone(), ROOT_ID);
1857 }
1858
1859 #[tokio::test]
1860 async fn test_execute_exceed_max_retry_after_set_poison() {
1861 common_telemetry::init_default_ut_logging();
1862 let mut times = 0;
1863 let poison_key = PoisonKey::new("table/1024");
1864 let moved_poison_key = poison_key.clone();
1865 let exec_fn = move |ctx: Context| {
1866 times += 1;
1867 let poison_key = moved_poison_key.clone();
1868 async move {
1869 if times == 1 {
1870 Ok(Status::executing(true))
1871 } else {
1872 ctx.provider
1874 .try_put_poison(&poison_key, ctx.procedure_id)
1875 .await
1876 .unwrap();
1877 Err(Error::retry_later_and_clean_poisons(MockError::new(
1878 StatusCode::Unexpected,
1879 )))
1880 }
1881 }
1882 .boxed()
1883 };
1884 let poison = ProcedureAdapter {
1885 data: "poison".to_string(),
1886 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1887 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1888 exec_fn,
1889 rollback_fn: None,
1890 };
1891
1892 let dir = create_temp_dir("exceed_max_after_set_poison");
1893 let meta = poison.new_meta(ROOT_ID);
1894 let object_store = test_util::new_object_store(&dir);
1895 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1896 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
1897 runner.manager_ctx.start();
1898 runner.exponential_builder = ExponentialBuilder::default()
1899 .with_min_delay(Duration::from_millis(1))
1900 .with_max_times(3);
1901 let ctx = context_with_provider(
1903 meta.id,
1904 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1905 );
1906 runner
1908 .manager_ctx
1909 .procedures
1910 .write()
1911 .unwrap()
1912 .insert(meta.id, runner.meta.clone());
1913 runner.execute_once_with_retry(&ctx).await;
1915 let err = meta.state().error().unwrap().clone();
1916 assert_matches!(&*err, Error::RetryTimesExceeded { .. });
1917
1918 let procedure_id = runner
1920 .manager_ctx
1921 .poison_manager
1922 .get_poison(&poison_key.to_string())
1923 .await
1924 .unwrap();
1925 assert_eq!(procedure_id, None);
1926 }
1927
1928 #[tokio::test]
1929 async fn test_execute_poisoned() {
1930 let mut times = 0;
1931 let poison_key = PoisonKey::new("table/1024");
1932 let moved_poison_key = poison_key.clone();
1933 let exec_fn = move |ctx: Context| {
1934 times += 1;
1935 let poison_key = moved_poison_key.clone();
1936 async move {
1937 if times == 1 {
1938 Ok(Status::executing(true))
1939 } else {
1940 ctx.provider
1942 .try_put_poison(&poison_key, ctx.procedure_id)
1943 .await
1944 .unwrap();
1945 Ok(Status::Poisoned {
1946 keys: PoisonKeys::new(vec![poison_key.clone()]),
1947 error: Error::external(MockError::new(StatusCode::Unexpected)),
1948 })
1949 }
1950 }
1951 .boxed()
1952 };
1953 let poison = ProcedureAdapter {
1954 data: "poison".to_string(),
1955 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1956 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1957 exec_fn,
1958 rollback_fn: None,
1959 };
1960
1961 let dir = create_temp_dir("poison");
1962 let meta = poison.new_meta(ROOT_ID);
1963
1964 let object_store = test_util::new_object_store(&dir);
1965 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1966 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1967
1968 let ctx = context_with_provider(
1970 meta.id,
1971 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1972 );
1973 runner
1975 .manager_ctx
1976 .procedures
1977 .write()
1978 .unwrap()
1979 .insert(meta.id, runner.meta.clone());
1980
1981 runner.manager_ctx.start();
1982 runner.execute_once(&ctx).await;
1983 let state = runner.meta.state();
1984 assert!(state.is_running(), "{state:?}");
1985
1986 runner.execute_once(&ctx).await;
1987 let state = runner.meta.state();
1988 assert!(state.is_poisoned(), "{state:?}");
1989 assert!(meta.state().is_poisoned());
1990 check_files(
1991 &object_store,
1992 &procedure_store,
1993 ctx.procedure_id,
1994 &["0000000000.step"],
1995 )
1996 .await;
1997
1998 let procedure_id = runner
2000 .manager_ctx
2001 .poison_manager
2002 .get_poison(&poison_key.to_string())
2003 .await
2004 .unwrap()
2005 .unwrap();
2006
2007 assert_eq!(procedure_id, ROOT_ID);
2009 }
2010
2011 fn test_procedure_with_dynamic_lock(
2012 shared_atomic_value: Arc<AtomicU64>,
2013 id: u64,
2014 ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
2015 let exec_fn = move |ctx: Context| {
2016 let moved_shared_atomic_value = shared_atomic_value.clone();
2017 let moved_ctx = ctx.clone();
2018 async move {
2019 debug!("Acquiring write lock, id: {}", id);
2020 let key = StringKey::Exclusive("test_lock".to_string());
2021 let guard = moved_ctx.provider.acquire_lock(&key).await;
2022 debug!("Acquired write lock, id: {}", id);
2023 let millis = rand::rng().random_range(10..=50);
2024 tokio::time::sleep(Duration::from_millis(millis)).await;
2025 let value = moved_shared_atomic_value.load(Ordering::Relaxed);
2026 moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
2027 debug!("Dropping write lock, id: {}", id);
2028 drop(guard);
2029
2030 Ok(Status::done())
2031 }
2032 .boxed()
2033 };
2034
2035 let adapter = ProcedureAdapter {
2036 data: "dynamic_lock".to_string(),
2037 lock_key: LockKey::new_exclusive([]),
2038 poison_keys: PoisonKeys::new([]),
2039 exec_fn,
2040 rollback_fn: None,
2041 };
2042 let meta = adapter.new_meta(ROOT_ID);
2043
2044 (Box::new(adapter), meta)
2045 }
2046
2047 #[tokio::test(flavor = "multi_thread")]
2048 async fn test_execute_with_dynamic_lock() {
2049 common_telemetry::init_default_ut_logging();
2050 let shared_atomic_value = Arc::new(AtomicU64::new(0));
2051 let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
2052 let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
2053
2054 let dir = create_temp_dir("dynamic_lock");
2055 let object_store = test_util::new_object_store(&dir);
2056 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
2057 let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
2058 let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
2059 let ctx1 = context_with_provider(
2060 meta1.id,
2061 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
2062 );
2063 let ctx2 = context_with_provider(
2064 meta2.id,
2065 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
2067 );
2068 let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
2069 join_all(tasks).await;
2070 assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
2071 }
2072 #[tokio::test]
2073 async fn test_on_suspend_deadlock_detected_no_rollback() {
2074 let child_id = ProcedureId::random();
2077 let exec_fn = move |_| {
2078 async move {
2079 let child_exec_fn = |_| async { Ok(Status::done()) }.boxed();
2080 let child = ProcedureAdapter {
2081 data: "child".to_string(),
2082 lock_key: LockKey::single_exclusive("catalog.schema.table"),
2083 poison_keys: PoisonKeys::default(),
2084 exec_fn: child_exec_fn,
2085 rollback_fn: None,
2086 };
2087 Ok(Status::Suspended {
2088 subprocedures: vec![ProcedureWithId {
2089 id: child_id,
2090 procedure: Box::new(child),
2091 }],
2092 persist: false,
2093 })
2094 }
2095 .boxed()
2096 };
2097 let parent = ProcedureAdapter {
2098 data: "parent".to_string(),
2099 lock_key: LockKey::single_exclusive("catalog.schema.table"),
2100 poison_keys: PoisonKeys::default(),
2101 exec_fn,
2102 rollback_fn: None, };
2104
2105 let dir = create_temp_dir("deadlock_no_rollback");
2106 let meta = parent.new_meta(ROOT_ID);
2107 let ctx = context_without_provider(meta.id);
2108 let object_store = test_util::new_object_store(&dir);
2109 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
2110 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
2111 runner.manager_ctx.start();
2112
2113 runner.execute_once(&ctx).await;
2114 let state = runner.meta.state();
2115 assert!(state.is_failed(), "Expected Failed, got {state:?}");
2116 assert!(
2118 state.error().is_some(),
2119 "Failed state should contain an error"
2120 );
2121 assert!(
2123 !runner.manager_ctx.contains_procedure(child_id),
2124 "Child procedure should not be submitted when deadlock is detected"
2125 );
2126 }
2127
2128 #[tokio::test]
2129 async fn test_on_suspend_deadlock_detected_with_rollback() {
2130 let child_id = ProcedureId::random();
2133 let exec_fn = move |_| {
2134 async move {
2135 let child_exec_fn = |_| async { Ok(Status::done()) }.boxed();
2136 let child = ProcedureAdapter {
2137 data: "child".to_string(),
2138 lock_key: LockKey::single_exclusive("catalog.schema.table"),
2139 poison_keys: PoisonKeys::default(),
2140 exec_fn: child_exec_fn,
2141 rollback_fn: None,
2142 };
2143 Ok(Status::Suspended {
2144 subprocedures: vec![ProcedureWithId {
2145 id: child_id,
2146 procedure: Box::new(child),
2147 }],
2148 persist: false,
2149 })
2150 }
2151 .boxed()
2152 };
2153 let rollback_fn = move |_| async move { Ok(()) }.boxed();
2154 let parent = ProcedureAdapter {
2155 data: "parent".to_string(),
2156 lock_key: LockKey::single_exclusive("catalog.schema.table"),
2157 poison_keys: PoisonKeys::default(),
2158 exec_fn,
2159 rollback_fn: Some(Box::new(rollback_fn)), };
2161
2162 let dir = create_temp_dir("deadlock_with_rollback");
2163 let meta = parent.new_meta(ROOT_ID);
2164 let ctx = context_without_provider(meta.id);
2165 let object_store = test_util::new_object_store(&dir);
2166 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
2167 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
2168 runner.manager_ctx.start();
2169
2170 runner.execute_once(&ctx).await;
2171 let state = runner.meta.state();
2172 assert!(
2173 state.is_prepare_rollback(),
2174 "Expected PrepareRollback, got {state:?}"
2175 );
2176 match &state {
2178 ProcedureState::PrepareRollback { error } => {
2179 assert!(!error.to_string().is_empty(), "Error should not be empty");
2180 }
2181 _ => panic!("Expected PrepareRollback, got {state:?}"),
2182 }
2183 assert!(
2185 !runner.manager_ctx.contains_procedure(child_id),
2186 "Child procedure should not be submitted when deadlock is detected"
2187 );
2188 }
2189
/// Checks `find_lock_conflicts` for every share/exclusive pairing on a single key and for a
/// mixed set of keys.
///
/// Only share-vs-share on the same key is expected to produce no conflict; a key held by the
/// child alone (`C`) is not reported.
#[test]
fn test_find_lock_conflicts() {
    use crate::procedure::StringKey;

    // Small constructors to keep the cases below readable.
    let share = |name: &str| StringKey::Share(name.to_string());
    let exclusive = |name: &str| StringKey::Exclusive(name.to_string());

    // Share vs. share: no conflict.
    {
        let parent_keys = [share("A")];
        let child_keys = [share("A")];
        assert!(super::find_lock_conflicts(parent_keys.iter(), child_keys.iter()).is_empty());
    }

    // Share (parent) vs. exclusive (child): conflict.
    {
        let parent_keys = [share("A")];
        let child_keys = [exclusive("A")];
        assert_eq!(
            super::find_lock_conflicts(parent_keys.iter(), child_keys.iter()),
            vec!["A".to_string()]
        );
    }

    // Exclusive (parent) vs. share (child): conflict.
    {
        let parent_keys = [exclusive("A")];
        let child_keys = [share("A")];
        assert_eq!(
            super::find_lock_conflicts(parent_keys.iter(), child_keys.iter()),
            vec!["A".to_string()]
        );
    }

    // Exclusive vs. exclusive: conflict.
    {
        let parent_keys = [exclusive("A")];
        let child_keys = [exclusive("A")];
        assert_eq!(
            super::find_lock_conflicts(parent_keys.iter(), child_keys.iter()),
            vec!["A".to_string()]
        );
    }

    // Mixed keys: A and B conflict, C is only requested by the child.
    {
        let parent_keys = [share("A"), exclusive("B")];
        let child_keys = [exclusive("A"), share("B"), exclusive("C")];
        let mut found = super::find_lock_conflicts(parent_keys.iter(), child_keys.iter());
        found.sort();
        assert_eq!(found, vec!["A".to_string(), "B".to_string()]);
    }
}
2237}