1use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_event_recorder::EventRecorderRef;
21use common_telemetry::{debug, error, info};
22use rand::Rng;
23use snafu::ResultExt;
24use tokio::time;
25
26use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
27use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
28use crate::procedure::{Output, StringKey};
29use crate::rwlock::OwnedKeyRwLockGuard;
30use crate::store::{ProcedureMessage, ProcedureStore};
31use crate::{
32 BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
33};
34
/// Guard that accompanies a single run of a procedure.
///
/// When dropped it marks the procedure as failed unless [`ProcedureGuard::finish`] was
/// called, notifies the parent procedure (if any), releases the held key guards and asks
/// the key lock to clean the procedure's keys.
struct ProcedureGuard {
    /// Metadata of the guarded procedure.
    meta: ProcedureMetaRef,
    /// Manager context used on drop to notify the parent and clean up lock keys.
    manager_ctx: Arc<ManagerContext>,
    /// Guards of the key locks acquired for this procedure.
    key_guards: Vec<OwnedKeyRwLockGuard>,
    /// `true` once the procedure has finished; checked in `drop`.
    finish: bool,
}
42
43impl ProcedureGuard {
44 fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
46 ProcedureGuard {
47 meta,
48 manager_ctx,
49 key_guards: vec![],
50 finish: false,
51 }
52 }
53
54 fn finish(mut self) {
56 self.finish = true;
57 }
58}
59
60impl Drop for ProcedureGuard {
61 fn drop(&mut self) {
62 if !self.finish {
63 error!("Procedure {} exits unexpectedly", self.meta.id);
64
65 let err = ProcedurePanicSnafu {
69 procedure_id: self.meta.id,
70 }
71 .build();
72 self.meta.set_state(ProcedureState::failed(Arc::new(err)));
73 }
74
75 if let Some(parent_id) = self.meta.parent_id {
77 self.manager_ctx.notify_by_subprocedure(parent_id);
78 }
79
80 while !self.key_guards.is_empty() {
82 self.key_guards.pop();
83 }
84
85 self.manager_ctx
87 .key_lock
88 .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
89 }
90}
91
/// Drives a single procedure: acquires its lock keys, executes it with retry/rollback
/// handling and persists its progress to the procedure store.
pub(crate) struct Runner {
    /// Metadata (id, state, lock keys, parent, ...) of the procedure being run.
    pub(crate) meta: ProcedureMetaRef,
    /// The procedure to execute.
    pub(crate) procedure: BoxedProcedure,
    /// Shared manager context (running flag, key locks, poison manager, ...).
    pub(crate) manager_ctx: Arc<ManagerContext>,
    /// Step number passed to the store; incremented after each successful write.
    pub(crate) step: u32,
    /// Builder for the backoff iterators used when retrying and rolling back.
    pub(crate) exponential_builder: ExponentialBuilder,
    /// Store used to persist, commit, roll back and delete procedure records.
    pub(crate) store: Arc<ProcedureStore>,
    /// Reset to `false` before the execution loop starts.
    pub(crate) rolling_back: bool,
    /// Optional event recorder, handed over to subprocedures.
    pub(crate) event_recorder: Option<EventRecorderRef>,
}
102
103impl Runner {
104 pub(crate) fn running(&self) -> bool {
106 self.manager_ctx.running()
107 }
108
109 pub(crate) async fn run(mut self) {
111 let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
113
114 info!(
115 "Runner {}-{} starts",
116 self.procedure.type_name(),
117 self.meta.id
118 );
119
120 for key in self.meta.lock_key.keys_to_lock() {
123 let key_guard = match key {
125 StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
126 StringKey::Exclusive(key) => {
127 self.manager_ctx.key_lock.write(key.clone()).await.into()
128 }
129 };
130
131 guard.key_guards.push(key_guard);
132 }
133
134 self.meta.set_start_time_ms();
137 self.execute_procedure_in_loop().await;
138 self.meta.set_end_time_ms();
139
140 guard.finish();
147
148 if self.meta.parent_id.is_none() {
150 let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
151 self.manager_ctx.on_procedures_finish(&procedure_ids);
153
154 if !self.running() {
156 return;
157 }
158
159 for id in procedure_ids {
160 if let Err(e) = self.store.delete_procedure(id).await {
161 error!(
162 e;
163 "Runner {}-{} failed to delete procedure {}",
164 self.procedure.type_name(),
165 self.meta.id,
166 id,
167 );
168 }
169 }
170 }
171
172 info!(
173 "Runner {}-{} exits",
174 self.procedure.type_name(),
175 self.meta.id
176 );
177 }
178
179 async fn execute_procedure_in_loop(&mut self) {
180 let ctx = Context {
181 procedure_id: self.meta.id,
182 provider: self.manager_ctx.clone(),
183 };
184
185 self.rolling_back = false;
186 self.execute_once_with_retry(&ctx).await;
187 }
188
189 async fn execute_once_with_retry(&mut self, ctx: &Context) {
190 let mut retry = self.exponential_builder.build();
191 let mut retry_times = 0;
192
193 let mut rollback = self.exponential_builder.build();
194 let mut rollback_times = 0;
195
196 loop {
197 if !self.running() {
199 self.meta.set_state(ProcedureState::failed(Arc::new(
200 error::ManagerNotStartSnafu {}.build(),
201 )));
202 return;
203 }
204 let state = self.meta.state();
205 match state {
206 ProcedureState::Running => {}
207 ProcedureState::Retrying { error } => {
208 retry_times += 1;
209 if let Some(d) = retry.next() {
210 let millis = d.as_millis() as u64;
211 let noise = rand::rng().random_range(0..(millis / 4) + 1);
213 let d = d.add(Duration::from_millis(noise));
214
215 self.wait_on_err(d, retry_times).await;
216 } else {
217 self.meta
218 .set_state(ProcedureState::prepare_rollback(Arc::new(
219 Error::RetryTimesExceeded {
220 source: error.clone(),
221 procedure_id: self.meta.id,
222 },
223 )));
224 }
225 }
226 ProcedureState::PrepareRollback { error }
227 | ProcedureState::RollingBack { error } => {
228 rollback_times += 1;
229 if let Some(d) = rollback.next() {
230 self.wait_on_err(d, rollback_times).await;
231 } else {
232 let err = Err::<(), Arc<Error>>(error)
233 .context(RollbackTimesExceededSnafu {
234 procedure_id: self.meta.id,
235 })
236 .unwrap_err();
237 self.meta.set_state(ProcedureState::failed(Arc::new(err)));
238 return;
239 }
240 }
241 ProcedureState::Done { .. } => return,
242 ProcedureState::Failed { .. } => return,
243 ProcedureState::Poisoned { .. } => return,
244 }
245 self.execute_once(ctx).await;
246 }
247 }
248
249 async fn clean_poisons(&mut self) -> Result<()> {
250 let mut error = None;
251 for key in self.meta.poison_keys.iter() {
252 let key = key.to_string();
253 if let Err(e) = self
254 .manager_ctx
255 .poison_manager
256 .delete_poison(key, self.meta.id.to_string())
257 .await
258 {
259 error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
260 error = Some(e);
261 }
262 }
263
264 if let Some(e) = error {
266 return Err(e);
267 }
268 Ok(())
269 }
270
271 async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
272 if self.procedure.rollback_supported()
273 && let Err(e) = self.procedure.rollback(ctx).await
274 {
275 self.meta
276 .set_state(ProcedureState::rolling_back(Arc::new(e)));
277 return;
278 }
279 self.meta.set_state(ProcedureState::failed(err));
280 }
281
282 async fn prepare_rollback(&mut self, err: Arc<Error>) {
283 if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
284 self.meta
285 .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
286 return;
287 }
288 if self.procedure.rollback_supported() {
289 self.meta.set_state(ProcedureState::rolling_back(err));
290 } else {
291 self.meta.set_state(ProcedureState::failed(err));
292 }
293 }
294
    /// Performs one step of the procedure state machine and stores the resulting state in
    /// `self.meta`.
    ///
    /// - `Running` / `Retrying`: executes the procedure once and handles the returned
    ///   status or error (poison cleaning, persisting, committing, state transition).
    /// - `PrepareRollback`: writes the rollback record via `prepare_rollback`.
    /// - `RollingBack`: runs the rollback via `rollback`.
    /// - `Failed` / `Done` / `Poisoned`: nothing to do.
    async fn execute_once(&mut self, ctx: &Context) {
        match self.meta.state() {
            ProcedureState::Running | ProcedureState::Retrying { .. } => {
                match self.procedure.execute(ctx).await {
                    Ok(status) => {
                        debug!(
                            "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
                            self.procedure.type_name(),
                            self.meta.id,
                            status,
                            status.need_persist(),
                        );

                        // The manager stopped while the procedure was executing: fail the
                        // procedure without touching the store.
                        if !self.running() {
                            self.meta.set_state(ProcedureState::failed(Arc::new(
                                error::ManagerNotStartSnafu {}.build(),
                            )));
                            return;
                        }

                        // Clean the poisons first when the status asks for it; a failure
                        // here makes the procedure retry.
                        if status.need_clean_poisons()
                            && let Err(e) = self.clean_poisons().await
                        {
                            error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                            return;
                        }

                        // Persist the procedure when the status asks for it; a failure
                        // here makes the procedure retry.
                        if status.need_persist()
                            && let Err(e) = self.persist_procedure().await
                        {
                            error!(e; "Failed to persist procedure: {}", self.meta.id);
                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                            return;
                        }

                        match status {
                            Status::Executing { .. } => {
                                // A successful step after a retry puts the procedure
                                // back into the `Running` state.
                                let prev_state = self.meta.state();
                                if !matches!(prev_state, ProcedureState::Running) {
                                    info!(
                                        "Set Procedure {}-{} state to running, prev_state: {:?}",
                                        self.procedure.type_name(),
                                        self.meta.id,
                                        prev_state
                                    );
                                    self.meta.set_state(ProcedureState::Running);
                                }
                            }
                            Status::Suspended { subprocedures, .. } => {
                                // Same state reset as for `Executing`, then submit the
                                // subprocedures and wait for them.
                                let prev_state = self.meta.state();
                                if !matches!(prev_state, ProcedureState::Running) {
                                    info!(
                                        "Set Procedure {}-{} state to running, prev_state: {:?}",
                                        self.procedure.type_name(),
                                        self.meta.id,
                                        prev_state
                                    );
                                    self.meta.set_state(ProcedureState::Running);
                                }
                                self.on_suspended(subprocedures).await;
                            }
                            Status::Done { output } => {
                                // Commit before marking the procedure as done; a failed
                                // commit makes the procedure retry.
                                if let Err(e) = self.commit_procedure().await {
                                    error!(e; "Failed to commit procedure: {}", self.meta.id);
                                    self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                                    return;
                                }

                                self.done(output);
                            }
                            Status::Poisoned { error, keys } => {
                                error!(
                                    error;
                                    "Procedure {}-{} is poisoned, keys: {:?}",
                                    self.procedure.type_name(),
                                    self.meta.id,
                                    keys,
                                );
                                self.meta
                                    .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
                            }
                        }
                    }
                    Err(e) => {
                        error!(
                            e;
                            "Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
                            self.procedure.type_name(),
                            self.meta.id,
                            e.is_retry_later(),
                            e.need_clean_poisons(),
                        );

                        // The manager stopped while the procedure was executing: report
                        // that instead of the execution error.
                        if !self.running() {
                            self.meta.set_state(ProcedureState::failed(Arc::new(
                                error::ManagerNotStartSnafu {}.build(),
                            )));
                            return;
                        }

                        if e.need_clean_poisons() {
                            if let Err(e) = self.clean_poisons().await {
                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                                return;
                            }
                            debug!(
                                "Procedure {}-{} cleaned poisons",
                                self.procedure.type_name(),
                                self.meta.id,
                            );
                        }

                        // Retryable errors go through the retry backoff of the caller.
                        if e.is_retry_later() {
                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                            return;
                        }

                        // Non-retryable error: roll back when possible, otherwise fail.
                        if self.procedure.rollback_supported() {
                            self.meta
                                .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
                        } else {
                            self.meta.set_state(ProcedureState::failed(Arc::new(e)));
                        }
                    }
                }
            }
            ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
            ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
            // Terminal states: nothing left to execute.
            ProcedureState::Failed { .. }
            | ProcedureState::Done { .. }
            | ProcedureState::Poisoned { .. } => (),
        }
    }
