1use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_telemetry::{debug, error, info};
21use rand::Rng;
22use snafu::ResultExt;
23use tokio::time;
24
25use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
26use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
27use crate::procedure::{Output, StringKey};
28use crate::rwlock::OwnedKeyRwLockGuard;
29use crate::store::{ProcedureMessage, ProcedureStore};
30use crate::{
31 BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
32};
33
/// Cleanup guard for a running procedure.
///
/// When dropped, it marks the procedure as failed unless `finish` is set, notifies the
/// parent procedure (if any) and releases the key locks it holds.
struct ProcedureGuard {
    /// Metadata of the guarded procedure.
    meta: ProcedureMetaRef,
    /// Manager context used on drop to notify the parent and clean up key locks.
    manager_ctx: Arc<ManagerContext>,
    /// Key lock guards held for the procedure; popped (last acquired first) on drop.
    key_guards: Vec<OwnedKeyRwLockGuard>,
    /// `true` once the runner finished normally (set by `ProcedureGuard::finish`).
    finish: bool,
}
41
42impl ProcedureGuard {
43 fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
45 ProcedureGuard {
46 meta,
47 manager_ctx,
48 key_guards: vec![],
49 finish: false,
50 }
51 }
52
53 fn finish(mut self) {
55 self.finish = true;
56 }
57}
58
59impl Drop for ProcedureGuard {
60 fn drop(&mut self) {
61 if !self.finish {
62 error!("Procedure {} exits unexpectedly", self.meta.id);
63
64 let err = ProcedurePanicSnafu {
68 procedure_id: self.meta.id,
69 }
70 .build();
71 self.meta.set_state(ProcedureState::failed(Arc::new(err)));
72 }
73
74 if let Some(parent_id) = self.meta.parent_id {
76 self.manager_ctx.notify_by_subprocedure(parent_id);
77 }
78
79 while !self.key_guards.is_empty() {
81 self.key_guards.pop();
82 }
83
84 self.manager_ctx
86 .key_lock
87 .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
88 }
89}
90
/// Drives a single procedure: acquires its key locks, executes it step by step with
/// retry and rollback handling, and persists its progress through the procedure store.
pub(crate) struct Runner {
    /// Shared metadata (id, state, parent, lock keys, poison keys) of the procedure.
    pub(crate) meta: ProcedureMetaRef,
    /// The procedure to execute.
    pub(crate) procedure: BoxedProcedure,
    /// Context of the procedure manager this runner belongs to.
    pub(crate) manager_ctx: Arc<ManagerContext>,
    /// Step number passed to the store on the next write; incremented after every
    /// successful persist, commit or rollback write.
    pub(crate) step: u32,
    /// Builder of the backoff sequences used between retry and rollback attempts.
    pub(crate) exponential_builder: ExponentialBuilder,
    /// Store used to persist, commit, roll back and delete procedure data.
    pub(crate) store: Arc<ProcedureStore>,
    /// Reset to `false` when the execution loop starts.
    // NOTE(review): no read of this flag is visible in this part of the file — confirm its use.
    pub(crate) rolling_back: bool,
}
100
101impl Runner {
    /// Returns whether the procedure manager is running, as reported by
    /// `ManagerContext::running`.
    pub(crate) fn running(&self) -> bool {
        self.manager_ctx.running()
    }
106
    /// Runs the procedure to completion and cleans up afterwards.
    ///
    /// Steps, in order:
    /// 1. Acquire all key locks of the procedure.
    /// 2. Execute the procedure in a loop until it reaches a terminal state.
    /// 3. Drop the guard, which notifies the parent and releases the key locks.
    /// 4. For a root procedure (no parent), report the whole procedure tree as finished
    ///    and, if the manager is still running, delete its data from the store.
    pub(crate) async fn run(mut self) {
        // The guard's `Drop` impl marks the procedure as failed if we exit before
        // calling `guard.finish()`, and always releases the key locks.
        let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());

        info!(
            "Runner {}-{} starts",
            self.procedure.type_name(),
            self.meta.id
        );

        // Acquire every key returned by `keys_to_lock()` before executing the procedure.
        for key in self.meta.lock_key.keys_to_lock() {
            let key_guard = match key {
                // Shared keys take the read side of the key lock.
                StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
                // Exclusive keys take the write side of the key lock.
                StringKey::Exclusive(key) => {
                    self.manager_ctx.key_lock.write(key.clone()).await.into()
                }
            };

            // Keep the guard alive for as long as the procedure guard lives.
            guard.key_guards.push(key_guard);
        }

        // Record start/end timestamps around the actual execution.
        self.meta.set_start_time_ms();
        self.execute_procedure_in_loop().await;
        self.meta.set_end_time_ms();

        // Consumes the guard: its `Drop` runs here, notifying the parent (if any) and
        // releasing the key locks before the cleanup below.
        guard.finish();

        // Only a root procedure cleans up the procedure tree.
        if self.meta.parent_id.is_none() {
            let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
            // Tell the manager that every procedure in the tree has finished.
            self.manager_ctx.on_procedures_finish(&procedure_ids);

            // Skip deleting stored data when the manager is no longer running.
            if !self.running() {
                return;
            }

            // Deletion is best-effort: failures are logged and the loop continues.
            for id in procedure_ids {
                if let Err(e) = self.store.delete_procedure(id).await {
                    error!(
                        e;
                        "Runner {}-{} failed to delete procedure {}",
                        self.procedure.type_name(),
                        self.meta.id,
                        id,
                    );
                }
            }
        }

        info!(
            "Runner {}-{} exits",
            self.procedure.type_name(),
            self.meta.id
        );
    }
