1use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_telemetry::{debug, error, info};
21use rand::Rng;
22use snafu::ResultExt;
23use tokio::time;
24
25use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
26use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
27use crate::procedure::{Output, StringKey};
28use crate::rwlock::OwnedKeyRwLockGuard;
29use crate::store::{ProcedureMessage, ProcedureStore};
30use crate::{
31 BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
32};
33
/// Guard tied to the lifetime of a procedure runner.
///
/// Its `Drop` impl notifies the parent procedure (if any), releases the held key
/// locks and cleans the procedure's keys from the key lock. If the guard is dropped
/// without `finish()` having been called (e.g. the runner panicked), the procedure
/// is additionally marked as failed.
struct ProcedureGuard {
    /// Metadata of the procedure being run.
    meta: ProcedureMetaRef,
    /// Manager context used to notify the parent and to clean lock keys on drop.
    manager_ctx: Arc<ManagerContext>,
    /// Guards of the keys locked by this procedure; popped (released) in reverse
    /// order on drop.
    key_guards: Vec<OwnedKeyRwLockGuard>,
    /// Set by `finish()`; if still `false` on drop, the runner exited unexpectedly.
    finish: bool,
}
41
42impl ProcedureGuard {
43 fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
45 ProcedureGuard {
46 meta,
47 manager_ctx,
48 key_guards: vec![],
49 finish: false,
50 }
51 }
52
53 fn finish(mut self) {
55 self.finish = true;
56 }
57}
58
59impl Drop for ProcedureGuard {
60 fn drop(&mut self) {
61 if !self.finish {
62 error!("Procedure {} exits unexpectedly", self.meta.id);
63
64 let err = ProcedurePanicSnafu {
68 procedure_id: self.meta.id,
69 }
70 .build();
71 self.meta.set_state(ProcedureState::failed(Arc::new(err)));
72 }
73
74 if let Some(parent_id) = self.meta.parent_id {
76 self.manager_ctx.notify_by_subprocedure(parent_id);
77 }
78
79 while !self.key_guards.is_empty() {
81 self.key_guards.pop();
82 }
83
84 self.manager_ctx
86 .key_lock
87 .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
88 }
89}
90
/// Drives a single procedure through its state machine until it reaches a
/// terminal state (done, failed or poisoned).
pub(crate) struct Runner {
    /// Shared metadata of the procedure (id, state, parent, lock and poison keys).
    pub(crate) meta: ProcedureMetaRef,
    /// The procedure being executed.
    pub(crate) procedure: BoxedProcedure,
    /// Manager context shared with the procedure manager and other runners.
    pub(crate) manager_ctx: Arc<ManagerContext>,
    /// Step number written with each persisted, committed or rollback record;
    /// incremented after every successful store write.
    pub(crate) step: u32,
    /// Builder for the backoff iterators used between retries and rollback attempts.
    pub(crate) exponential_builder: ExponentialBuilder,
    /// Store used to persist, commit, roll back and delete procedure records.
    pub(crate) store: Arc<ProcedureStore>,
    /// Reset to `false` when the execution loop starts.
    // NOTE(review): not read anywhere in the visible code — confirm it is still needed.
    pub(crate) rolling_back: bool,
}
100
101impl Runner {
102 pub(crate) fn running(&self) -> bool {
104 self.manager_ctx.running()
105 }
106
107 pub(crate) async fn run(mut self) {
109 let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
111
112 info!(
113 "Runner {}-{} starts",
114 self.procedure.type_name(),
115 self.meta.id
116 );
117
118 for key in self.meta.lock_key.keys_to_lock() {
121 let key_guard = match key {
123 StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
124 StringKey::Exclusive(key) => {
125 self.manager_ctx.key_lock.write(key.clone()).await.into()
126 }
127 };
128
129 guard.key_guards.push(key_guard);
130 }
131
132 self.meta.set_start_time_ms();
135 self.execute_procedure_in_loop().await;
136 self.meta.set_end_time_ms();
137
138 guard.finish();
145
146 if self.meta.parent_id.is_none() {
148 let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
149 self.manager_ctx.on_procedures_finish(&procedure_ids);
151
152 if !self.running() {
154 return;
155 }
156
157 for id in procedure_ids {
158 if let Err(e) = self.store.delete_procedure(id).await {
159 error!(
160 e;
161 "Runner {}-{} failed to delete procedure {}",
162 self.procedure.type_name(),
163 self.meta.id,
164 id,
165 );
166 }
167 }
168 }
169
170 info!(
171 "Runner {}-{} exits",
172 self.procedure.type_name(),
173 self.meta.id
174 );
175 }
176
177 async fn execute_procedure_in_loop(&mut self) {
178 let ctx = Context {
179 procedure_id: self.meta.id,
180 provider: self.manager_ctx.clone(),
181 };
182
183 self.rolling_back = false;
184 self.execute_once_with_retry(&ctx).await;
185 }
186
187 async fn execute_once_with_retry(&mut self, ctx: &Context) {
188 let mut retry = self.exponential_builder.build();
189 let mut retry_times = 0;
190
191 let mut rollback = self.exponential_builder.build();
192 let mut rollback_times = 0;
193
194 loop {
195 if !self.running() {
197 self.meta.set_state(ProcedureState::failed(Arc::new(
198 error::ManagerNotStartSnafu {}.build(),
199 )));
200 return;
201 }
202 let state = self.meta.state();
203 match state {
204 ProcedureState::Running => {}
205 ProcedureState::Retrying { error } => {
206 retry_times += 1;
207 if let Some(d) = retry.next() {
208 let millis = d.as_millis() as u64;
209 let noise = rand::rng().random_range(0..(millis / 4) + 1);
211 let d = d.add(Duration::from_millis(noise));
212
213 self.wait_on_err(d, retry_times).await;
214 } else {
215 self.meta
216 .set_state(ProcedureState::prepare_rollback(Arc::new(
217 Error::RetryTimesExceeded {
218 source: error.clone(),
219 procedure_id: self.meta.id,
220 },
221 )));
222 }
223 }
224 ProcedureState::PrepareRollback { error }
225 | ProcedureState::RollingBack { error } => {
226 rollback_times += 1;
227 if let Some(d) = rollback.next() {
228 self.wait_on_err(d, rollback_times).await;
229 } else {
230 let err = Err::<(), Arc<Error>>(error)
231 .context(RollbackTimesExceededSnafu {
232 procedure_id: self.meta.id,
233 })
234 .unwrap_err();
235 self.meta.set_state(ProcedureState::failed(Arc::new(err)));
236 return;
237 }
238 }
239 ProcedureState::Done { .. } => return,
240 ProcedureState::Failed { .. } => return,
241 ProcedureState::Poisoned { .. } => return,
242 }
243 self.execute_once(ctx).await;
244 }
245 }
246
247 async fn clean_poisons(&mut self) -> Result<()> {
248 let mut error = None;
249 for key in self.meta.poison_keys.iter() {
250 let key = key.to_string();
251 if let Err(e) = self
252 .manager_ctx
253 .poison_manager
254 .delete_poison(key, self.meta.id.to_string())
255 .await
256 {
257 error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
258 error = Some(e);
259 }
260 }
261
262 if let Some(e) = error {
264 return Err(e);
265 }
266 Ok(())
267 }
268
269 async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
270 if self.procedure.rollback_supported() {
271 if let Err(e) = self.procedure.rollback(ctx).await {
272 self.meta
273 .set_state(ProcedureState::rolling_back(Arc::new(e)));
274 return;
275 }
276 }
277 self.meta.set_state(ProcedureState::failed(err));
278 }
279
280 async fn prepare_rollback(&mut self, err: Arc<Error>) {
281 if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
282 self.meta
283 .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
284 return;
285 }
286 if self.procedure.rollback_supported() {
287 self.meta.set_state(ProcedureState::rolling_back(err));
288 } else {
289 self.meta.set_state(ProcedureState::failed(err));
290 }
291 }
292
293 async fn execute_once(&mut self, ctx: &Context) {
294 match self.meta.state() {
295 ProcedureState::Running | ProcedureState::Retrying { .. } => {
296 match self.procedure.execute(ctx).await {
297 Ok(status) => {
298 debug!(
299 "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
300 self.procedure.type_name(),
301 self.meta.id,
302 status,
303 status.need_persist(),
304 );
305
306 if !self.running() {
308 self.meta.set_state(ProcedureState::failed(Arc::new(
309 error::ManagerNotStartSnafu {}.build(),
310 )));
311 return;
312 }
313
314 if status.need_clean_poisons() {
316 if let Err(e) = self.clean_poisons().await {
317 error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
318 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
319 return;
320 }
321 }
322
323 if status.need_persist() {
324 if let Err(e) = self.persist_procedure().await {
325 error!(e; "Failed to persist procedure: {}", self.meta.id);
326 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
327 return;
328 }
329 }
330
331 match status {
332 Status::Executing { .. } => {}
333 Status::Suspended { subprocedures, .. } => {
334 self.on_suspended(subprocedures).await;
335 }
336 Status::Done { output } => {
337 if let Err(e) = self.commit_procedure().await {
338 error!(e; "Failed to commit procedure: {}", self.meta.id);
339 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
340 return;
341 }
342
343 self.done(output);
344 }
345 Status::Poisoned { error, keys } => {
346 error!(
347 error;
348 "Procedure {}-{} is poisoned, keys: {:?}",
349 self.procedure.type_name(),
350 self.meta.id,
351 keys,
352 );
353 self.meta
354 .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
355 }
356 }
357 }
358 Err(e) => {
359 error!(
360 e;
361 "Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
362 self.procedure.type_name(),
363 self.meta.id,
364 e.is_retry_later(),
365 e.need_clean_poisons(),
366 );
367
368 if !self.running() {
370 self.meta.set_state(ProcedureState::failed(Arc::new(
371 error::ManagerNotStartSnafu {}.build(),
372 )));
373 return;
374 }
375
376 if e.need_clean_poisons() {
377 if let Err(e) = self.clean_poisons().await {
378 error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
379 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
380 return;
381 }
382 debug!(
383 "Procedure {}-{} cleaned poisons",
384 self.procedure.type_name(),
385 self.meta.id,
386 );
387 }
388
389 if e.is_retry_later() {
390 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
391 return;
392 }
393
394 self.meta
395 .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
396 }
397 }
398 }
399 ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
400 ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
401 ProcedureState::Failed { .. }
402 | ProcedureState::Done { .. }
403 | ProcedureState::Poisoned { .. } => (),
404 }
405 }
406
407 fn submit_subprocedure(
409 &self,
410 procedure_id: ProcedureId,
411 procedure_state: ProcedureState,
412 procedure: BoxedProcedure,
413 ) {
414 if self.manager_ctx.contains_procedure(procedure_id) {
415 return;
417 }
418
419 let step = 0;
420
421 let meta = Arc::new(ProcedureMeta::new(
422 procedure_id,
423 procedure_state,
424 Some(self.meta.id),
425 procedure.lock_key(),
426 procedure.poison_keys(),
427 procedure.type_name(),
428 ));
429 let runner = Runner {
430 meta: meta.clone(),
431 procedure,
432 manager_ctx: self.manager_ctx.clone(),
433 step,
434 exponential_builder: self.exponential_builder,
435 store: self.store.clone(),
436 rolling_back: false,
437 };
438
439 assert!(
443 self.manager_ctx.try_insert_procedure(meta),
444 "Procedure {}-{} submit an existing procedure {}-{}",
445 self.procedure.type_name(),
446 self.meta.id,
447 runner.procedure.type_name(),
448 procedure_id,
449 );
450
451 self.meta.push_child(procedure_id);
453
454 let _handle = common_runtime::spawn_global(async move {
455 runner.run().await
457 });
458 }
459
460 async fn wait_on_err(&mut self, d: Duration, i: u64) {
462 info!(
463 "Procedure {}-{} retry for the {} times after {} millis",
464 self.procedure.type_name(),
465 self.meta.id,
466 i,
467 d.as_millis(),
468 );
469 time::sleep(d).await;
470 }
471
472 async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
473 let has_child = !subprocedures.is_empty();
474 for subprocedure in subprocedures {
475 info!(
476 "Procedure {}-{} submit subprocedure {}-{}",
477 self.procedure.type_name(),
478 self.meta.id,
479 subprocedure.procedure.type_name(),
480 subprocedure.id,
481 );
482
483 self.submit_subprocedure(
484 subprocedure.id,
485 ProcedureState::Running,
486 subprocedure.procedure,
487 );
488 }
489
490 info!(
491 "Procedure {}-{} is waiting for subprocedures",
492 self.procedure.type_name(),
493 self.meta.id,
494 );
495
496 if has_child {
498 self.meta.child_notify.notified().await;
499
500 info!(
501 "Procedure {}-{} is waked up",
502 self.procedure.type_name(),
503 self.meta.id,
504 );
505 }
506 }
507
508 async fn persist_procedure(&mut self) -> Result<()> {
509 let type_name = self.procedure.type_name().to_string();
510 let data = self.procedure.dump()?;
511
512 self.store
513 .store_procedure(
514 self.meta.id,
515 self.step,
516 type_name,
517 data,
518 self.meta.parent_id,
519 )
520 .await
521 .map_err(|e| {
522 error!(
523 e; "Failed to persist procedure {}-{}",
524 self.procedure.type_name(),
525 self.meta.id
526 );
527 e
528 })?;
529 self.step += 1;
530 Ok(())
531 }
532
533 async fn commit_procedure(&mut self) -> Result<()> {
534 self.store
535 .commit_procedure(self.meta.id, self.step)
536 .await
537 .map_err(|e| {
538 error!(
539 e; "Failed to commit procedure {}-{}",
540 self.procedure.type_name(),
541 self.meta.id
542 );
543 e
544 })?;
545 self.step += 1;
546 Ok(())
547 }
548
549 async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
550 let type_name = self.procedure.type_name().to_string();
552 let data = self.procedure.dump()?;
553 let message = ProcedureMessage {
554 type_name,
555 data,
556 parent_id: self.meta.parent_id,
557 step: self.step,
558 error: Some(error),
559 };
560 self.store
561 .rollback_procedure(self.meta.id, message)
562 .await
563 .map_err(|e| {
564 error!(
565 e; "Failed to write rollback key for procedure {}-{}",
566 self.procedure.type_name(),
567 self.meta.id
568 );
569 e
570 })?;
571 self.step += 1;
572 Ok(())
573 }
574
575 fn done(&self, output: Option<Output>) {
576 info!(
578 "Procedure {}-{} done",
579 self.procedure.type_name(),
580 self.meta.id,
581 );
582
583 self.meta.set_state(ProcedureState::Done { output });
585 }
586}
587
588#[cfg(test)]
589mod tests {
590 use std::assert_matches::assert_matches;
591 use std::sync::atomic::{AtomicU64, Ordering};
592 use std::sync::Arc;
593
594 use async_trait::async_trait;
595 use common_error::ext::{ErrorExt, PlainError};
596 use common_error::mock::MockError;
597 use common_error::status_code::StatusCode;
598 use common_test_util::temp_dir::create_temp_dir;
599 use futures::future::join_all;
600 use futures_util::future::BoxFuture;
601 use futures_util::FutureExt;
602 use object_store::{EntryMode, ObjectStore};
603 use tokio::sync::mpsc;
604
605 use super::*;
606 use crate::local::{test_util, DynamicKeyLockGuard};
607 use crate::procedure::PoisonKeys;
608 use crate::store::proc_path;
609 use crate::test_util::InMemoryPoisonStore;
610 use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
611
612 const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
613
614 fn new_runner(
615 meta: ProcedureMetaRef,
616 procedure: BoxedProcedure,
617 store: Arc<ProcedureStore>,
618 ) -> Runner {
619 Runner {
620 meta,
621 procedure,
622 manager_ctx: Arc::new(ManagerContext::new(
623 Arc::new(InMemoryPoisonStore::default()),
624 )),
625 step: 0,
626 exponential_builder: ExponentialBuilder::default(),
627 store,
628 rolling_back: false,
629 }
630 }
631
632 async fn check_files(
633 object_store: &ObjectStore,
634 procedure_store: &ProcedureStore,
635 procedure_id: ProcedureId,
636 files: &[&str],
637 ) {
638 let dir = proc_path!(procedure_store, "{procedure_id}/");
639 let lister = object_store.list(&dir).await.unwrap();
640 let mut files_in_dir: Vec<_> = lister
641 .into_iter()
642 .filter(|x| x.metadata().mode() == EntryMode::FILE)
643 .map(|de| de.name().to_string())
644 .collect();
645 files_in_dir.sort_unstable();
646 assert_eq!(files, files_in_dir);
647 }
648
649 fn context_with_provider(
650 procedure_id: ProcedureId,
651 provider: Arc<dyn ContextProvider>,
652 ) -> Context {
653 Context {
654 procedure_id,
655 provider,
656 }
657 }
658
659 fn context_without_provider(procedure_id: ProcedureId) -> Context {
660 struct MockProvider;
661
662 #[async_trait]
663 impl ContextProvider for MockProvider {
664 async fn procedure_state(
665 &self,
666 _procedure_id: ProcedureId,
667 ) -> Result<Option<ProcedureState>> {
668 unimplemented!()
669 }
670
671 async fn try_put_poison(
672 &self,
673 _key: &PoisonKey,
674 _procedure_id: ProcedureId,
675 ) -> Result<()> {
676 unimplemented!()
677 }
678
679 async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
680 unimplemented!()
681 }
682 }
683
684 Context {
685 procedure_id,
686 provider: Arc::new(MockProvider),
687 }
688 }
689
690 type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
691
692 struct ProcedureAdapter<F> {
693 data: String,
694 lock_key: LockKey,
695 poison_keys: PoisonKeys,
696 exec_fn: F,
697 rollback_fn: Option<RollbackFn>,
698 }
699
700 impl<F> ProcedureAdapter<F> {
701 fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
702 let mut meta = test_util::procedure_meta_for_test();
703 meta.id = ProcedureId::parse_str(uuid).unwrap();
704 meta.lock_key = self.lock_key.clone();
705 meta.poison_keys = self.poison_keys.clone();
706
707 Arc::new(meta)
708 }
709 }
710
711 #[async_trait]
712 impl<F> Procedure for ProcedureAdapter<F>
713 where
714 F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
715 {
716 fn type_name(&self) -> &str {
717 "ProcedureAdapter"
718 }
719
720 async fn execute(&mut self, ctx: &Context) -> Result<Status> {
721 let f = (self.exec_fn)(ctx.clone());
722 f.await
723 }
724
725 async fn rollback(&mut self, ctx: &Context) -> Result<()> {
726 if let Some(f) = &mut self.rollback_fn {
727 return (f)(ctx.clone()).await;
728 }
729 Ok(())
730 }
731
732 fn rollback_supported(&self) -> bool {
733 self.rollback_fn.is_some()
734 }
735
736 fn dump(&self) -> Result<String> {
737 Ok(self.data.clone())
738 }
739
740 fn lock_key(&self) -> LockKey {
741 self.lock_key.clone()
742 }
743
744 fn poison_keys(&self) -> PoisonKeys {
745 self.poison_keys.clone()
746 }
747 }
748
749 async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
750 let mut times = 0;
751 let exec_fn = move |_| {
752 times += 1;
753 async move {
754 if times == 1 {
755 Ok(Status::executing(persist))
756 } else {
757 Ok(Status::done())
758 }
759 }
760 .boxed()
761 };
762 let normal = ProcedureAdapter {
763 data: "normal".to_string(),
764 lock_key: LockKey::single_exclusive("catalog.schema.table"),
765 poison_keys: PoisonKeys::default(),
766 exec_fn,
767 rollback_fn: None,
768 };
769
770 let dir = create_temp_dir("normal");
771 let meta = normal.new_meta(ROOT_ID);
772 let ctx = context_without_provider(meta.id);
773 let object_store = test_util::new_object_store(&dir);
774 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
775 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
776 runner.manager_ctx.start();
777
778 runner.execute_once(&ctx).await;
779 let state = runner.meta.state();
780 assert!(state.is_running(), "{state:?}");
781 check_files(
782 &object_store,
783 &procedure_store,
784 ctx.procedure_id,
785 first_files,
786 )
787 .await;
788
789 runner.execute_once(&ctx).await;
790 let state = runner.meta.state();
791 assert!(state.is_done(), "{state:?}");
792 check_files(
793 &object_store,
794 &procedure_store,
795 ctx.procedure_id,
796 second_files,
797 )
798 .await;
799 }
800
801 #[tokio::test]
802 async fn test_execute_once_normal() {
803 execute_once_normal(
804 true,
805 &["0000000000.step"],
806 &["0000000000.step", "0000000001.commit"],
807 )
808 .await;
809 }
810
811 #[tokio::test]
812 async fn test_execute_once_normal_skip_persist() {
813 execute_once_normal(false, &[], &["0000000000.commit"]).await;
814 }
815
816 #[tokio::test]
817 async fn test_on_suspend_empty() {
818 let exec_fn = move |_| {
819 async move {
820 Ok(Status::Suspended {
821 subprocedures: Vec::new(),
822 persist: false,
823 })
824 }
825 .boxed()
826 };
827 let suspend = ProcedureAdapter {
828 data: "suspend".to_string(),
829 lock_key: LockKey::single_exclusive("catalog.schema.table"),
830 poison_keys: PoisonKeys::default(),
831 exec_fn,
832 rollback_fn: None,
833 };
834
835 let dir = create_temp_dir("suspend");
836 let meta = suspend.new_meta(ROOT_ID);
837 let ctx = context_without_provider(meta.id);
838 let object_store = test_util::new_object_store(&dir);
839 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
840 let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
841 runner.manager_ctx.start();
842
843 runner.execute_once(&ctx).await;
844 let state = runner.meta.state();
845 assert!(state.is_running(), "{state:?}");
846 }
847
848 fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
849 let mut times = 0;
850 let exec_fn = move |_| {
851 times += 1;
852 async move {
853 if times == 1 {
854 time::sleep(Duration::from_millis(200)).await;
855 Ok(Status::executing(true))
856 } else {
857 Ok(Status::done())
858 }
859 }
860 .boxed()
861 };
862 let child = ProcedureAdapter {
863 data: "child".to_string(),
864 lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
865 poison_keys: PoisonKeys::default(),
866 exec_fn,
867 rollback_fn: None,
868 };
869
870 ProcedureWithId {
871 id: procedure_id,
872 procedure: Box::new(child),
873 }
874 }
875
876 #[tokio::test]
877 async fn test_on_suspend_by_subprocedures() {
878 let mut times = 0;
879 let children_ids = [ProcedureId::random(), ProcedureId::random()];
880 let keys = [
881 &[
882 "catalog.schema.table.region-0",
883 "catalog.schema.table.region-1",
884 ],
885 &[
886 "catalog.schema.table.region-2",
887 "catalog.schema.table.region-3",
888 ],
889 ];
890
891 let exec_fn = move |ctx: Context| {
892 times += 1;
893 async move {
894 if times == 1 {
895 Ok(Status::Suspended {
897 subprocedures: children_ids
898 .into_iter()
899 .zip(keys)
900 .map(|(id, key_slice)| new_child_procedure(id, key_slice))
901 .collect(),
902 persist: true,
903 })
904 } else {
905 let mut all_child_done = true;
907 for id in children_ids {
908 let is_not_done = ctx
909 .provider
910 .procedure_state(id)
911 .await
912 .unwrap()
913 .map(|s| !s.is_done())
914 .unwrap_or(true);
915 if is_not_done {
916 all_child_done = false;
917 }
918 }
919 if all_child_done {
920 Ok(Status::done())
921 } else {
922 Ok(Status::Suspended {
924 subprocedures: Vec::new(),
925 persist: false,
926 })
927 }
928 }
929 }
930 .boxed()
931 };
932 let parent = ProcedureAdapter {
933 data: "parent".to_string(),
934 lock_key: LockKey::single_exclusive("catalog.schema.table"),
935 poison_keys: PoisonKeys::default(),
936 exec_fn,
937 rollback_fn: None,
938 };
939
940 let dir = create_temp_dir("parent");
941 let meta = parent.new_meta(ROOT_ID);
942 let procedure_id = meta.id;
943
944 let object_store = test_util::new_object_store(&dir);
945 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
946 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
947 let poison_manager = Arc::new(InMemoryPoisonStore::default());
948 let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
949 manager_ctx.start();
950 assert!(manager_ctx.try_insert_procedure(meta));
952 runner.manager_ctx = manager_ctx.clone();
954
955 runner.run().await;
956 assert!(manager_ctx.key_lock.is_empty());
957
958 for child_id in children_ids {
960 let state = manager_ctx.state(child_id).unwrap();
961 assert!(state.is_done(), "{state:?}");
962 }
963 let state = manager_ctx.state(procedure_id).unwrap();
964 assert!(state.is_done(), "{state:?}");
965 check_files(&object_store, &procedure_store, procedure_id, &[]).await;
967
968 tokio::time::sleep(Duration::from_millis(5)).await;
969 manager_ctx.remove_outdated_meta(Duration::from_millis(1));
971 assert!(manager_ctx.state(procedure_id).is_none());
972 assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
973 for child_id in children_ids {
974 assert!(manager_ctx.state(child_id).is_none());
975 }
976 }
977
978 #[tokio::test]
979 async fn test_running_is_stopped() {
980 let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
981 let normal = ProcedureAdapter {
982 data: "normal".to_string(),
983 lock_key: LockKey::single_exclusive("catalog.schema.table"),
984 poison_keys: PoisonKeys::default(),
985 exec_fn,
986 rollback_fn: None,
987 };
988
989 let dir = create_temp_dir("test_running_is_stopped");
990 let meta = normal.new_meta(ROOT_ID);
991 let ctx = context_without_provider(meta.id);
992 let object_store = test_util::new_object_store(&dir);
993 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
994 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
995 runner.manager_ctx.start();
996
997 runner.execute_once(&ctx).await;
998 let state = runner.meta.state();
999 assert!(state.is_running(), "{state:?}");
1000 check_files(
1001 &object_store,
1002 &procedure_store,
1003 ctx.procedure_id,
1004 &["0000000000.step"],
1005 )
1006 .await;
1007
1008 runner.manager_ctx.stop();
1009 runner.execute_once(&ctx).await;
1010 let state = runner.meta.state();
1011 assert!(state.is_failed(), "{state:?}");
1012 check_files(
1014 &object_store,
1015 &procedure_store,
1016 ctx.procedure_id,
1017 &["0000000000.step"],
1018 )
1019 .await;
1020 }
1021
1022 #[tokio::test]
1023 async fn test_running_is_stopped_on_error() {
1024 let exec_fn =
1025 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1026 let normal = ProcedureAdapter {
1027 data: "fail".to_string(),
1028 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1029 poison_keys: PoisonKeys::default(),
1030 exec_fn,
1031 rollback_fn: None,
1032 };
1033
1034 let dir = create_temp_dir("test_running_is_stopped_on_error");
1035 let meta = normal.new_meta(ROOT_ID);
1036 let ctx = context_without_provider(meta.id);
1037 let object_store = test_util::new_object_store(&dir);
1038 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1039 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1040 runner.manager_ctx.stop();
1041
1042 runner.execute_once(&ctx).await;
1043 let state = runner.meta.state();
1044 assert!(state.is_failed(), "{state:?}");
1045 check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1047 }
1048
1049 #[tokio::test]
1050 async fn test_execute_on_error() {
1051 let exec_fn =
1052 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1053 let fail = ProcedureAdapter {
1054 data: "fail".to_string(),
1055 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1056 poison_keys: PoisonKeys::default(),
1057 exec_fn,
1058 rollback_fn: None,
1059 };
1060
1061 let dir = create_temp_dir("fail");
1062 let meta = fail.new_meta(ROOT_ID);
1063 let ctx = context_without_provider(meta.id);
1064 let object_store = test_util::new_object_store(&dir);
1065 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1066 let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1067 runner.manager_ctx.start();
1068
1069 runner.execute_once(&ctx).await;
1070 let state = runner.meta.state();
1071 assert!(state.is_prepare_rollback(), "{state:?}");
1072
1073 runner.execute_once(&ctx).await;
1074 let state = runner.meta.state();
1075 assert!(state.is_failed(), "{state:?}");
1076 check_files(
1077 &object_store,
1078 &procedure_store,
1079 ctx.procedure_id,
1080 &["0000000000.rollback"],
1081 )
1082 .await;
1083 }
1084
1085 #[tokio::test]
1086 async fn test_execute_with_rollback_on_error() {
1087 let exec_fn =
1088 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1089 let rollback_fn = move |_| async move { Ok(()) }.boxed();
1090 let fail = ProcedureAdapter {
1091 data: "fail".to_string(),
1092 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1093 poison_keys: PoisonKeys::default(),
1094 exec_fn,
1095 rollback_fn: Some(Box::new(rollback_fn)),
1096 };
1097
1098 let dir = create_temp_dir("fail");
1099 let meta = fail.new_meta(ROOT_ID);
1100 let ctx = context_without_provider(meta.id);
1101 let object_store = test_util::new_object_store(&dir);
1102 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1103 let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1104 runner.manager_ctx.start();
1105
1106 runner.execute_once(&ctx).await;
1107 let state = runner.meta.state();
1108 assert!(state.is_prepare_rollback(), "{state:?}");
1109
1110 runner.execute_once(&ctx).await;
1111 let state = runner.meta.state();
1112 assert!(state.is_rolling_back(), "{state:?}");
1113
1114 runner.execute_once(&ctx).await;
1115 let state = runner.meta.state();
1116 assert!(state.is_failed(), "{state:?}");
1117 check_files(
1118 &object_store,
1119 &procedure_store,
1120 ctx.procedure_id,
1121 &["0000000000.rollback"],
1122 )
1123 .await;
1124 }
1125
1126 #[tokio::test]
1127 async fn test_execute_on_retry_later_error() {
1128 let mut times = 0;
1129
1130 let exec_fn = move |_| {
1131 times += 1;
1132 async move {
1133 if times == 1 {
1134 Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1135 } else {
1136 Ok(Status::done())
1137 }
1138 }
1139 .boxed()
1140 };
1141
1142 let retry_later = ProcedureAdapter {
1143 data: "retry_later".to_string(),
1144 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1145 poison_keys: PoisonKeys::default(),
1146 exec_fn,
1147 rollback_fn: None,
1148 };
1149
1150 let dir = create_temp_dir("retry_later");
1151 let meta = retry_later.new_meta(ROOT_ID);
1152 let ctx = context_without_provider(meta.id);
1153 let object_store = test_util::new_object_store(&dir);
1154 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1155 let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1156 runner.manager_ctx.start();
1157 runner.execute_once(&ctx).await;
1158 let state = runner.meta.state();
1159 assert!(state.is_retrying(), "{state:?}");
1160
1161 runner.execute_once(&ctx).await;
1162 let state = runner.meta.state();
1163 assert!(state.is_done(), "{state:?}");
1164 assert!(meta.state().is_done());
1165 check_files(
1166 &object_store,
1167 &procedure_store,
1168 ctx.procedure_id,
1169 &["0000000000.commit"],
1170 )
1171 .await;
1172 }
1173
1174 #[tokio::test]
1175 async fn test_execute_exceed_max_retry_later() {
1176 let exec_fn =
1177 |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1178
1179 let exceed_max_retry_later = ProcedureAdapter {
1180 data: "exceed_max_retry_later".to_string(),
1181 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1182 poison_keys: PoisonKeys::default(),
1183 exec_fn,
1184 rollback_fn: None,
1185 };
1186
1187 let dir = create_temp_dir("exceed_max_retry_later");
1188 let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1189 let object_store = test_util::new_object_store(&dir);
1190 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1191 let mut runner = new_runner(
1192 meta.clone(),
1193 Box::new(exceed_max_retry_later),
1194 procedure_store,
1195 );
1196 runner.manager_ctx.start();
1197
1198 runner.exponential_builder = ExponentialBuilder::default()
1199 .with_min_delay(Duration::from_millis(1))
1200 .with_max_times(3);
1201
1202 runner.execute_procedure_in_loop().await;
1204 let err = meta.state().error().unwrap().to_string();
1205 assert!(err.contains("Procedure retry exceeded max times"));
1206 }
1207
1208 #[tokio::test]
1209 async fn test_rollback_exceed_max_retry_later() {
1210 let exec_fn =
1211 |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1212 let rollback_fn = move |_| {
1213 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1214 };
1215 let exceed_max_retry_later = ProcedureAdapter {
1216 data: "exceed_max_rollback".to_string(),
1217 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1218 poison_keys: PoisonKeys::default(),
1219 exec_fn,
1220 rollback_fn: Some(Box::new(rollback_fn)),
1221 };
1222
1223 let dir = create_temp_dir("exceed_max_rollback");
1224 let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1225 let object_store = test_util::new_object_store(&dir);
1226 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1227 let mut runner = new_runner(
1228 meta.clone(),
1229 Box::new(exceed_max_retry_later),
1230 procedure_store,
1231 );
1232 runner.manager_ctx.start();
1233 runner.exponential_builder = ExponentialBuilder::default()
1234 .with_min_delay(Duration::from_millis(1))
1235 .with_max_times(3);
1236
1237 runner.execute_procedure_in_loop().await;
1239 let err = meta.state().error().unwrap().to_string();
1240 assert!(err.contains("Procedure rollback exceeded max times"));
1241 }
1242
1243 #[tokio::test]
1244 async fn test_rollback_after_retry_fail() {
1245 let exec_fn = move |_| {
1246 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1247 };
1248
1249 let (tx, mut rx) = mpsc::channel(1);
1250 let rollback_fn = move |_| {
1251 let tx = tx.clone();
1252 async move {
1253 tx.send(()).await.unwrap();
1254 Ok(())
1255 }
1256 .boxed()
1257 };
1258 let retry_later = ProcedureAdapter {
1259 data: "rollback_after_retry_fail".to_string(),
1260 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1261 poison_keys: PoisonKeys::default(),
1262 exec_fn,
1263 rollback_fn: Some(Box::new(rollback_fn)),
1264 };
1265
1266 let dir = create_temp_dir("retry_later");
1267 let meta = retry_later.new_meta(ROOT_ID);
1268 let ctx = context_without_provider(meta.id);
1269 let object_store = test_util::new_object_store(&dir);
1270 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1271 let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1272 runner.manager_ctx.start();
1273 runner.exponential_builder = ExponentialBuilder::default()
1274 .with_min_delay(Duration::from_millis(1))
1275 .with_max_times(3);
1276 runner.execute_procedure_in_loop().await;
1278 rx.recv().await.unwrap();
1279 assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
1280 check_files(
1281 &object_store,
1282 &procedure_store,
1283 ctx.procedure_id,
1284 &["0000000000.rollback"],
1285 )
1286 .await;
1287 }
1288
1289 #[tokio::test]
1290 async fn test_child_error() {
1291 let mut times = 0;
1292 let child_id = ProcedureId::random();
1293
1294 let exec_fn = move |ctx: Context| {
1295 times += 1;
1296 async move {
1297 if times == 1 {
1298 let exec_fn = |_| {
1300 async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1301 .boxed()
1302 };
1303 let fail = ProcedureAdapter {
1304 data: "fail".to_string(),
1305 lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1306 poison_keys: PoisonKeys::default(),
1307 exec_fn,
1308 rollback_fn: None,
1309 };
1310
1311 Ok(Status::Suspended {
1312 subprocedures: vec![ProcedureWithId {
1313 id: child_id,
1314 procedure: Box::new(fail),
1315 }],
1316 persist: true,
1317 })
1318 } else {
1319 let state = ctx.provider.procedure_state(child_id).await.unwrap();
1321 let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
1322 if is_failed {
1323 Err(Error::from_error_ext(PlainError::new(
1325 "subprocedure failed".to_string(),
1326 StatusCode::Unexpected,
1327 )))
1328 } else {
1329 Ok(Status::Suspended {
1331 subprocedures: Vec::new(),
1332 persist: false,
1333 })
1334 }
1335 }
1336 }
1337 .boxed()
1338 };
1339 let parent = ProcedureAdapter {
1340 data: "parent".to_string(),
1341 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1342 poison_keys: PoisonKeys::default(),
1343 exec_fn,
1344 rollback_fn: None,
1345 };
1346
1347 let dir = create_temp_dir("child_err");
1348 let meta = parent.new_meta(ROOT_ID);
1349
1350 let object_store = test_util::new_object_store(&dir);
1351 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1352 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
1353 let poison_manager = Arc::new(InMemoryPoisonStore::default());
1354 let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
1355 manager_ctx.start();
1356 assert!(manager_ctx.try_insert_procedure(meta.clone()));
1358 runner.manager_ctx = manager_ctx.clone();
1360
1361 runner.run().await;
1363 assert!(manager_ctx.key_lock.is_empty());
1364 let err = meta.state().error().unwrap().output_msg();
1365 assert!(err.contains("subprocedure failed"), "{err}");
1366 }
1367
1368 #[tokio::test]
1369 async fn test_execute_with_clean_poisons() {
1370 common_telemetry::init_default_ut_logging();
1371 let mut times = 0;
1372 let poison_key = PoisonKey::new("table/1024");
1373 let moved_poison_key = poison_key.clone();
1374 let exec_fn = move |ctx: Context| {
1375 times += 1;
1376 let poison_key = moved_poison_key.clone();
1377 async move {
1378 if times == 1 {
1379 ctx.provider
1381 .try_put_poison(&poison_key, ctx.procedure_id)
1382 .await
1383 .unwrap();
1384
1385 Ok(Status::executing(true))
1386 } else {
1387 Ok(Status::executing_with_clean_poisons(true))
1388 }
1389 }
1390 .boxed()
1391 };
1392 let poison = ProcedureAdapter {
1393 data: "poison".to_string(),
1394 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1395 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1396 exec_fn,
1397 rollback_fn: None,
1398 };
1399
1400 let dir = create_temp_dir("clean_poisons");
1401 let meta = poison.new_meta(ROOT_ID);
1402
1403 let object_store = test_util::new_object_store(&dir);
1404 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1405 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1406
1407 let ctx = context_with_provider(
1409 meta.id,
1410 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1411 );
1412 runner
1414 .manager_ctx
1415 .procedures
1416 .write()
1417 .unwrap()
1418 .insert(meta.id, runner.meta.clone());
1419
1420 runner.manager_ctx.start();
1421 runner.execute_once(&ctx).await;
1422 let state = runner.meta.state();
1423 assert!(state.is_running(), "{state:?}");
1424
1425 let procedure_id = runner
1426 .manager_ctx
1427 .poison_manager
1428 .get_poison(&poison_key.to_string())
1429 .await
1430 .unwrap();
1431 assert!(procedure_id.is_some());
1433
1434 runner.execute_once(&ctx).await;
1435 let state = runner.meta.state();
1436 assert!(state.is_running(), "{state:?}");
1437
1438 let procedure_id = runner
1439 .manager_ctx
1440 .poison_manager
1441 .get_poison(&poison_key.to_string())
1442 .await
1443 .unwrap();
1444 assert!(procedure_id.is_none());
1446 }
1447
1448 #[tokio::test]
1449 async fn test_execute_error_with_clean_poisons() {
1450 common_telemetry::init_default_ut_logging();
1451 let mut times = 0;
1452 let poison_key = PoisonKey::new("table/1024");
1453 let moved_poison_key = poison_key.clone();
1454 let exec_fn = move |ctx: Context| {
1455 times += 1;
1456 let poison_key = moved_poison_key.clone();
1457 async move {
1458 if times == 1 {
1459 ctx.provider
1461 .try_put_poison(&poison_key, ctx.procedure_id)
1462 .await
1463 .unwrap();
1464
1465 Ok(Status::executing(true))
1466 } else {
1467 Err(Error::external_and_clean_poisons(MockError::new(
1468 StatusCode::Unexpected,
1469 )))
1470 }
1471 }
1472 .boxed()
1473 };
1474 let poison = ProcedureAdapter {
1475 data: "poison".to_string(),
1476 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1477 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1478 exec_fn,
1479 rollback_fn: None,
1480 };
1481
1482 let dir = create_temp_dir("error_with_clean_poisons");
1483 let meta = poison.new_meta(ROOT_ID);
1484
1485 let object_store = test_util::new_object_store(&dir);
1486 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1487 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1488
1489 let ctx = context_with_provider(
1491 meta.id,
1492 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1493 );
1494 runner
1496 .manager_ctx
1497 .procedures
1498 .write()
1499 .unwrap()
1500 .insert(meta.id, runner.meta.clone());
1501
1502 runner.manager_ctx.start();
1503 runner.execute_once(&ctx).await;
1504 let state = runner.meta.state();
1505 assert!(state.is_running(), "{state:?}");
1506
1507 let procedure_id = runner
1508 .manager_ctx
1509 .poison_manager
1510 .get_poison(&poison_key.to_string())
1511 .await
1512 .unwrap();
1513 assert!(procedure_id.is_some());
1515
1516 runner.execute_once(&ctx).await;
1517 let state = runner.meta.state();
1518 assert!(state.is_prepare_rollback(), "{state:?}");
1519
1520 let procedure_id = runner
1521 .manager_ctx
1522 .poison_manager
1523 .get_poison(&poison_key.to_string())
1524 .await
1525 .unwrap();
1526 assert!(procedure_id.is_none());
1528 }
1529
1530 #[tokio::test]
1531 async fn test_execute_failed_after_set_poison() {
1532 let mut times = 0;
1533 let poison_key = PoisonKey::new("table/1024");
1534 let moved_poison_key = poison_key.clone();
1535 let exec_fn = move |ctx: Context| {
1536 times += 1;
1537 let poison_key = moved_poison_key.clone();
1538 async move {
1539 if times == 1 {
1540 Ok(Status::executing(true))
1541 } else {
1542 ctx.provider
1544 .try_put_poison(&poison_key, ctx.procedure_id)
1545 .await
1546 .unwrap();
1547 Err(Error::external(MockError::new(StatusCode::Unexpected)))
1548 }
1549 }
1550 .boxed()
1551 };
1552 let poison = ProcedureAdapter {
1553 data: "poison".to_string(),
1554 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1555 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1556 exec_fn,
1557 rollback_fn: None,
1558 };
1559
1560 let dir = create_temp_dir("poison");
1561 let meta = poison.new_meta(ROOT_ID);
1562
1563 let object_store = test_util::new_object_store(&dir);
1564 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1565 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1566
1567 let ctx = context_with_provider(
1569 meta.id,
1570 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1571 );
1572 runner
1574 .manager_ctx
1575 .procedures
1576 .write()
1577 .unwrap()
1578 .insert(meta.id, runner.meta.clone());
1579
1580 runner.manager_ctx.start();
1581 runner.execute_once(&ctx).await;
1582 let state = runner.meta.state();
1583 assert!(state.is_running(), "{state:?}");
1584
1585 runner.execute_once(&ctx).await;
1586 let state = runner.meta.state();
1587 assert!(state.is_prepare_rollback(), "{state:?}");
1588 assert!(meta.state().is_prepare_rollback());
1589
1590 runner.execute_once(&ctx).await;
1591 let state = runner.meta.state();
1592 assert!(state.is_failed(), "{state:?}");
1593 assert!(meta.state().is_failed());
1594
1595 let procedure_id = runner
1597 .manager_ctx
1598 .poison_manager
1599 .get_poison(&poison_key.to_string())
1600 .await
1601 .unwrap()
1602 .unwrap();
1603
1604 assert_eq!(&procedure_id.to_string(), ROOT_ID);
1606 }
1607
1608 #[tokio::test]
1609 async fn test_execute_exceed_max_retry_after_set_poison() {
1610 common_telemetry::init_default_ut_logging();
1611 let mut times = 0;
1612 let poison_key = PoisonKey::new("table/1024");
1613 let moved_poison_key = poison_key.clone();
1614 let exec_fn = move |ctx: Context| {
1615 times += 1;
1616 let poison_key = moved_poison_key.clone();
1617 async move {
1618 if times == 1 {
1619 Ok(Status::executing(true))
1620 } else {
1621 ctx.provider
1623 .try_put_poison(&poison_key, ctx.procedure_id)
1624 .await
1625 .unwrap();
1626 Err(Error::retry_later_and_clean_poisons(MockError::new(
1627 StatusCode::Unexpected,
1628 )))
1629 }
1630 }
1631 .boxed()
1632 };
1633 let poison = ProcedureAdapter {
1634 data: "poison".to_string(),
1635 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1636 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1637 exec_fn,
1638 rollback_fn: None,
1639 };
1640
1641 let dir = create_temp_dir("exceed_max_after_set_poison");
1642 let meta = poison.new_meta(ROOT_ID);
1643 let object_store = test_util::new_object_store(&dir);
1644 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1645 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
1646 runner.manager_ctx.start();
1647 runner.exponential_builder = ExponentialBuilder::default()
1648 .with_min_delay(Duration::from_millis(1))
1649 .with_max_times(3);
1650 let ctx = context_with_provider(
1652 meta.id,
1653 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1654 );
1655 runner
1657 .manager_ctx
1658 .procedures
1659 .write()
1660 .unwrap()
1661 .insert(meta.id, runner.meta.clone());
1662 runner.execute_once_with_retry(&ctx).await;
1664 let err = meta.state().error().unwrap().clone();
1665 assert_matches!(&*err, Error::RetryTimesExceeded { .. });
1666
1667 let procedure_id = runner
1669 .manager_ctx
1670 .poison_manager
1671 .get_poison(&poison_key.to_string())
1672 .await
1673 .unwrap();
1674 assert_eq!(procedure_id, None);
1675 }
1676
1677 #[tokio::test]
1678 async fn test_execute_poisoned() {
1679 let mut times = 0;
1680 let poison_key = PoisonKey::new("table/1024");
1681 let moved_poison_key = poison_key.clone();
1682 let exec_fn = move |ctx: Context| {
1683 times += 1;
1684 let poison_key = moved_poison_key.clone();
1685 async move {
1686 if times == 1 {
1687 Ok(Status::executing(true))
1688 } else {
1689 ctx.provider
1691 .try_put_poison(&poison_key, ctx.procedure_id)
1692 .await
1693 .unwrap();
1694 Ok(Status::Poisoned {
1695 keys: PoisonKeys::new(vec![poison_key.clone()]),
1696 error: Error::external(MockError::new(StatusCode::Unexpected)),
1697 })
1698 }
1699 }
1700 .boxed()
1701 };
1702 let poison = ProcedureAdapter {
1703 data: "poison".to_string(),
1704 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1705 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1706 exec_fn,
1707 rollback_fn: None,
1708 };
1709
1710 let dir = create_temp_dir("poison");
1711 let meta = poison.new_meta(ROOT_ID);
1712
1713 let object_store = test_util::new_object_store(&dir);
1714 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1715 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1716
1717 let ctx = context_with_provider(
1719 meta.id,
1720 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1721 );
1722 runner
1724 .manager_ctx
1725 .procedures
1726 .write()
1727 .unwrap()
1728 .insert(meta.id, runner.meta.clone());
1729
1730 runner.manager_ctx.start();
1731 runner.execute_once(&ctx).await;
1732 let state = runner.meta.state();
1733 assert!(state.is_running(), "{state:?}");
1734
1735 runner.execute_once(&ctx).await;
1736 let state = runner.meta.state();
1737 assert!(state.is_poisoned(), "{state:?}");
1738 assert!(meta.state().is_poisoned());
1739 check_files(
1740 &object_store,
1741 &procedure_store,
1742 ctx.procedure_id,
1743 &["0000000000.step"],
1744 )
1745 .await;
1746
1747 let procedure_id = runner
1749 .manager_ctx
1750 .poison_manager
1751 .get_poison(&poison_key.to_string())
1752 .await
1753 .unwrap()
1754 .unwrap();
1755
1756 assert_eq!(procedure_id, ROOT_ID);
1758 }
1759
1760 fn test_procedure_with_dynamic_lock(
1761 shared_atomic_value: Arc<AtomicU64>,
1762 id: u64,
1763 ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
1764 let exec_fn = move |ctx: Context| {
1765 let moved_shared_atomic_value = shared_atomic_value.clone();
1766 let moved_ctx = ctx.clone();
1767 async move {
1768 debug!("Acquiring write lock, id: {}", id);
1769 let key = StringKey::Exclusive("test_lock".to_string());
1770 let guard = moved_ctx.provider.acquire_lock(&key).await;
1771 debug!("Acquired write lock, id: {}", id);
1772 let millis = rand::rng().random_range(10..=50);
1773 tokio::time::sleep(Duration::from_millis(millis)).await;
1774 let value = moved_shared_atomic_value.load(Ordering::Relaxed);
1775 moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
1776 debug!("Dropping write lock, id: {}", id);
1777 drop(guard);
1778
1779 Ok(Status::done())
1780 }
1781 .boxed()
1782 };
1783
1784 let adapter = ProcedureAdapter {
1785 data: "dynamic_lock".to_string(),
1786 lock_key: LockKey::new_exclusive([]),
1787 poison_keys: PoisonKeys::new([]),
1788 exec_fn,
1789 rollback_fn: None,
1790 };
1791 let meta = adapter.new_meta(ROOT_ID);
1792
1793 (Box::new(adapter), meta)
1794 }
1795
1796 #[tokio::test(flavor = "multi_thread")]
1797 async fn test_execute_with_dynamic_lock() {
1798 common_telemetry::init_default_ut_logging();
1799 let shared_atomic_value = Arc::new(AtomicU64::new(0));
1800 let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
1801 let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
1802
1803 let dir = create_temp_dir("dynamic_lock");
1804 let object_store = test_util::new_object_store(&dir);
1805 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1806 let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
1807 let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
1808 let ctx1 = context_with_provider(
1809 meta1.id,
1810 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1811 );
1812 let ctx2 = context_with_provider(
1813 meta2.id,
1814 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1816 );
1817 let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
1818 join_all(tasks).await;
1819 assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
1820 }
1821}