433
434 fn submit_subprocedure(
436 &self,
437 procedure_id: ProcedureId,
438 procedure_state: ProcedureState,
439 procedure: BoxedProcedure,
440 ) {
441 if self.manager_ctx.contains_procedure(procedure_id) {
442 return;
444 }
445
446 let step = 0;
447
448 let meta = Arc::new(ProcedureMeta::new(
449 procedure_id,
450 procedure_state,
451 Some(self.meta.id),
452 procedure.lock_key(),
453 procedure.poison_keys(),
454 procedure.type_name(),
455 self.event_recorder.clone(),
456 procedure.user_metadata(),
457 ));
458 let runner = Runner {
459 meta: meta.clone(),
460 procedure,
461 manager_ctx: self.manager_ctx.clone(),
462 step,
463 exponential_builder: self.exponential_builder,
464 store: self.store.clone(),
465 rolling_back: false,
466 event_recorder: self.event_recorder.clone(),
467 };
468
469 assert!(
473 self.manager_ctx.try_insert_procedure(meta),
474 "Procedure {}-{} submit an existing procedure {}-{}",
475 self.procedure.type_name(),
476 self.meta.id,
477 runner.procedure.type_name(),
478 procedure_id,
479 );
480
481 self.meta.push_child(procedure_id);
483
484 let _handle = common_runtime::spawn_global(async move {
485 runner.run().await
487 });
488 }
489
490 async fn wait_on_err(&mut self, d: Duration, i: u64) {
492 info!(
493 "Procedure {}-{} retry for the {} times after {} millis",
494 self.procedure.type_name(),
495 self.meta.id,
496 i,
497 d.as_millis(),
498 );
499 time::sleep(d).await;
500 }
501
502 async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
503 let has_child = !subprocedures.is_empty();
504 for subprocedure in subprocedures {
505 info!(
506 "Procedure {}-{} submit subprocedure {}-{}",
507 self.procedure.type_name(),
508 self.meta.id,
509 subprocedure.procedure.type_name(),
510 subprocedure.id,
511 );
512
513 self.submit_subprocedure(
514 subprocedure.id,
515 ProcedureState::Running,
516 subprocedure.procedure,
517 );
518 }
519
520 info!(
521 "Procedure {}-{} is waiting for subprocedures",
522 self.procedure.type_name(),
523 self.meta.id,
524 );
525
526 if has_child {
528 self.meta.child_notify.notified().await;
529
530 info!(
531 "Procedure {}-{} is waked up",
532 self.procedure.type_name(),
533 self.meta.id,
534 );
535 }
536 }
537
538 async fn persist_procedure(&mut self) -> Result<()> {
539 let type_name = self.procedure.type_name().to_string();
540 let data = self.procedure.dump()?;
541
542 self.store
543 .store_procedure(
544 self.meta.id,
545 self.step,
546 type_name,
547 data,
548 self.meta.parent_id,
549 )
550 .await
551 .map_err(|e| {
552 error!(
553 e; "Failed to persist procedure {}-{}",
554 self.procedure.type_name(),
555 self.meta.id
556 );
557 e
558 })?;
559 self.step += 1;
560 Ok(())
561 }
562
563 async fn commit_procedure(&mut self) -> Result<()> {
564 self.store
565 .commit_procedure(self.meta.id, self.step)
566 .await
567 .map_err(|e| {
568 error!(
569 e; "Failed to commit procedure {}-{}",
570 self.procedure.type_name(),
571 self.meta.id
572 );
573 e
574 })?;
575 self.step += 1;
576 Ok(())
577 }
578
579 async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
580 let type_name = self.procedure.type_name().to_string();
582 let data = self.procedure.dump()?;
583 let message = ProcedureMessage {
584 type_name,
585 data,
586 parent_id: self.meta.parent_id,
587 step: self.step,
588 error: Some(error),
589 };
590 self.store
591 .rollback_procedure(self.meta.id, message)
592 .await
593 .map_err(|e| {
594 error!(
595 e; "Failed to write rollback key for procedure {}-{}",
596 self.procedure.type_name(),
597 self.meta.id
598 );
599 e
600 })?;
601 self.step += 1;
602 Ok(())
603 }
604
605 fn done(&self, output: Option<Output>) {
606 info!(
608 "Procedure {}-{} done",
609 self.procedure.type_name(),
610 self.meta.id,
611 );
612
613 self.meta.set_state(ProcedureState::Done { output });
615 }
616}
617
618#[cfg(test)]
619mod tests {
620 use std::assert_matches::assert_matches;
621 use std::sync::Arc;
622 use std::sync::atomic::{AtomicU64, Ordering};
623
624 use async_trait::async_trait;
625 use common_error::ext::{ErrorExt, PlainError};
626 use common_error::mock::MockError;
627 use common_error::status_code::StatusCode;
628 use common_test_util::temp_dir::create_temp_dir;
629 use futures::future::join_all;
630 use futures_util::FutureExt;
631 use futures_util::future::BoxFuture;
632 use object_store::{EntryMode, ObjectStore};
633 use tokio::sync::mpsc;
634 use tokio::sync::watch::Receiver;
635
636 use super::*;
637 use crate::local::{DynamicKeyLockGuard, test_util};
638 use crate::procedure::PoisonKeys;
639 use crate::store::proc_path;
640 use crate::test_util::InMemoryPoisonStore;
641 use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
642
643 const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
644
645 fn new_runner(
646 meta: ProcedureMetaRef,
647 procedure: BoxedProcedure,
648 store: Arc<ProcedureStore>,
649 ) -> Runner {
650 Runner {
651 meta,
652 procedure,
653 manager_ctx: Arc::new(ManagerContext::new(
654 Arc::new(InMemoryPoisonStore::default()),
655 )),
656 step: 0,
657 exponential_builder: ExponentialBuilder::default(),
658 store,
659 rolling_back: false,
660 event_recorder: None,
661 }
662 }
663
664 async fn check_files(
665 object_store: &ObjectStore,
666 procedure_store: &ProcedureStore,
667 procedure_id: ProcedureId,
668 files: &[&str],
669 ) {
670 let dir = proc_path!(procedure_store, "{procedure_id}/");
671 let lister = object_store.list(&dir).await.unwrap();
672 let mut files_in_dir: Vec<_> = lister
673 .into_iter()
674 .filter(|x| x.metadata().mode() == EntryMode::FILE)
675 .map(|de| de.name().to_string())
676 .collect();
677 files_in_dir.sort_unstable();
678 assert_eq!(files, files_in_dir);
679 }
680
681 fn context_with_provider(
682 procedure_id: ProcedureId,
683 provider: Arc<dyn ContextProvider>,
684 ) -> Context {
685 Context {
686 procedure_id,
687 provider,
688 }
689 }
690
691 fn context_without_provider(procedure_id: ProcedureId) -> Context {
692 struct MockProvider;
693
694 #[async_trait]
695 impl ContextProvider for MockProvider {
696 async fn procedure_state(
697 &self,
698 _procedure_id: ProcedureId,
699 ) -> Result<Option<ProcedureState>> {
700 unimplemented!()
701 }
702
703 async fn procedure_state_receiver(
704 &self,
705 _procedure_id: ProcedureId,
706 ) -> Result<Option<Receiver<ProcedureState>>> {
707 unimplemented!()
708 }
709
710 async fn try_put_poison(
711 &self,
712 _key: &PoisonKey,
713 _procedure_id: ProcedureId,
714 ) -> Result<()> {
715 unimplemented!()
716 }
717
718 async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
719 unimplemented!()
720 }
721 }
722
723 Context {
724 procedure_id,
725 provider: Arc::new(MockProvider),
726 }
727 }
728
    /// Boxed rollback callback used by `ProcedureAdapter`: takes the execution context and
    /// returns a boxed future resolving to the rollback result.
    type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