176
177 async fn execute_procedure_in_loop(&mut self) {
178 let ctx = Context {
179 procedure_id: self.meta.id,
180 provider: self.manager_ctx.clone(),
181 };
182
183 self.rolling_back = false;
184 self.execute_once_with_retry(&ctx).await;
185 }
186
    /// Calls `execute_once` repeatedly until the procedure reaches a terminal state
    /// (`Done`, `Failed` or `Poisoned`) or the manager stops.
    ///
    /// Retry attempts and rollback attempts each consume their own backoff sequence
    /// built from `exponential_builder`:
    /// - when the retry sequence is exhausted, the state becomes `PrepareRollback`
    ///   wrapping a `RetryTimesExceeded` error;
    /// - when the rollback sequence is exhausted, the state becomes `Failed` with a
    ///   "rollback times exceeded" error and the loop returns.
    async fn execute_once_with_retry(&mut self, ctx: &Context) {
        let mut retry = self.exponential_builder.build();
        let mut retry_times = 0;

        let mut rollback = self.exponential_builder.build();
        let mut rollback_times = 0;

        loop {
            // Stop immediately (marking the procedure failed) once the manager is stopped.
            if !self.running() {
                self.meta.set_state(ProcedureState::failed(Arc::new(
                    error::ManagerNotStartSnafu {}.build(),
                )));
                return;
            }
            let state = self.meta.state();
            match state {
                // Nothing to wait for: execute the next step right away.
                ProcedureState::Running => {}
                ProcedureState::Retrying { error } => {
                    retry_times += 1;
                    if let Some(d) = retry.next() {
                        let millis = d.as_millis() as u64;
                        // Add a random extra delay in `0..=millis / 4` on top of the backoff.
                        let noise = rand::rng().random_range(0..(millis / 4) + 1);
                        let d = d.add(Duration::from_millis(noise));

                        self.wait_on_err(d, retry_times).await;
                    } else {
                        // Retries exhausted: switch to the rollback path. The
                        // `execute_once` call below then handles `PrepareRollback`.
                        self.meta
                            .set_state(ProcedureState::prepare_rollback(Arc::new(
                                Error::RetryTimesExceeded {
                                    source: error.clone(),
                                    procedure_id: self.meta.id,
                                },
                            )));
                    }
                }
                ProcedureState::PrepareRollback { error }
                | ProcedureState::RollingBack { error } => {
                    // Every pass through a rollback state consumes one backoff delay.
                    rollback_times += 1;
                    if let Some(d) = rollback.next() {
                        self.wait_on_err(d, rollback_times).await;
                    } else {
                        // Rollback attempts exhausted: wrap the last error and give up.
                        let err = Err::<(), Arc<Error>>(error)
                            .context(RollbackTimesExceededSnafu {
                                procedure_id: self.meta.id,
                            })
                            .unwrap_err();
                        self.meta.set_state(ProcedureState::failed(Arc::new(err)));
                        return;
                    }
                }
                // Terminal states end the loop.
                ProcedureState::Done { .. } => return,
                ProcedureState::Failed { .. } => return,
                ProcedureState::Poisoned { .. } => return,
            }
            self.execute_once(ctx).await;
        }
    }
246
247 async fn clean_poisons(&mut self) -> Result<()> {
248 let mut error = None;
249 for key in self.meta.poison_keys.iter() {
250 let key = key.to_string();
251 if let Err(e) = self
252 .manager_ctx
253 .poison_manager
254 .delete_poison(key, self.meta.id.to_string())
255 .await
256 {
257 error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
258 error = Some(e);
259 }
260 }
261
262 if let Some(e) = error {
264 return Err(e);
265 }
266 Ok(())
267 }
268
269 async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
270 if self.procedure.rollback_supported() {
271 if let Err(e) = self.procedure.rollback(ctx).await {
272 self.meta
273 .set_state(ProcedureState::rolling_back(Arc::new(e)));
274 return;
275 }
276 }
277 self.meta.set_state(ProcedureState::failed(err));
278 }
279
280 async fn prepare_rollback(&mut self, err: Arc<Error>) {
281 if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
282 self.meta
283 .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
284 return;
285 }
286 if self.procedure.rollback_supported() {
287 self.meta.set_state(ProcedureState::rolling_back(err));
288 } else {
289 self.meta.set_state(ProcedureState::failed(err));
290 }
291 }
292
    /// Executes one step of the procedure state machine, based on the current state.
    ///
    /// - `Running` / `Retrying`: calls `Procedure::execute` once and updates the state
    ///   from its result (see the inline comments).
    /// - `PrepareRollback`: delegates to `prepare_rollback`.
    /// - `RollingBack`: delegates to `rollback`.
    /// - `Failed` / `Done` / `Poisoned`: does nothing.
    async fn execute_once(&mut self, ctx: &Context) {
        match self.meta.state() {
            ProcedureState::Running | ProcedureState::Retrying { .. } => {
                match self.procedure.execute(ctx).await {
                    Ok(status) => {
                        debug!(
                            "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
                            self.procedure.type_name(),
                            self.meta.id,
                            status,
                            status.need_persist(),
                        );

                        // Don't persist anything once the manager has stopped.
                        if !self.running() {
                            self.meta.set_state(ProcedureState::failed(Arc::new(
                                error::ManagerNotStartSnafu {}.build(),
                            )));
                            return;
                        }

                        // Clean poisons first when the status asks for it; a failure
                        // here is retried.
                        if status.need_clean_poisons() {
                            if let Err(e) = self.clean_poisons().await {
                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                                return;
                            }
                        }

                        // Persist the procedure when the status asks for it; a failure
                        // here is retried.
                        if status.need_persist() {
                            if let Err(e) = self.persist_procedure().await {
                                error!(e; "Failed to persist procedure: {}", self.meta.id);
                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                                return;
                            }
                        }

                        match status {
                            // Still executing: the state stays as is for the next round.
                            Status::Executing { .. } => {}
                            // Submit the subprocedures and wait for a notification.
                            Status::Suspended { subprocedures, .. } => {
                                self.on_suspended(subprocedures).await;
                            }
                            Status::Done { output } => {
                                // The procedure only becomes `Done` after the commit
                                // succeeded; a failed commit is retried.
                                if let Err(e) = self.commit_procedure().await {
                                    error!(e; "Failed to commit procedure: {}", self.meta.id);
                                    self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                                    return;
                                }

                                self.done(output);
                            }
                            Status::Poisoned { error, keys } => {
                                error!(
                                    error;
                                    "Procedure {}-{} is poisoned, keys: {:?}",
                                    self.procedure.type_name(),
                                    self.meta.id,
                                    keys,
                                );
                                self.meta
                                    .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
                            }
                        }
                    }
                    Err(e) => {
                        error!(
                            e;
                            "Failed to execute procedure {}-{}, retry: {}",
                            self.procedure.type_name(),
                            self.meta.id,
                            e.is_retry_later(),
                        );

                        // Don't handle the error further once the manager has stopped.
                        if !self.running() {
                            self.meta.set_state(ProcedureState::failed(Arc::new(
                                error::ManagerNotStartSnafu {}.build(),
                            )));
                            return;
                        }

                        // Clean poisons when the error asks for it; a failure here is
                        // retried (the inner `e` shadows the execution error).
                        if e.need_clean_poisons() {
                            if let Err(e) = self.clean_poisons().await {
                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                                return;
                            }
                        }

                        // Retryable errors go back through the retry loop.
                        if e.is_retry_later() {
                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                            return;
                        }

                        // Any other error starts the rollback path.
                        self.meta
                            .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
                    }
                }
            }
            ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
            ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
            // Terminal states: nothing left to execute.
            ProcedureState::Failed { .. }
            | ProcedureState::Done { .. }
            | ProcedureState::Poisoned { .. } => (),
        }
    }
400
    /// Registers `procedure` as a child of this procedure and spawns a runner for it.
    ///
    /// Does nothing if the manager already contains a procedure with `procedure_id`.
    ///
    /// # Panics
    ///
    /// Panics if inserting the new procedure's metadata into the manager fails.
    fn submit_subprocedure(
        &self,
        procedure_id: ProcedureId,
        procedure_state: ProcedureState,
        procedure: BoxedProcedure,
    ) {
        // The manager already knows this procedure id: don't submit it again.
        if self.manager_ctx.contains_procedure(procedure_id) {
            return;
        }

        // The child runner starts from step 0.
        let step = 0;

        // The child's metadata records this procedure as its parent.
        let meta = Arc::new(ProcedureMeta::new(
            procedure_id,
            procedure_state,
            Some(self.meta.id),
            procedure.lock_key(),
            procedure.poison_keys(),
            procedure.type_name(),
        ));
        // The child shares this runner's manager context, backoff settings and store.
        let runner = Runner {
            meta: meta.clone(),
            procedure,
            manager_ctx: self.manager_ctx.clone(),
            step,
            exponential_builder: self.exponential_builder,
            store: self.store.clone(),
            rolling_back: false,
        };

        // The insertion itself happens inside the `assert!`; it is expected to succeed
        // because `contains_procedure` returned `false` above.
        assert!(
            self.manager_ctx.try_insert_procedure(meta),
            "Procedure {}-{} submit an existing procedure {}-{}",
            self.procedure.type_name(),
            self.meta.id,
            runner.procedure.type_name(),
            procedure_id,
        );

        // Record the child on the parent's metadata.
        self.meta.push_child(procedure_id);

        // Run the child in the background; the task handle is not awaited.
        let _handle = common_runtime::spawn_global(async move {
            runner.run().await
        });
    }
453
454 async fn wait_on_err(&mut self, d: Duration, i: u64) {
456 info!(
457 "Procedure {}-{} retry for the {} times after {} millis",
458 self.procedure.type_name(),
459 self.meta.id,
460 i,
461 d.as_millis(),
462 );
463 time::sleep(d).await;
464 }
465
466 async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
467 let has_child = !subprocedures.is_empty();
468 for subprocedure in subprocedures {
469 info!(
470 "Procedure {}-{} submit subprocedure {}-{}",
471 self.procedure.type_name(),
472 self.meta.id,
473 subprocedure.procedure.type_name(),
474 subprocedure.id,
475 );
476
477 self.submit_subprocedure(
478 subprocedure.id,
479 ProcedureState::Running,
480 subprocedure.procedure,
481 );
482 }
483
484 info!(
485 "Procedure {}-{} is waiting for subprocedures",
486 self.procedure.type_name(),
487 self.meta.id,
488 );
489
490 if has_child {
492 self.meta.child_notify.notified().await;
493
494 info!(
495 "Procedure {}-{} is waked up",
496 self.procedure.type_name(),
497 self.meta.id,
498 );
499 }
500 }
501
502 async fn persist_procedure(&mut self) -> Result<()> {
503 let type_name = self.procedure.type_name().to_string();
504 let data = self.procedure.dump()?;
505
506 self.store
507 .store_procedure(
508 self.meta.id,
509 self.step,
510 type_name,
511 data,
512 self.meta.parent_id,
513 )
514 .await
515 .map_err(|e| {
516 error!(
517 e; "Failed to persist procedure {}-{}",
518 self.procedure.type_name(),
519 self.meta.id
520 );
521 e
522 })?;
523 self.step += 1;
524 Ok(())
525 }
526
527 async fn commit_procedure(&mut self) -> Result<()> {
528 self.store
529 .commit_procedure(self.meta.id, self.step)
530 .await
531 .map_err(|e| {
532 error!(
533 e; "Failed to commit procedure {}-{}",
534 self.procedure.type_name(),
535 self.meta.id
536 );
537 e
538 })?;
539 self.step += 1;
540 Ok(())
541 }
542
543 async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
544 let type_name = self.procedure.type_name().to_string();
546 let data = self.procedure.dump()?;
547 let message = ProcedureMessage {
548 type_name,
549 data,
550 parent_id: self.meta.parent_id,
551 step: self.step,
552 error: Some(error),
553 };
554 self.store
555 .rollback_procedure(self.meta.id, message)
556 .await
557 .map_err(|e| {
558 error!(
559 e; "Failed to write rollback key for procedure {}-{}",
560 self.procedure.type_name(),
561 self.meta.id
562 );
563 e
564 })?;
565 self.step += 1;
566 Ok(())
567 }
568
569 fn done(&self, output: Option<Output>) {
570 info!(
572 "Procedure {}-{} done",
573 self.procedure.type_name(),
574 self.meta.id,
575 );
576
577 self.meta.set_state(ProcedureState::Done { output });
579 }
580}
581
582#[cfg(test)]
583mod tests {
584 use std::sync::atomic::{AtomicU64, Ordering};
585 use std::sync::Arc;
586
587 use async_trait::async_trait;
588 use common_error::ext::{ErrorExt, PlainError};
589 use common_error::mock::MockError;
590 use common_error::status_code::StatusCode;
591 use common_test_util::temp_dir::create_temp_dir;
592 use futures::future::join_all;
593 use futures_util::future::BoxFuture;
594 use futures_util::FutureExt;
595 use object_store::{EntryMode, ObjectStore};
596 use tokio::sync::mpsc;
597
598 use super::*;
599 use crate::local::{test_util, DynamicKeyLockGuard};
600 use crate::procedure::PoisonKeys;
601 use crate::store::proc_path;
602 use crate::test_util::InMemoryPoisonStore;
603 use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
604
    /// Fixed UUID string used as the procedure id of the root procedure in these tests.
    const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
606
607 fn new_runner(
608 meta: ProcedureMetaRef,
609 procedure: BoxedProcedure,
610 store: Arc<ProcedureStore>,
611 ) -> Runner {
612 Runner {
613 meta,
614 procedure,
615 manager_ctx: Arc::new(ManagerContext::new(
616 Arc::new(InMemoryPoisonStore::default()),
617 )),
618 step: 0,
619 exponential_builder: ExponentialBuilder::default(),
620 store,
621 rolling_back: false,
622 }
623 }
624
625 async fn check_files(
626 object_store: &ObjectStore,
627 procedure_store: &ProcedureStore,
628 procedure_id: ProcedureId,
629 files: &[&str],
630 ) {
631 let dir = proc_path!(procedure_store, "{procedure_id}/");
632 let lister = object_store.list(&dir).await.unwrap();
633 let mut files_in_dir: Vec<_> = lister
634 .into_iter()
635 .filter(|x| x.metadata().mode() == EntryMode::FILE)
636 .map(|de| de.name().to_string())
637 .collect();
638 files_in_dir.sort_unstable();
639 assert_eq!(files, files_in_dir);
640 }
641
    /// Builds a `Context` for `procedure_id` that uses the given `provider`.
    fn context_with_provider(
        procedure_id: ProcedureId,
        provider: Arc<dyn ContextProvider>,
    ) -> Context {
        Context {
            procedure_id,
            provider,
        }
    }
651
    /// Builds a `Context` for `procedure_id` whose provider panics with
    /// `unimplemented!()` on every call, so it only suits procedures that never use
    /// the provider.
    fn context_without_provider(procedure_id: ProcedureId) -> Context {
        /// Provider stub: every method is unimplemented.
        struct MockProvider;

        #[async_trait]
        impl ContextProvider for MockProvider {
            async fn procedure_state(
                &self,
                _procedure_id: ProcedureId,
            ) -> Result<Option<ProcedureState>> {
                unimplemented!()
            }

            async fn try_put_poison(
                &self,
                _key: &PoisonKey,
                _procedure_id: ProcedureId,
            ) -> Result<()> {
                unimplemented!()
            }

            async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
                unimplemented!()
            }
        }

        Context {
            procedure_id,
            provider: Arc::new(MockProvider),
        }
    }