730
    /// Test procedure whose behavior is supplied by closures.
    struct ProcedureAdapter<F> {
        /// String returned by `dump()`.
        data: String,
        /// Lock key reported by the procedure.
        lock_key: LockKey,
        /// Poison keys reported by the procedure.
        poison_keys: PoisonKeys,
        /// Callback invoked on every `execute()` call.
        exec_fn: F,
        /// Optional rollback callback; rollback is reported as supported only when set.
        rollback_fn: Option<RollbackFn>,
    }
738
739 impl<F> ProcedureAdapter<F> {
740 fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
741 let mut meta = test_util::procedure_meta_for_test();
742 meta.id = ProcedureId::parse_str(uuid).unwrap();
743 meta.lock_key = self.lock_key.clone();
744 meta.poison_keys = self.poison_keys.clone();
745
746 Arc::new(meta)
747 }
748 }
749
750 #[async_trait]
751 impl<F> Procedure for ProcedureAdapter<F>
752 where
753 F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
754 {
755 fn type_name(&self) -> &str {
756 "ProcedureAdapter"
757 }
758
759 async fn execute(&mut self, ctx: &Context) -> Result<Status> {
760 let f = (self.exec_fn)(ctx.clone());
761 f.await
762 }
763
764 async fn rollback(&mut self, ctx: &Context) -> Result<()> {
765 if let Some(f) = &mut self.rollback_fn {
766 return (f)(ctx.clone()).await;
767 }
768 Ok(())
769 }
770
771 fn rollback_supported(&self) -> bool {
772 self.rollback_fn.is_some()
773 }
774
775 fn dump(&self) -> Result<String> {
776 Ok(self.data.clone())
777 }
778
779 fn lock_key(&self) -> LockKey {
780 self.lock_key.clone()
781 }
782
783 fn poison_keys(&self) -> PoisonKeys {
784 self.poison_keys.clone()
785 }
786 }
787
788 async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
789 let mut times = 0;
790 let exec_fn = move |_| {
791 times += 1;
792 async move {
793 if times == 1 {
794 Ok(Status::executing(persist))
795 } else {
796 Ok(Status::done())
797 }
798 }
799 .boxed()
800 };
801 let normal = ProcedureAdapter {
802 data: "normal".to_string(),
803 lock_key: LockKey::single_exclusive("catalog.schema.table"),
804 poison_keys: PoisonKeys::default(),
805 exec_fn,
806 rollback_fn: None,
807 };
808
809 let dir = create_temp_dir("normal");
810 let meta = normal.new_meta(ROOT_ID);
811 let ctx = context_without_provider(meta.id);
812 let object_store = test_util::new_object_store(&dir);
813 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
814 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
815 runner.manager_ctx.start();
816
817 runner.execute_once(&ctx).await;
818 let state = runner.meta.state();
819 assert!(state.is_running(), "{state:?}");
820 check_files(
821 &object_store,
822 &procedure_store,
823 ctx.procedure_id,
824 first_files,
825 )
826 .await;
827
828 runner.execute_once(&ctx).await;
829 let state = runner.meta.state();
830 assert!(state.is_done(), "{state:?}");
831 check_files(
832 &object_store,
833 &procedure_store,
834 ctx.procedure_id,
835 second_files,
836 )
837 .await;
838 }
839
840 #[tokio::test]
841 async fn test_execute_once_normal() {
842 execute_once_normal(
843 true,
844 &["0000000000.step"],
845 &["0000000000.step", "0000000001.commit"],
846 )
847 .await;
848 }
849
850 #[tokio::test]
851 async fn test_execute_once_normal_skip_persist() {
852 execute_once_normal(false, &[], &["0000000000.commit"]).await;
853 }
854
855 #[tokio::test]
856 async fn test_on_suspend_empty() {
857 let exec_fn = move |_| {
858 async move {
859 Ok(Status::Suspended {
860 subprocedures: Vec::new(),
861 persist: false,
862 })
863 }
864 .boxed()
865 };
866 let suspend = ProcedureAdapter {
867 data: "suspend".to_string(),
868 lock_key: LockKey::single_exclusive("catalog.schema.table"),
869 poison_keys: PoisonKeys::default(),
870 exec_fn,
871 rollback_fn: None,
872 };
873
874 let dir = create_temp_dir("suspend");
875 let meta = suspend.new_meta(ROOT_ID);
876 let ctx = context_without_provider(meta.id);
877 let object_store = test_util::new_object_store(&dir);
878 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
879 let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
880 runner.manager_ctx.start();
881
882 runner.execute_once(&ctx).await;
883 let state = runner.meta.state();
884 assert!(state.is_running(), "{state:?}");
885 }
886
887 fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
888 let mut times = 0;
889 let exec_fn = move |_| {
890 times += 1;
891 async move {
892 if times == 1 {
893 time::sleep(Duration::from_millis(200)).await;
894 Ok(Status::executing(true))
895 } else {
896 Ok(Status::done())
897 }
898 }
899 .boxed()
900 };
901 let child = ProcedureAdapter {
902 data: "child".to_string(),
903 lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
904 poison_keys: PoisonKeys::default(),
905 exec_fn,
906 rollback_fn: None,
907 };
908
909 ProcedureWithId {
910 id: procedure_id,
911 procedure: Box::new(child),
912 }
913 }
914
    /// A parent procedure suspends on two children, finishes once both are done, and the
    /// whole tree is cleaned up from the store, the key locks and the manager.
    #[tokio::test]
    async fn test_on_suspend_by_subprocedures() {
        let mut times = 0;
        let children_ids = [ProcedureId::random(), ProcedureId::random()];
        // Each child locks its own pair of region keys.
        let keys = [
            &[
                "catalog.schema.table.region-0",
                "catalog.schema.table.region-1",
            ],
            &[
                "catalog.schema.table.region-2",
                "catalog.schema.table.region-3",
            ],
        ];

        let exec_fn = move |ctx: Context| {
            times += 1;
            async move {
                if times == 1 {
                    // First call: submit the two children.
                    Ok(Status::Suspended {
                        subprocedures: children_ids
                            .into_iter()
                            .zip(keys)
                            .map(|(id, key_slice)| new_child_procedure(id, key_slice))
                            .collect(),
                        persist: true,
                    })
                } else {
                    // Later calls: finish only when every child is done. A child whose
                    // state is unknown to the provider counts as not done.
                    let mut all_child_done = true;
                    for id in children_ids {
                        let is_not_done = ctx
                            .provider
                            .procedure_state(id)
                            .await
                            .unwrap()
                            .map(|s| !s.is_done())
                            .unwrap_or(true);
                        if is_not_done {
                            all_child_done = false;
                        }
                    }
                    if all_child_done {
                        Ok(Status::done())
                    } else {
                        // Suspend again without submitting anything new.
                        Ok(Status::Suspended {
                            subprocedures: Vec::new(),
                            persist: false,
                        })
                    }
                }
            }
            .boxed()
        };
        let parent = ProcedureAdapter {
            data: "parent".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("parent");
        let meta = parent.new_meta(ROOT_ID);
        let procedure_id = meta.id;

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
        manager_ctx.start();
        // Register the parent in the manager context before running it.
        assert!(manager_ctx.try_insert_procedure(meta));
        // Replace the runner's context with the one the test keeps a handle to.
        runner.manager_ctx = manager_ctx.clone();

        runner.run().await;
        // No key lock may remain once the runner has exited.
        assert!(manager_ctx.key_lock.is_empty());

        // Parent and children must all be done.
        for child_id in children_ids {
            let state = manager_ctx.state(child_id).unwrap();
            assert!(state.is_done(), "{state:?}");
        }
        let state = manager_ctx.state(procedure_id).unwrap();
        assert!(state.is_done(), "{state:?}");
        // The files of the parent procedure have been removed from the store.
        check_files(&object_store, &procedure_store, procedure_id, &[]).await;

        // After the metas are older than the 1ms TTL, removing outdated metas must drop
        // the whole procedure tree from the manager.
        tokio::time::sleep(Duration::from_millis(5)).await;
        manager_ctx.remove_outdated_meta(Duration::from_millis(1));
        assert!(manager_ctx.state(procedure_id).is_none());
        assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
        for child_id in children_ids {
            assert!(manager_ctx.state(child_id).is_none());
        }
    }
1016
1017 #[tokio::test]
1018 async fn test_running_is_stopped() {
1019 let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
1020 let normal = ProcedureAdapter {
1021 data: "normal".to_string(),
1022 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1023 poison_keys: PoisonKeys::default(),
1024 exec_fn,
1025 rollback_fn: None,
1026 };
1027
1028 let dir = create_temp_dir("test_running_is_stopped");
1029 let meta = normal.new_meta(ROOT_ID);
1030 let ctx = context_without_provider(meta.id);
1031 let object_store = test_util::new_object_store(&dir);
1032 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1033 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1034 runner.manager_ctx.start();
1035
1036 runner.execute_once(&ctx).await;
1037 let state = runner.meta.state();
1038 assert!(state.is_running(), "{state:?}");
1039 check_files(
1040 &object_store,
1041 &procedure_store,
1042 ctx.procedure_id,
1043 &["0000000000.step"],
1044 )
1045 .await;
1046
1047 runner.manager_ctx.stop();
1048 runner.execute_once(&ctx).await;
1049 let state = runner.meta.state();
1050 assert!(state.is_failed(), "{state:?}");
1051 check_files(
1053 &object_store,
1054 &procedure_store,
1055 ctx.procedure_id,
1056 &["0000000000.step"],
1057 )
1058 .await;
1059 }
1060
1061 #[tokio::test]
1062 async fn test_running_is_stopped_on_error() {
1063 let exec_fn =
1064 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1065 let normal = ProcedureAdapter {
1066 data: "fail".to_string(),
1067 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1068 poison_keys: PoisonKeys::default(),
1069 exec_fn,
1070 rollback_fn: None,
1071 };
1072
1073 let dir = create_temp_dir("test_running_is_stopped_on_error");
1074 let meta = normal.new_meta(ROOT_ID);
1075 let ctx = context_without_provider(meta.id);
1076 let object_store = test_util::new_object_store(&dir);
1077 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1078 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1079 runner.manager_ctx.stop();
1080
1081 runner.execute_once(&ctx).await;
1082 let state = runner.meta.state();
1083 assert!(state.is_failed(), "{state:?}");
1084 check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1086 }
1087
1088 #[tokio::test]
1089 async fn test_execute_on_error() {
1090 let exec_fn =
1091 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1092 let fail = ProcedureAdapter {
1093 data: "fail".to_string(),
1094 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1095 poison_keys: PoisonKeys::default(),
1096 exec_fn,
1097 rollback_fn: None,
1098 };
1099
1100 let dir = create_temp_dir("fail");
1101 let meta = fail.new_meta(ROOT_ID);
1102 let ctx = context_without_provider(meta.id);
1103 let object_store = test_util::new_object_store(&dir);
1104 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1105 let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1106 runner.manager_ctx.start();
1107
1108 runner.execute_once(&ctx).await;
1109 let state = runner.meta.state();
1110 assert!(state.is_failed(), "{state:?}");
1111 check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1112 }
1113
1114 #[tokio::test]
1115 async fn test_execute_with_rollback_on_error() {
1116 let exec_fn =
1117 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1118 let rollback_fn = move |_| async move { Ok(()) }.boxed();
1119 let fail = ProcedureAdapter {
1120 data: "fail".to_string(),
1121 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1122 poison_keys: PoisonKeys::default(),
1123 exec_fn,
1124 rollback_fn: Some(Box::new(rollback_fn)),
1125 };
1126
1127 let dir = create_temp_dir("fail");
1128 let meta = fail.new_meta(ROOT_ID);
1129 let ctx = context_without_provider(meta.id);
1130 let object_store = test_util::new_object_store(&dir);
1131 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1132 let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1133 runner.manager_ctx.start();
1134
1135 runner.execute_once(&ctx).await;
1136 let state = runner.meta.state();
1137 assert!(state.is_prepare_rollback(), "{state:?}");
1138
1139 runner.execute_once(&ctx).await;
1140 let state = runner.meta.state();
1141 assert!(state.is_rolling_back(), "{state:?}");
1142
1143 runner.execute_once(&ctx).await;
1144 let state = runner.meta.state();
1145 assert!(state.is_failed(), "{state:?}");
1146 check_files(
1147 &object_store,
1148 &procedure_store,
1149 ctx.procedure_id,
1150 &["0000000000.rollback"],
1151 )
1152 .await;
1153 }
1154
1155 #[tokio::test]
1156 async fn test_execute_on_retry_later_error() {
1157 let mut times = 0;
1158
1159 let exec_fn = move |_| {
1160 times += 1;
1161 async move {
1162 if times == 1 {
1163 Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1164 } else if times == 2 {
1165 Ok(Status::executing(false))
1166 } else {
1167 Ok(Status::done())
1168 }
1169 }
1170 .boxed()
1171 };
1172
1173 let retry_later = ProcedureAdapter {
1174 data: "retry_later".to_string(),
1175 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1176 poison_keys: PoisonKeys::default(),
1177 exec_fn,
1178 rollback_fn: None,
1179 };
1180
1181 let dir = create_temp_dir("retry_later");
1182 let meta = retry_later.new_meta(ROOT_ID);
1183 let ctx = context_without_provider(meta.id);
1184 let object_store = test_util::new_object_store(&dir);
1185 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1186 let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1187 runner.manager_ctx.start();
1188 runner.execute_once(&ctx).await;
1189 let state = runner.meta.state();
1190 assert!(state.is_retrying(), "{state:?}");
1191
1192 runner.execute_once(&ctx).await;
1193 let state = runner.meta.state();
1194 assert!(state.is_running(), "{state:?}");
1195
1196 runner.execute_once(&ctx).await;
1197 let state = runner.meta.state();
1198 assert!(state.is_done(), "{state:?}");
1199 assert!(meta.state().is_done());
1200 check_files(
1201 &object_store,
1202 &procedure_store,
1203 ctx.procedure_id,
1204 &["0000000000.commit"],
1205 )
1206 .await;
1207 }
1208
    /// A procedure that first fails with a retry-later error, then suspends on a failing
    /// child, and finally completes on its third execution.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_execute_on_retry_later_error_with_child() {
        common_telemetry::init_default_ut_logging();
        let mut times = 0;
        let child_id = ProcedureId::random();

        let exec_fn = move |_| {
            times += 1;
            async move {
                debug!("times: {}", times);
                if times == 1 {
                    // First call: ask to be retried later.
                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
                } else if times == 2 {
                    // Second call: suspend on a child whose execution always fails.
                    let exec_fn = |_| {
                        async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
                            .boxed()
                    };
                    let fail = ProcedureAdapter {
                        data: "fail".to_string(),
                        lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
                        poison_keys: PoisonKeys::default(),
                        exec_fn,
                        rollback_fn: None,
                    };

                    Ok(Status::Suspended {
                        subprocedures: vec![ProcedureWithId {
                            id: child_id,
                            procedure: Box::new(fail),
                        }],
                        persist: true,
                    })
                } else {
                    // Third call: done.
                    Ok(Status::done())
                }
            }
            .boxed()
        };

        let retry_later = ProcedureAdapter {
            data: "retry_later".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("retry_later");
        let meta = retry_later.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
        runner.manager_ctx.start();
        debug!("execute_once 1");
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_retrying(), "{state:?}");

        // Notify the parent from a separate task so that the wait inside the second
        // execution (suspended on the child) can return.
        let moved_meta = meta.clone();
        tokio::spawn(async move {
            moved_meta.child_notify.notify_one();
        });
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_done(), "{state:?}");
        assert!(meta.state().is_done());
        // One step file from the persisted suspension plus the final commit file.
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.step", "0000000001.commit"],
        )
        .await;
    }