682
    /// Boxed rollback callback: takes the procedure `Context` and returns a boxed
    /// future resolving to the rollback result.
    type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
684
    /// Test procedure whose behavior is supplied by closures.
    struct ProcedureAdapter<F> {
        /// String returned by `dump()`.
        data: String,
        /// Lock key reported by `lock_key()`.
        lock_key: LockKey,
        /// Poison keys reported by `poison_keys()`.
        poison_keys: PoisonKeys,
        /// Closure invoked on every `execute()` call.
        exec_fn: F,
        /// Optional closure invoked on `rollback()`; rollback is reported as supported
        /// only when this is `Some`.
        rollback_fn: Option<RollbackFn>,
    }
692
693 impl<F> ProcedureAdapter<F> {
694 fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
695 let mut meta = test_util::procedure_meta_for_test();
696 meta.id = ProcedureId::parse_str(uuid).unwrap();
697 meta.lock_key = self.lock_key.clone();
698 meta.poison_keys = self.poison_keys.clone();
699
700 Arc::new(meta)
701 }
702 }
703
704 #[async_trait]
705 impl<F> Procedure for ProcedureAdapter<F>
706 where
707 F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
708 {
709 fn type_name(&self) -> &str {
710 "ProcedureAdapter"
711 }
712
713 async fn execute(&mut self, ctx: &Context) -> Result<Status> {
714 let f = (self.exec_fn)(ctx.clone());
715 f.await
716 }
717
718 async fn rollback(&mut self, ctx: &Context) -> Result<()> {
719 if let Some(f) = &mut self.rollback_fn {
720 return (f)(ctx.clone()).await;
721 }
722 Ok(())
723 }
724
725 fn rollback_supported(&self) -> bool {
726 self.rollback_fn.is_some()
727 }
728
729 fn dump(&self) -> Result<String> {
730 Ok(self.data.clone())
731 }
732
733 fn lock_key(&self) -> LockKey {
734 self.lock_key.clone()
735 }
736
737 fn poison_keys(&self) -> PoisonKeys {
738 self.poison_keys.clone()
739 }
740 }
741
    /// Runs a two-step procedure (first `executing(persist)`, then `done`) with
    /// `execute_once` and checks the stored files after each step.
    ///
    /// `first_files` / `second_files` are the file names expected in the procedure's
    /// directory after the first / second call.
    async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
        // Counts how many times the procedure has been executed.
        let mut times = 0;
        let exec_fn = move |_| {
            times += 1;
            async move {
                if times == 1 {
                    Ok(Status::executing(persist))
                } else {
                    Ok(Status::done())
                }
            }
            .boxed()
        };
        let normal = ProcedureAdapter {
            data: "normal".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("normal");
        let meta = normal.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
        runner.manager_ctx.start();

        // First step: the procedure is still running.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            first_files,
        )
        .await;

        // Second step: the procedure is done.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_done(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            second_files,
        )
        .await;
    }
793
794 #[tokio::test]
795 async fn test_execute_once_normal() {
796 execute_once_normal(
797 true,
798 &["0000000000.step"],
799 &["0000000000.step", "0000000001.commit"],
800 )
801 .await;
802 }
803
804 #[tokio::test]
805 async fn test_execute_once_normal_skip_persist() {
806 execute_once_normal(false, &[], &["0000000000.commit"]).await;
807 }
808
    /// A `Suspended` status without subprocedures must not block: `execute_once`
    /// returns and the procedure stays in the running state.
    #[tokio::test]
    async fn test_on_suspend_empty() {
        let exec_fn = move |_| {
            async move {
                Ok(Status::Suspended {
                    subprocedures: Vec::new(),
                    persist: false,
                })
            }
            .boxed()
        };
        let suspend = ProcedureAdapter {
            data: "suspend".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("suspend");
        let meta = suspend.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
        runner.manager_ctx.start();

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");
    }
840
    /// Creates a child procedure that exclusively locks `keys`.
    ///
    /// Its first execution sleeps for 200 ms and returns `executing(true)`; every
    /// later execution returns `done`.
    fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
        // Counts how many times the child has been executed.
        let mut times = 0;
        let exec_fn = move |_| {
            times += 1;
            async move {
                if times == 1 {
                    time::sleep(Duration::from_millis(200)).await;
                    Ok(Status::executing(true))
                } else {
                    Ok(Status::done())
                }
            }
            .boxed()
        };
        let child = ProcedureAdapter {
            data: "child".to_string(),
            lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        ProcedureWithId {
            id: procedure_id,
            procedure: Box::new(child),
        }
    }
868
    /// A parent that suspends on two children finishes only after both children are
    /// done; afterwards all key locks, stored files and metadata are cleaned up.
    #[tokio::test]
    async fn test_on_suspend_by_subprocedures() {
        let mut times = 0;
        let children_ids = [ProcedureId::random(), ProcedureId::random()];
        // Each child locks its own pair of region keys.
        let keys = [
            &[
                "catalog.schema.table.region-0",
                "catalog.schema.table.region-1",
            ],
            &[
                "catalog.schema.table.region-2",
                "catalog.schema.table.region-3",
            ],
        ];

        let exec_fn = move |ctx: Context| {
            times += 1;
            async move {
                if times == 1 {
                    // First execution: submit both children.
                    Ok(Status::Suspended {
                        subprocedures: children_ids
                            .into_iter()
                            .zip(keys)
                            .map(|(id, key_slice)| new_child_procedure(id, key_slice))
                            .collect(),
                        persist: true,
                    })
                } else {
                    // Later executions: finish only when every child is done. A child
                    // without a known state counts as not done.
                    let mut all_child_done = true;
                    for id in children_ids {
                        let is_not_done = ctx
                            .provider
                            .procedure_state(id)
                            .await
                            .unwrap()
                            .map(|s| !s.is_done())
                            .unwrap_or(true);
                        if is_not_done {
                            all_child_done = false;
                        }
                    }
                    if all_child_done {
                        Ok(Status::done())
                    } else {
                        // Suspend again without submitting new subprocedures.
                        Ok(Status::Suspended {
                            subprocedures: Vec::new(),
                            persist: false,
                        })
                    }
                }
            }
            .boxed()
        };
        let parent = ProcedureAdapter {
            data: "parent".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("parent");
        let meta = parent.new_meta(ROOT_ID);
        let procedure_id = meta.id;

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
        manager_ctx.start();
        // Register the parent's metadata with the manager context.
        assert!(manager_ctx.try_insert_procedure(meta));
        // Replace the runner's manager context with the one prepared above.
        runner.manager_ctx = manager_ctx.clone();

        runner.run().await;
        // All key locks must have been released and cleaned up.
        assert!(manager_ctx.key_lock.is_empty());

        // Both children and the parent are done.
        for child_id in children_ids {
            let state = manager_ctx.state(child_id).unwrap();
            assert!(state.is_done(), "{state:?}");
        }
        let state = manager_ctx.state(procedure_id).unwrap();
        assert!(state.is_done(), "{state:?}");
        // No files are left in the parent's directory after the run.
        check_files(&object_store, &procedure_store, procedure_id, &[]).await;

        // After a 1 ms TTL has elapsed, removing outdated metadata forgets all of them.
        tokio::time::sleep(Duration::from_millis(5)).await;
        manager_ctx.remove_outdated_meta(Duration::from_millis(1));
        assert!(manager_ctx.state(procedure_id).is_none());
        assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
        for child_id in children_ids {
            assert!(manager_ctx.state(child_id).is_none());
        }
    }
970
    /// Once the manager is stopped, a successful execution marks the procedure as
    /// failed and persists nothing more.
    #[tokio::test]
    async fn test_running_is_stopped() {
        let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
        let normal = ProcedureAdapter {
            data: "normal".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("test_running_is_stopped");
        let meta = normal.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
        runner.manager_ctx.start();

        // While the manager is running, the step is persisted.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.step"],
        )
        .await;

        // After stopping the manager, the next execution fails the procedure.
        runner.manager_ctx.stop();
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        // No additional file was written.
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.step"],
        )
        .await;
    }
1014
    /// With the manager stopped, a failing execution marks the procedure as failed
    /// without writing any file (no rollback record).
    #[tokio::test]
    async fn test_running_is_stopped_on_error() {
        let exec_fn =
            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
        let normal = ProcedureAdapter {
            data: "fail".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("test_running_is_stopped_on_error");
        let meta = normal.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
        runner.manager_ctx.stop();

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        // Nothing was persisted for the procedure.
        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
    }
1041
    /// A non-retryable error on a procedure without rollback support goes
    /// `PrepareRollback` -> `Failed` and leaves a rollback file behind.
    #[tokio::test]
    async fn test_execute_on_error() {
        let exec_fn =
            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
        let fail = ProcedureAdapter {
            data: "fail".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("fail");
        let meta = fail.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
        runner.manager_ctx.start();

        // The execution error moves the procedure to `PrepareRollback`.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_prepare_rollback(), "{state:?}");

        // Without rollback support, preparing the rollback ends in `Failed`.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.rollback"],
        )
        .await;
    }
1077
    /// A non-retryable error on a procedure with a succeeding rollback goes
    /// `PrepareRollback` -> `RollingBack` -> `Failed` and leaves a rollback file behind.
    #[tokio::test]
    async fn test_execute_with_rollback_on_error() {
        let exec_fn =
            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
        let rollback_fn = move |_| async move { Ok(()) }.boxed();
        let fail = ProcedureAdapter {
            data: "fail".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: Some(Box::new(rollback_fn)),
        };

        let dir = create_temp_dir("fail");
        let meta = fail.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
        runner.manager_ctx.start();

        // The execution error moves the procedure to `PrepareRollback`.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_prepare_rollback(), "{state:?}");

        // With rollback support, the rollback record is written and rollback begins.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_rolling_back(), "{state:?}");

        // The rollback succeeds, so the procedure ends in `Failed`.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.rollback"],
        )
        .await;
    }
1118
    /// A retry-later error puts the procedure into `Retrying`; the next successful
    /// execution completes it and writes only the commit file.
    #[tokio::test]
    async fn test_execute_on_retry_later_error() {
        // Counts how many times the procedure has been executed.
        let mut times = 0;

        let exec_fn = move |_| {
            times += 1;
            async move {
                if times == 1 {
                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
                } else {
                    Ok(Status::done())
                }
            }
            .boxed()
        };

        let retry_later = ProcedureAdapter {
            data: "retry_later".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("retry_later");
        let meta = retry_later.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
        runner.manager_ctx.start();
        // First execution fails with a retry-later error.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_retrying(), "{state:?}");

        // Second execution succeeds and commits.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_done(), "{state:?}");
        assert!(meta.state().is_done());
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.commit"],
        )
        .await;
    }
1166
    /// A procedure that always fails with a retry-later error ends with a
    /// "retry exceeded max times" error once the backoff sequence is exhausted.
    #[tokio::test]
    async fn test_execute_exceed_max_retry_later() {
        let exec_fn =
            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();

        let exceed_max_retry_later = ProcedureAdapter {
            data: "exceed_max_retry_later".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("exceed_max_retry_later");
        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(
            meta.clone(),
            Box::new(exceed_max_retry_later),
            procedure_store,
        );
        runner.manager_ctx.start();

        // Keep the test fast: 1 ms minimum delay and at most 3 backoff delays.
        runner.exponential_builder = ExponentialBuilder::default()
            .with_min_delay(Duration::from_millis(1))
            .with_max_times(3);

        runner.execute_procedure_in_loop().await;
        // The final state carries the retry-exceeded error.
        let err = meta.state().error().unwrap().to_string();
        assert!(err.contains("Procedure retry exceeded max times"));
    }
1200
    /// When both execution and rollback always fail, the procedure ends with a
    /// "rollback exceeded max times" error once the rollback backoff is exhausted.
    #[tokio::test]
    async fn test_rollback_exceed_max_retry_later() {
        let exec_fn =
            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
        // The rollback itself always fails as well.
        let rollback_fn = move |_| {
            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
        };
        let exceed_max_retry_later = ProcedureAdapter {
            data: "exceed_max_rollback".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: Some(Box::new(rollback_fn)),
        };

        let dir = create_temp_dir("exceed_max_rollback");
        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(
            meta.clone(),
            Box::new(exceed_max_retry_later),
            procedure_store,
        );
        runner.manager_ctx.start();
        // Keep the test fast: 1 ms minimum delay and at most 3 backoff delays.
        runner.exponential_builder = ExponentialBuilder::default()
            .with_min_delay(Duration::from_millis(1))
            .with_max_times(3);

        runner.execute_procedure_in_loop().await;
        // The final state carries the rollback-exceeded error.
        let err = meta.state().error().unwrap().to_string();
        assert!(err.contains("Procedure rollback exceeded max times"));
    }
1235
1236 #[tokio::test]
1237 async fn test_rollback_after_retry_fail() {
1238 let exec_fn = move |_| {
1239 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1240 };
1241
1242 let (tx, mut rx) = mpsc::channel(1);
1243 let rollback_fn = move |_| {
1244 let tx = tx.clone();
1245 async move {
1246 tx.send(()).await.unwrap();
1247 Ok(())
1248 }
1249 .boxed()
1250 };
1251 let retry_later = ProcedureAdapter {
1252 data: "rollback_after_retry_fail".to_string(),
1253 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1254 poison_keys: PoisonKeys::default(),
1255 exec_fn,
1256 rollback_fn: Some(Box::new(rollback_fn)),
1257 };
1258
1259 let dir = create_temp_dir("retry_later");
1260 let meta = retry_later.new_meta(ROOT_ID);
1261 let ctx = context_without_provider(meta.id);
1262 let object_store = test_util::new_object_store(&dir);
1263 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1264 let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1265 runner.manager_ctx.start();
1266 runner.exponential_builder = ExponentialBuilder::default()
1267 .with_min_delay(Duration::from_millis(1))
1268 .with_max_times(3);
1269 runner.execute_procedure_in_loop().await;
1271 rx.recv().await.unwrap();
1272 assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
1273 check_files(
1274 &object_store,
1275 &procedure_store,
1276 ctx.procedure_id,
1277 &["0000000000.rollback"],
1278 )
1279 .await;
1280 }
1281
1282 #[tokio::test]
1283 async fn test_child_error() {
1284 let mut times = 0;
1285 let child_id = ProcedureId::random();
1286
1287 let exec_fn = move |ctx: Context| {
1288 times += 1;
1289 async move {
1290 if times == 1 {
1291 let exec_fn = |_| {
1293 async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1294 .boxed()
1295 };
1296 let fail = ProcedureAdapter {
1297 data: "fail".to_string(),
1298 lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1299 poison_keys: PoisonKeys::default(),
1300 exec_fn,
1301 rollback_fn: None,
1302 };
1303
1304 Ok(Status::Suspended {
1305 subprocedures: vec![ProcedureWithId {
1306 id: child_id,
1307 procedure: Box::new(fail),
1308 }],
1309 persist: true,
1310 })
1311 } else {
1312 let state = ctx.provider.procedure_state(child_id).await.unwrap();
1314 let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
1315 if is_failed {
1316 Err(Error::from_error_ext(PlainError::new(
1318 "subprocedure failed".to_string(),
1319 StatusCode::Unexpected,
1320 )))
1321 } else {
1322 Ok(Status::Suspended {
1324 subprocedures: Vec::new(),
1325 persist: false,
1326 })
1327 }
1328 }
1329 }
1330 .boxed()
1331 };
1332 let parent = ProcedureAdapter {
1333 data: "parent".to_string(),
1334 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1335 poison_keys: PoisonKeys::default(),
1336 exec_fn,
1337 rollback_fn: None,
1338 };
1339
1340 let dir = create_temp_dir("child_err");
1341 let meta = parent.new_meta(ROOT_ID);
1342
1343 let object_store = test_util::new_object_store(&dir);
1344 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1345 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
1346 let poison_manager = Arc::new(InMemoryPoisonStore::default());
1347 let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
1348 manager_ctx.start();
1349 assert!(manager_ctx.try_insert_procedure(meta.clone()));
1351 runner.manager_ctx = manager_ctx.clone();
1353
1354 runner.run().await;
1356 assert!(manager_ctx.key_lock.is_empty());
1357 let err = meta.state().error().unwrap().output_msg();
1358 assert!(err.contains("subprocedure failed"), "{err}");
1359 }
1360
1361 #[tokio::test]
1362 async fn test_execute_with_clean_poisons() {
1363 common_telemetry::init_default_ut_logging();
1364 let mut times = 0;
1365 let poison_key = PoisonKey::new("table/1024");
1366 let moved_poison_key = poison_key.clone();
1367 let exec_fn = move |ctx: Context| {
1368 times += 1;
1369 let poison_key = moved_poison_key.clone();
1370 async move {
1371 if times == 1 {
1372 ctx.provider
1374 .try_put_poison(&poison_key, ctx.procedure_id)
1375 .await
1376 .unwrap();
1377
1378 Ok(Status::executing(true))
1379 } else {
1380 Ok(Status::executing_with_clean_poisons(true))
1381 }
1382 }
1383 .boxed()
1384 };
1385 let poison = ProcedureAdapter {
1386 data: "poison".to_string(),
1387 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1388 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1389 exec_fn,
1390 rollback_fn: None,
1391 };
1392
1393 let dir = create_temp_dir("clean_poisons");
1394 let meta = poison.new_meta(ROOT_ID);
1395
1396 let object_store = test_util::new_object_store(&dir);
1397 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1398 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1399
1400 let ctx = context_with_provider(
1402 meta.id,
1403 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1404 );
1405 runner
1407 .manager_ctx
1408 .procedures
1409 .write()
1410 .unwrap()
1411 .insert(meta.id, runner.meta.clone());
1412
1413 runner.manager_ctx.start();
1414 runner.execute_once(&ctx).await;
1415 let state = runner.meta.state();
1416 assert!(state.is_running(), "{state:?}");
1417
1418 let procedure_id = runner
1419 .manager_ctx
1420 .poison_manager
1421 .get_poison(&poison_key.to_string())
1422 .await
1423 .unwrap();
1424 assert!(procedure_id.is_some());
1426
1427 runner.execute_once(&ctx).await;
1428 let state = runner.meta.state();
1429 assert!(state.is_running(), "{state:?}");
1430
1431 let procedure_id = runner
1432 .manager_ctx
1433 .poison_manager
1434 .get_poison(&poison_key.to_string())
1435 .await
1436 .unwrap();
1437 assert!(procedure_id.is_none());
1439 }
1440
1441 #[tokio::test]
1442 async fn test_execute_error_with_clean_poisons() {
1443 common_telemetry::init_default_ut_logging();
1444 let mut times = 0;
1445 let poison_key = PoisonKey::new("table/1024");
1446 let moved_poison_key = poison_key.clone();
1447 let exec_fn = move |ctx: Context| {
1448 times += 1;
1449 let poison_key = moved_poison_key.clone();
1450 async move {
1451 if times == 1 {
1452 ctx.provider
1454 .try_put_poison(&poison_key, ctx.procedure_id)
1455 .await
1456 .unwrap();
1457
1458 Ok(Status::executing(true))
1459 } else {
1460 Err(Error::external_and_clean_poisons(MockError::new(
1461 StatusCode::Unexpected,
1462 )))
1463 }
1464 }
1465 .boxed()
1466 };
1467 let poison = ProcedureAdapter {
1468 data: "poison".to_string(),
1469 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1470 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1471 exec_fn,
1472 rollback_fn: None,
1473 };
1474
1475 let dir = create_temp_dir("error_with_clean_poisons");
1476 let meta = poison.new_meta(ROOT_ID);
1477
1478 let object_store = test_util::new_object_store(&dir);
1479 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1480 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1481
1482 let ctx = context_with_provider(
1484 meta.id,
1485 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1486 );
1487 runner
1489 .manager_ctx
1490 .procedures
1491 .write()
1492 .unwrap()
1493 .insert(meta.id, runner.meta.clone());
1494
1495 runner.manager_ctx.start();
1496 runner.execute_once(&ctx).await;
1497 let state = runner.meta.state();
1498 assert!(state.is_running(), "{state:?}");
1499
1500 let procedure_id = runner
1501 .manager_ctx
1502 .poison_manager
1503 .get_poison(&poison_key.to_string())
1504 .await
1505 .unwrap();
1506 assert!(procedure_id.is_some());
1508
1509 runner.execute_once(&ctx).await;
1510 let state = runner.meta.state();
1511 assert!(state.is_prepare_rollback(), "{state:?}");
1512
1513 let procedure_id = runner
1514 .manager_ctx
1515 .poison_manager
1516 .get_poison(&poison_key.to_string())
1517 .await
1518 .unwrap();
1519 assert!(procedure_id.is_none());
1521 }
1522
1523 #[tokio::test]
1524 async fn test_execute_failed_after_set_poison() {
1525 let mut times = 0;
1526 let poison_key = PoisonKey::new("table/1024");
1527 let moved_poison_key = poison_key.clone();
1528 let exec_fn = move |ctx: Context| {
1529 times += 1;
1530 let poison_key = moved_poison_key.clone();
1531 async move {
1532 if times == 1 {
1533 Ok(Status::executing(true))
1534 } else {
1535 ctx.provider
1537 .try_put_poison(&poison_key, ctx.procedure_id)
1538 .await
1539 .unwrap();
1540 Err(Error::external(MockError::new(StatusCode::Unexpected)))
1541 }
1542 }
1543 .boxed()
1544 };
1545 let poison = ProcedureAdapter {
1546 data: "poison".to_string(),
1547 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1548 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1549 exec_fn,
1550 rollback_fn: None,
1551 };
1552
1553 let dir = create_temp_dir("poison");
1554 let meta = poison.new_meta(ROOT_ID);
1555
1556 let object_store = test_util::new_object_store(&dir);
1557 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1558 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1559
1560 let ctx = context_with_provider(
1562 meta.id,
1563 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1564 );
1565 runner
1567 .manager_ctx
1568 .procedures
1569 .write()
1570 .unwrap()
1571 .insert(meta.id, runner.meta.clone());
1572
1573 runner.manager_ctx.start();
1574 runner.execute_once(&ctx).await;
1575 let state = runner.meta.state();
1576 assert!(state.is_running(), "{state:?}");
1577
1578 runner.execute_once(&ctx).await;
1579 let state = runner.meta.state();
1580 assert!(state.is_prepare_rollback(), "{state:?}");
1581 assert!(meta.state().is_prepare_rollback());
1582
1583 runner.execute_once(&ctx).await;
1584 let state = runner.meta.state();
1585 assert!(state.is_failed(), "{state:?}");
1586 assert!(meta.state().is_failed());
1587
1588 let procedure_id = runner
1590 .manager_ctx
1591 .poison_manager
1592 .get_poison(&poison_key.to_string())
1593 .await
1594 .unwrap()
1595 .unwrap();
1596
1597 assert_eq!(&procedure_id.to_string(), ROOT_ID);
1599 }
1600
1601 #[tokio::test]
1602 async fn test_execute_poisoned() {
1603 let mut times = 0;
1604 let poison_key = PoisonKey::new("table/1024");
1605 let moved_poison_key = poison_key.clone();
1606 let exec_fn = move |ctx: Context| {
1607 times += 1;
1608 let poison_key = moved_poison_key.clone();
1609 async move {
1610 if times == 1 {
1611 Ok(Status::executing(true))
1612 } else {
1613 ctx.provider
1615 .try_put_poison(&poison_key, ctx.procedure_id)
1616 .await
1617 .unwrap();
1618 Ok(Status::Poisoned {
1619 keys: PoisonKeys::new(vec![poison_key.clone()]),
1620 error: Error::external(MockError::new(StatusCode::Unexpected)),
1621 })
1622 }
1623 }
1624 .boxed()
1625 };
1626 let poison = ProcedureAdapter {
1627 data: "poison".to_string(),
1628 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1629 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1630 exec_fn,
1631 rollback_fn: None,
1632 };
1633
1634 let dir = create_temp_dir("poison");
1635 let meta = poison.new_meta(ROOT_ID);
1636
1637 let object_store = test_util::new_object_store(&dir);
1638 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1639 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1640
1641 let ctx = context_with_provider(
1643 meta.id,
1644 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1645 );
1646 runner
1648 .manager_ctx
1649 .procedures
1650 .write()
1651 .unwrap()
1652 .insert(meta.id, runner.meta.clone());
1653
1654 runner.manager_ctx.start();
1655 runner.execute_once(&ctx).await;
1656 let state = runner.meta.state();
1657 assert!(state.is_running(), "{state:?}");
1658
1659 runner.execute_once(&ctx).await;
1660 let state = runner.meta.state();
1661 assert!(state.is_poisoned(), "{state:?}");
1662 assert!(meta.state().is_poisoned());
1663 check_files(
1664 &object_store,
1665 &procedure_store,
1666 ctx.procedure_id,
1667 &["0000000000.step"],
1668 )
1669 .await;
1670
1671 let procedure_id = runner
1673 .manager_ctx
1674 .poison_manager
1675 .get_poison(&poison_key.to_string())
1676 .await
1677 .unwrap()
1678 .unwrap();
1679
1680 assert_eq!(procedure_id, ROOT_ID);
1682 }
1683
1684 fn test_procedure_with_dynamic_lock(
1685 shared_atomic_value: Arc<AtomicU64>,
1686 id: u64,
1687 ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
1688 let exec_fn = move |ctx: Context| {
1689 let moved_shared_atomic_value = shared_atomic_value.clone();
1690 let moved_ctx = ctx.clone();
1691 async move {
1692 debug!("Acquiring write lock, id: {}", id);
1693 let key = StringKey::Exclusive("test_lock".to_string());
1694 let guard = moved_ctx.provider.acquire_lock(&key).await;
1695 debug!("Acquired write lock, id: {}", id);
1696 let millis = rand::rng().random_range(10..=50);
1697 tokio::time::sleep(Duration::from_millis(millis)).await;
1698 let value = moved_shared_atomic_value.load(Ordering::Relaxed);
1699 moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
1700 debug!("Dropping write lock, id: {}", id);
1701 drop(guard);
1702
1703 Ok(Status::done())
1704 }
1705 .boxed()
1706 };
1707
1708 let adapter = ProcedureAdapter {
1709 data: "dynamic_lock".to_string(),
1710 lock_key: LockKey::new_exclusive([]),
1711 poison_keys: PoisonKeys::new([]),
1712 exec_fn,
1713 rollback_fn: None,
1714 };
1715 let meta = adapter.new_meta(ROOT_ID);
1716
1717 (Box::new(adapter), meta)
1718 }
1719
1720 #[tokio::test(flavor = "multi_thread")]
1721 async fn test_execute_with_dynamic_lock() {
1722 common_telemetry::init_default_ut_logging();
1723 let shared_atomic_value = Arc::new(AtomicU64::new(0));
1724 let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
1725 let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
1726
1727 let dir = create_temp_dir("dynamic_lock");
1728 let object_store = test_util::new_object_store(&dir);
1729 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1730 let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
1731 let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
1732 let ctx1 = context_with_provider(
1733 meta1.id,
1734 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1735 );
1736 let ctx2 = context_with_provider(
1737 meta2.id,
1738 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1740 );
1741 let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
1742 join_all(tasks).await;
1743 assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
1744 }
1745}