1288
1289 #[tokio::test]
1290 async fn test_execute_exceed_max_retry_later() {
1291 let exec_fn =
1292 |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1293
1294 let exceed_max_retry_later = ProcedureAdapter {
1295 data: "exceed_max_retry_later".to_string(),
1296 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1297 poison_keys: PoisonKeys::default(),
1298 exec_fn,
1299 rollback_fn: None,
1300 };
1301
1302 let dir = create_temp_dir("exceed_max_retry_later");
1303 let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1304 let object_store = test_util::new_object_store(&dir);
1305 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1306 let mut runner = new_runner(
1307 meta.clone(),
1308 Box::new(exceed_max_retry_later),
1309 procedure_store,
1310 );
1311 runner.manager_ctx.start();
1312
1313 runner.exponential_builder = ExponentialBuilder::default()
1314 .with_min_delay(Duration::from_millis(1))
1315 .with_max_times(3);
1316
1317 runner.execute_procedure_in_loop().await;
1319 let err = meta.state().error().unwrap().to_string();
1320 assert!(err.contains("Procedure retry exceeded max times"));
1321 }
1322
1323 #[tokio::test]
1324 async fn test_rollback_exceed_max_retry_later() {
1325 let exec_fn =
1326 |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1327 let rollback_fn = move |_| {
1328 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1329 };
1330 let exceed_max_retry_later = ProcedureAdapter {
1331 data: "exceed_max_rollback".to_string(),
1332 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1333 poison_keys: PoisonKeys::default(),
1334 exec_fn,
1335 rollback_fn: Some(Box::new(rollback_fn)),
1336 };
1337
1338 let dir = create_temp_dir("exceed_max_rollback");
1339 let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1340 let object_store = test_util::new_object_store(&dir);
1341 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1342 let mut runner = new_runner(
1343 meta.clone(),
1344 Box::new(exceed_max_retry_later),
1345 procedure_store,
1346 );
1347 runner.manager_ctx.start();
1348 runner.exponential_builder = ExponentialBuilder::default()
1349 .with_min_delay(Duration::from_millis(1))
1350 .with_max_times(3);
1351
1352 runner.execute_procedure_in_loop().await;
1354 let err = meta.state().error().unwrap().to_string();
1355 assert!(err.contains("Procedure rollback exceeded max times"));
1356 }
1357
1358 #[tokio::test]
1359 async fn test_rollback_after_retry_fail() {
1360 let exec_fn = move |_| {
1361 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1362 };
1363
1364 let (tx, mut rx) = mpsc::channel(1);
1365 let rollback_fn = move |_| {
1366 let tx = tx.clone();
1367 async move {
1368 tx.send(()).await.unwrap();
1369 Ok(())
1370 }
1371 .boxed()
1372 };
1373 let retry_later = ProcedureAdapter {
1374 data: "rollback_after_retry_fail".to_string(),
1375 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1376 poison_keys: PoisonKeys::default(),
1377 exec_fn,
1378 rollback_fn: Some(Box::new(rollback_fn)),
1379 };
1380
1381 let dir = create_temp_dir("retry_later");
1382 let meta = retry_later.new_meta(ROOT_ID);
1383 let ctx = context_without_provider(meta.id);
1384 let object_store = test_util::new_object_store(&dir);
1385 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1386 let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1387 runner.manager_ctx.start();
1388 runner.exponential_builder = ExponentialBuilder::default()
1389 .with_min_delay(Duration::from_millis(1))
1390 .with_max_times(3);
1391 runner.execute_procedure_in_loop().await;
1393 rx.recv().await.unwrap();
1394 assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
1395 check_files(
1396 &object_store,
1397 &procedure_store,
1398 ctx.procedure_id,
1399 &["0000000000.rollback"],
1400 )
1401 .await;
1402 }
1403
1404 #[tokio::test]
1405 async fn test_child_error() {
1406 let mut times = 0;
1407 let child_id = ProcedureId::random();
1408 common_telemetry::init_default_ut_logging();
1409 let exec_fn = move |ctx: Context| {
1410 times += 1;
1411 async move {
1412 if times == 1 {
1413 let exec_fn = |_| {
1415 async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1416 .boxed()
1417 };
1418 let fail = ProcedureAdapter {
1419 data: "fail".to_string(),
1420 lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1421 poison_keys: PoisonKeys::default(),
1422 exec_fn,
1423 rollback_fn: None,
1424 };
1425
1426 Ok(Status::Suspended {
1427 subprocedures: vec![ProcedureWithId {
1428 id: child_id,
1429 procedure: Box::new(fail),
1430 }],
1431 persist: true,
1432 })
1433 } else {
1434 let state = ctx.provider.procedure_state(child_id).await.unwrap();
1436 let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
1437 if is_failed {
1438 Err(Error::from_error_ext(PlainError::new(
1440 "subprocedure failed".to_string(),
1441 StatusCode::Unexpected,
1442 )))
1443 } else {
1444 Ok(Status::Suspended {
1446 subprocedures: Vec::new(),
1447 persist: false,
1448 })
1449 }
1450 }
1451 }
1452 .boxed()
1453 };
1454 let parent = ProcedureAdapter {
1455 data: "parent".to_string(),
1456 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1457 poison_keys: PoisonKeys::default(),
1458 exec_fn,
1459 rollback_fn: None,
1460 };
1461
1462 let dir = create_temp_dir("child_err");
1463 let meta = parent.new_meta(ROOT_ID);
1464
1465 let object_store = test_util::new_object_store(&dir);
1466 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1467 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
1468 let poison_manager = Arc::new(InMemoryPoisonStore::default());
1469 let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
1470 manager_ctx.start();
1471 assert!(manager_ctx.try_insert_procedure(meta.clone()));
1473 runner.manager_ctx = manager_ctx.clone();
1475
1476 runner.run().await;
1478 assert!(manager_ctx.key_lock.is_empty());
1479 let err = meta.state().error().unwrap().output_msg();
1480 assert!(err.contains("subprocedure failed"), "{err}");
1481 }
1482
1483 #[tokio::test]
1484 async fn test_execute_with_clean_poisons() {
1485 common_telemetry::init_default_ut_logging();
1486 let mut times = 0;
1487 let poison_key = PoisonKey::new("table/1024");
1488 let moved_poison_key = poison_key.clone();
1489 let exec_fn = move |ctx: Context| {
1490 times += 1;
1491 let poison_key = moved_poison_key.clone();
1492 async move {
1493 if times == 1 {
1494 ctx.provider
1496 .try_put_poison(&poison_key, ctx.procedure_id)
1497 .await
1498 .unwrap();
1499
1500 Ok(Status::executing(true))
1501 } else {
1502 Ok(Status::executing_with_clean_poisons(true))
1503 }
1504 }
1505 .boxed()
1506 };
1507 let poison = ProcedureAdapter {
1508 data: "poison".to_string(),
1509 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1510 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1511 exec_fn,
1512 rollback_fn: None,
1513 };
1514
1515 let dir = create_temp_dir("clean_poisons");
1516 let meta = poison.new_meta(ROOT_ID);
1517
1518 let object_store = test_util::new_object_store(&dir);
1519 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1520 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1521
1522 let ctx = context_with_provider(
1524 meta.id,
1525 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1526 );
1527 runner
1529 .manager_ctx
1530 .procedures
1531 .write()
1532 .unwrap()
1533 .insert(meta.id, runner.meta.clone());
1534
1535 runner.manager_ctx.start();
1536 runner.execute_once(&ctx).await;
1537 let state = runner.meta.state();
1538 assert!(state.is_running(), "{state:?}");
1539
1540 let procedure_id = runner
1541 .manager_ctx
1542 .poison_manager
1543 .get_poison(&poison_key.to_string())
1544 .await
1545 .unwrap();
1546 assert!(procedure_id.is_some());
1548
1549 runner.execute_once(&ctx).await;
1550 let state = runner.meta.state();
1551 assert!(state.is_running(), "{state:?}");
1552
1553 let procedure_id = runner
1554 .manager_ctx
1555 .poison_manager
1556 .get_poison(&poison_key.to_string())
1557 .await
1558 .unwrap();
1559 assert!(procedure_id.is_none());
1561 }
1562
1563 #[tokio::test]
1564 async fn test_execute_error_with_clean_poisons() {
1565 common_telemetry::init_default_ut_logging();
1566 let mut times = 0;
1567 let poison_key = PoisonKey::new("table/1024");
1568 let moved_poison_key = poison_key.clone();
1569 let exec_fn = move |ctx: Context| {
1570 times += 1;
1571 let poison_key = moved_poison_key.clone();
1572 async move {
1573 if times == 1 {
1574 ctx.provider
1576 .try_put_poison(&poison_key, ctx.procedure_id)
1577 .await
1578 .unwrap();
1579
1580 Ok(Status::executing(true))
1581 } else {
1582 Err(Error::external_and_clean_poisons(MockError::new(
1583 StatusCode::Unexpected,
1584 )))
1585 }
1586 }
1587 .boxed()
1588 };
1589 let poison = ProcedureAdapter {
1590 data: "poison".to_string(),
1591 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1592 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1593 exec_fn,
1594 rollback_fn: None,
1595 };
1596
1597 let dir = create_temp_dir("error_with_clean_poisons");
1598 let meta = poison.new_meta(ROOT_ID);
1599
1600 let object_store = test_util::new_object_store(&dir);
1601 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1602 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1603
1604 let ctx = context_with_provider(
1606 meta.id,
1607 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1608 );
1609 runner
1611 .manager_ctx
1612 .procedures
1613 .write()
1614 .unwrap()
1615 .insert(meta.id, runner.meta.clone());
1616
1617 runner.manager_ctx.start();
1618 runner.execute_once(&ctx).await;
1619 let state = runner.meta.state();
1620 assert!(state.is_running(), "{state:?}");
1621
1622 let procedure_id = runner
1623 .manager_ctx
1624 .poison_manager
1625 .get_poison(&poison_key.to_string())
1626 .await
1627 .unwrap();
1628 assert!(procedure_id.is_some());
1630
1631 runner.execute_once(&ctx).await;
1632 let state = runner.meta.state();
1633 assert!(state.is_failed(), "{state:?}");
1634
1635 let procedure_id = runner
1636 .manager_ctx
1637 .poison_manager
1638 .get_poison(&poison_key.to_string())
1639 .await
1640 .unwrap();
1641 assert!(procedure_id.is_none());
1643 }
1644
1645 #[tokio::test]
1646 async fn test_execute_failed_after_set_poison() {
1647 let mut times = 0;
1648 let poison_key = PoisonKey::new("table/1024");
1649 let moved_poison_key = poison_key.clone();
1650 let exec_fn = move |ctx: Context| {
1651 times += 1;
1652 let poison_key = moved_poison_key.clone();
1653 async move {
1654 if times == 1 {
1655 Ok(Status::executing(true))
1656 } else {
1657 ctx.provider
1659 .try_put_poison(&poison_key, ctx.procedure_id)
1660 .await
1661 .unwrap();
1662 Err(Error::external(MockError::new(StatusCode::Unexpected)))
1663 }
1664 }
1665 .boxed()
1666 };
1667 let poison = ProcedureAdapter {
1668 data: "poison".to_string(),
1669 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1670 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1671 exec_fn,
1672 rollback_fn: None,
1673 };
1674
1675 let dir = create_temp_dir("poison");
1676 let meta = poison.new_meta(ROOT_ID);
1677
1678 let object_store = test_util::new_object_store(&dir);
1679 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1680 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1681
1682 let ctx = context_with_provider(
1684 meta.id,
1685 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1686 );
1687 runner
1689 .manager_ctx
1690 .procedures
1691 .write()
1692 .unwrap()
1693 .insert(meta.id, runner.meta.clone());
1694
1695 runner.manager_ctx.start();
1696 runner.execute_once(&ctx).await;
1697 let state = runner.meta.state();
1698 assert!(state.is_running(), "{state:?}");
1699
1700 runner.execute_once(&ctx).await;
1701 let state = runner.meta.state();
1702 assert!(state.is_failed(), "{state:?}");
1703 assert!(meta.state().is_failed());
1704
1705 let procedure_id = runner
1707 .manager_ctx
1708 .poison_manager
1709 .get_poison(&poison_key.to_string())
1710 .await
1711 .unwrap()
1712 .unwrap();
1713
1714 assert_eq!(&procedure_id.clone(), ROOT_ID);
1716 }
1717
1718 #[tokio::test]
1719 async fn test_execute_exceed_max_retry_after_set_poison() {
1720 common_telemetry::init_default_ut_logging();
1721 let mut times = 0;
1722 let poison_key = PoisonKey::new("table/1024");
1723 let moved_poison_key = poison_key.clone();
1724 let exec_fn = move |ctx: Context| {
1725 times += 1;
1726 let poison_key = moved_poison_key.clone();
1727 async move {
1728 if times == 1 {
1729 Ok(Status::executing(true))
1730 } else {
1731 ctx.provider
1733 .try_put_poison(&poison_key, ctx.procedure_id)
1734 .await
1735 .unwrap();
1736 Err(Error::retry_later_and_clean_poisons(MockError::new(
1737 StatusCode::Unexpected,
1738 )))
1739 }
1740 }
1741 .boxed()
1742 };
1743 let poison = ProcedureAdapter {
1744 data: "poison".to_string(),
1745 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1746 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1747 exec_fn,
1748 rollback_fn: None,
1749 };
1750
1751 let dir = create_temp_dir("exceed_max_after_set_poison");
1752 let meta = poison.new_meta(ROOT_ID);
1753 let object_store = test_util::new_object_store(&dir);
1754 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1755 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
1756 runner.manager_ctx.start();
1757 runner.exponential_builder = ExponentialBuilder::default()
1758 .with_min_delay(Duration::from_millis(1))
1759 .with_max_times(3);
1760 let ctx = context_with_provider(
1762 meta.id,
1763 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1764 );
1765 runner
1767 .manager_ctx
1768 .procedures
1769 .write()
1770 .unwrap()
1771 .insert(meta.id, runner.meta.clone());
1772 runner.execute_once_with_retry(&ctx).await;
1774 let err = meta.state().error().unwrap().clone();
1775 assert_matches!(&*err, Error::RetryTimesExceeded { .. });
1776
1777 let procedure_id = runner
1779 .manager_ctx
1780 .poison_manager
1781 .get_poison(&poison_key.to_string())
1782 .await
1783 .unwrap();
1784 assert_eq!(procedure_id, None);
1785 }
1786
1787 #[tokio::test]
1788 async fn test_execute_poisoned() {
1789 let mut times = 0;
1790 let poison_key = PoisonKey::new("table/1024");
1791 let moved_poison_key = poison_key.clone();
1792 let exec_fn = move |ctx: Context| {
1793 times += 1;
1794 let poison_key = moved_poison_key.clone();
1795 async move {
1796 if times == 1 {
1797 Ok(Status::executing(true))
1798 } else {
1799 ctx.provider
1801 .try_put_poison(&poison_key, ctx.procedure_id)
1802 .await
1803 .unwrap();
1804 Ok(Status::Poisoned {
1805 keys: PoisonKeys::new(vec![poison_key.clone()]),
1806 error: Error::external(MockError::new(StatusCode::Unexpected)),
1807 })
1808 }
1809 }
1810 .boxed()
1811 };
1812 let poison = ProcedureAdapter {
1813 data: "poison".to_string(),
1814 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1815 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1816 exec_fn,
1817 rollback_fn: None,
1818 };
1819
1820 let dir = create_temp_dir("poison");
1821 let meta = poison.new_meta(ROOT_ID);
1822
1823 let object_store = test_util::new_object_store(&dir);
1824 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1825 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1826
1827 let ctx = context_with_provider(
1829 meta.id,
1830 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1831 );
1832 runner
1834 .manager_ctx
1835 .procedures
1836 .write()
1837 .unwrap()
1838 .insert(meta.id, runner.meta.clone());
1839
1840 runner.manager_ctx.start();
1841 runner.execute_once(&ctx).await;
1842 let state = runner.meta.state();
1843 assert!(state.is_running(), "{state:?}");
1844
1845 runner.execute_once(&ctx).await;
1846 let state = runner.meta.state();
1847 assert!(state.is_poisoned(), "{state:?}");
1848 assert!(meta.state().is_poisoned());
1849 check_files(
1850 &object_store,
1851 &procedure_store,
1852 ctx.procedure_id,
1853 &["0000000000.step"],
1854 )
1855 .await;
1856
1857 let procedure_id = runner
1859 .manager_ctx
1860 .poison_manager
1861 .get_poison(&poison_key.to_string())
1862 .await
1863 .unwrap()
1864 .unwrap();
1865
1866 assert_eq!(procedure_id, ROOT_ID);
1868 }
1869
1870 fn test_procedure_with_dynamic_lock(
1871 shared_atomic_value: Arc<AtomicU64>,
1872 id: u64,
1873 ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
1874 let exec_fn = move |ctx: Context| {
1875 let moved_shared_atomic_value = shared_atomic_value.clone();
1876 let moved_ctx = ctx.clone();
1877 async move {
1878 debug!("Acquiring write lock, id: {}", id);
1879 let key = StringKey::Exclusive("test_lock".to_string());
1880 let guard = moved_ctx.provider.acquire_lock(&key).await;
1881 debug!("Acquired write lock, id: {}", id);
1882 let millis = rand::rng().random_range(10..=50);
1883 tokio::time::sleep(Duration::from_millis(millis)).await;
1884 let value = moved_shared_atomic_value.load(Ordering::Relaxed);
1885 moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
1886 debug!("Dropping write lock, id: {}", id);
1887 drop(guard);
1888
1889 Ok(Status::done())
1890 }
1891 .boxed()
1892 };
1893
1894 let adapter = ProcedureAdapter {
1895 data: "dynamic_lock".to_string(),
1896 lock_key: LockKey::new_exclusive([]),
1897 poison_keys: PoisonKeys::new([]),
1898 exec_fn,
1899 rollback_fn: None,
1900 };
1901 let meta = adapter.new_meta(ROOT_ID);
1902
1903 (Box::new(adapter), meta)
1904 }
1905
1906 #[tokio::test(flavor = "multi_thread")]
1907 async fn test_execute_with_dynamic_lock() {
1908 common_telemetry::init_default_ut_logging();
1909 let shared_atomic_value = Arc::new(AtomicU64::new(0));
1910 let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
1911 let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
1912
1913 let dir = create_temp_dir("dynamic_lock");
1914 let object_store = test_util::new_object_store(&dir);
1915 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1916 let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
1917 let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
1918 let ctx1 = context_with_provider(
1919 meta1.id,
1920 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1921 );
1922 let ctx2 = context_with_provider(
1923 meta2.id,
1924 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1926 );
1927 let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
1928 join_all(tasks).await;
1929 assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
1930 }
1931}