1use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_telemetry::{debug, error, info};
21use rand::Rng;
22use snafu::ResultExt;
23use tokio::time;
24
25use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
26use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
27use crate::procedure::{Output, StringKey};
28use crate::rwlock::OwnedKeyRwLockGuard;
29use crate::store::{ProcedureMessage, ProcedureStore};
30use crate::{
31 BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
32};
33
/// Guard that tracks a running procedure and cleans up after it when dropped.
///
/// It owns the lock guards acquired for the procedure's lock keys. When the
/// guard is dropped it:
/// - marks the procedure as failed unless [`ProcedureGuard::finish`] was called;
/// - notifies the parent procedure, if there is one;
/// - releases the key guards in reverse acquisition order;
/// - asks the manager's key lock to clean the procedure's lock keys.
struct ProcedureGuard {
    /// Metadata of the guarded procedure.
    meta: ProcedureMetaRef,
    /// Shared manager context, used to notify the parent and clean lock keys.
    manager_ctx: Arc<ManagerContext>,
    /// Lock guards pushed by the runner, in acquisition order.
    key_guards: Vec<OwnedKeyRwLockGuard>,
    /// Set to `true` only by `finish`; `false` at drop time means the runner
    /// did not reach the end of its normal execution path.
    finish: bool,
}
41
42impl ProcedureGuard {
43 fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
45 ProcedureGuard {
46 meta,
47 manager_ctx,
48 key_guards: vec![],
49 finish: false,
50 }
51 }
52
53 fn finish(mut self) {
55 self.finish = true;
56 }
57}
58
59impl Drop for ProcedureGuard {
60 fn drop(&mut self) {
61 if !self.finish {
62 error!("Procedure {} exits unexpectedly", self.meta.id);
63
64 let err = ProcedurePanicSnafu {
68 procedure_id: self.meta.id,
69 }
70 .build();
71 self.meta.set_state(ProcedureState::failed(Arc::new(err)));
72 }
73
74 if let Some(parent_id) = self.meta.parent_id {
76 self.manager_ctx.notify_by_subprocedure(parent_id);
77 }
78
79 while !self.key_guards.is_empty() {
81 self.key_guards.pop();
82 }
83
84 self.manager_ctx
86 .key_lock
87 .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
88 }
89}
90
/// Drives the execution of a single procedure until it reaches a terminal state.
pub(crate) struct Runner {
    /// Metadata of the procedure (id, state, lock keys, parent, children, ...).
    pub(crate) meta: ProcedureMetaRef,
    /// The procedure being executed.
    pub(crate) procedure: BoxedProcedure,
    /// Shared manager context.
    pub(crate) manager_ctx: Arc<ManagerContext>,
    /// Step number passed to the procedure store; incremented after each
    /// successful persist, commit, or rollback write.
    pub(crate) step: u32,
    /// Backoff configuration used for both the retry and rollback delays.
    pub(crate) exponential_builder: ExponentialBuilder,
    /// Store that records procedure steps, commits, rollback records and deletions.
    pub(crate) store: Arc<ProcedureStore>,
    /// Reset to `false` before execution starts.
    /// NOTE(review): not read anywhere in this file — confirm it is still needed.
    pub(crate) rolling_back: bool,
}
100
101impl Runner {
102 pub(crate) fn running(&self) -> bool {
104 self.manager_ctx.running()
105 }
106
107 pub(crate) async fn run(mut self) {
109 let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
111
112 info!(
113 "Runner {}-{} starts",
114 self.procedure.type_name(),
115 self.meta.id
116 );
117
118 for key in self.meta.lock_key.keys_to_lock() {
121 let key_guard = match key {
123 StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
124 StringKey::Exclusive(key) => {
125 self.manager_ctx.key_lock.write(key.clone()).await.into()
126 }
127 };
128
129 guard.key_guards.push(key_guard);
130 }
131
132 self.meta.set_start_time_ms();
135 self.execute_procedure_in_loop().await;
136 self.meta.set_end_time_ms();
137
138 guard.finish();
145
146 if self.meta.parent_id.is_none() {
148 let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
149 self.manager_ctx.on_procedures_finish(&procedure_ids);
151
152 if !self.running() {
154 return;
155 }
156
157 for id in procedure_ids {
158 if let Err(e) = self.store.delete_procedure(id).await {
159 error!(
160 e;
161 "Runner {}-{} failed to delete procedure {}",
162 self.procedure.type_name(),
163 self.meta.id,
164 id,
165 );
166 }
167 }
168 }
169
170 info!(
171 "Runner {}-{} exits",
172 self.procedure.type_name(),
173 self.meta.id
174 );
175 }
176
177 async fn execute_procedure_in_loop(&mut self) {
178 let ctx = Context {
179 procedure_id: self.meta.id,
180 provider: self.manager_ctx.clone(),
181 };
182
183 self.rolling_back = false;
184 self.execute_once_with_retry(&ctx).await;
185 }
186
187 async fn execute_once_with_retry(&mut self, ctx: &Context) {
188 let mut retry = self.exponential_builder.build();
189 let mut retry_times = 0;
190
191 let mut rollback = self.exponential_builder.build();
192 let mut rollback_times = 0;
193
194 loop {
195 if !self.running() {
197 self.meta.set_state(ProcedureState::failed(Arc::new(
198 error::ManagerNotStartSnafu {}.build(),
199 )));
200 return;
201 }
202 let state = self.meta.state();
203 match state {
204 ProcedureState::Running => {}
205 ProcedureState::Retrying { error } => {
206 retry_times += 1;
207 if let Some(d) = retry.next() {
208 let millis = d.as_millis() as u64;
209 let noise = rand::rng().random_range(0..(millis / 4) + 1);
211 let d = d.add(Duration::from_millis(noise));
212
213 self.wait_on_err(d, retry_times).await;
214 } else {
215 self.meta
216 .set_state(ProcedureState::prepare_rollback(Arc::new(
217 Error::RetryTimesExceeded {
218 source: error.clone(),
219 procedure_id: self.meta.id,
220 },
221 )));
222 }
223 }
224 ProcedureState::PrepareRollback { error }
225 | ProcedureState::RollingBack { error } => {
226 rollback_times += 1;
227 if let Some(d) = rollback.next() {
228 self.wait_on_err(d, rollback_times).await;
229 } else {
230 let err = Err::<(), Arc<Error>>(error)
231 .context(RollbackTimesExceededSnafu {
232 procedure_id: self.meta.id,
233 })
234 .unwrap_err();
235 self.meta.set_state(ProcedureState::failed(Arc::new(err)));
236 return;
237 }
238 }
239 ProcedureState::Done { .. } => return,
240 ProcedureState::Failed { .. } => return,
241 ProcedureState::Poisoned { .. } => return,
242 }
243 self.execute_once(ctx).await;
244 }
245 }
246
247 async fn clean_poisons(&mut self) -> Result<()> {
248 let mut error = None;
249 for key in self.meta.poison_keys.iter() {
250 let key = key.to_string();
251 if let Err(e) = self
252 .manager_ctx
253 .poison_manager
254 .delete_poison(key, self.meta.id.to_string())
255 .await
256 {
257 error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
258 error = Some(e);
259 }
260 }
261
262 if let Some(e) = error {
264 return Err(e);
265 }
266 Ok(())
267 }
268
269 async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
270 if self.procedure.rollback_supported() {
271 if let Err(e) = self.procedure.rollback(ctx).await {
272 self.meta
273 .set_state(ProcedureState::rolling_back(Arc::new(e)));
274 return;
275 }
276 }
277 self.meta.set_state(ProcedureState::failed(err));
278 }
279
280 async fn prepare_rollback(&mut self, err: Arc<Error>) {
281 if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
282 self.meta
283 .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
284 return;
285 }
286 if self.procedure.rollback_supported() {
287 self.meta.set_state(ProcedureState::rolling_back(err));
288 } else {
289 self.meta.set_state(ProcedureState::failed(err));
290 }
291 }
292
293 async fn execute_once(&mut self, ctx: &Context) {
294 match self.meta.state() {
295 ProcedureState::Running | ProcedureState::Retrying { .. } => {
296 match self.procedure.execute(ctx).await {
297 Ok(status) => {
298 debug!(
299 "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
300 self.procedure.type_name(),
301 self.meta.id,
302 status,
303 status.need_persist(),
304 );
305
306 if !self.running() {
308 self.meta.set_state(ProcedureState::failed(Arc::new(
309 error::ManagerNotStartSnafu {}.build(),
310 )));
311 return;
312 }
313
314 if status.need_clean_poisons() {
316 if let Err(e) = self.clean_poisons().await {
317 error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
318 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
319 return;
320 }
321 }
322
323 if status.need_persist() {
324 if let Err(e) = self.persist_procedure().await {
325 error!(e; "Failed to persist procedure: {}", self.meta.id);
326 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
327 return;
328 }
329 }
330
331 match status {
332 Status::Executing { .. } => {}
333 Status::Suspended { subprocedures, .. } => {
334 self.on_suspended(subprocedures).await;
335 }
336 Status::Done { output } => {
337 if let Err(e) = self.commit_procedure().await {
338 error!(e; "Failed to commit procedure: {}", self.meta.id);
339 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
340 return;
341 }
342
343 self.done(output);
344 }
345 Status::Poisoned { error, keys } => {
346 error!(
347 error;
348 "Procedure {}-{} is poisoned, keys: {:?}",
349 self.procedure.type_name(),
350 self.meta.id,
351 keys,
352 );
353 self.meta
354 .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
355 }
356 }
357 }
358 Err(e) => {
359 error!(
360 e;
361 "Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
362 self.procedure.type_name(),
363 self.meta.id,
364 e.is_retry_later(),
365 e.need_clean_poisons(),
366 );
367
368 if !self.running() {
370 self.meta.set_state(ProcedureState::failed(Arc::new(
371 error::ManagerNotStartSnafu {}.build(),
372 )));
373 return;
374 }
375
376 if e.need_clean_poisons() {
377 if let Err(e) = self.clean_poisons().await {
378 error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
379 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
380 return;
381 }
382 debug!(
383 "Procedure {}-{} cleaned poisons",
384 self.procedure.type_name(),
385 self.meta.id,
386 );
387 }
388
389 if e.is_retry_later() {
390 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
391 return;
392 }
393
394 self.meta
395 .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
396 }
397 }
398 }
399 ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
400 ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
401 ProcedureState::Failed { .. }
402 | ProcedureState::Done { .. }
403 | ProcedureState::Poisoned { .. } => (),
404 }
405 }
406
407 fn submit_subprocedure(
409 &self,
410 procedure_id: ProcedureId,
411 procedure_state: ProcedureState,
412 procedure: BoxedProcedure,
413 ) {
414 if self.manager_ctx.contains_procedure(procedure_id) {
415 return;
417 }
418
419 let step = 0;
420
421 let meta = Arc::new(ProcedureMeta::new(
422 procedure_id,
423 procedure_state,
424 Some(self.meta.id),
425 procedure.lock_key(),
426 procedure.poison_keys(),
427 procedure.type_name(),
428 ));
429 let runner = Runner {
430 meta: meta.clone(),
431 procedure,
432 manager_ctx: self.manager_ctx.clone(),
433 step,
434 exponential_builder: self.exponential_builder,
435 store: self.store.clone(),
436 rolling_back: false,
437 };
438
439 assert!(
443 self.manager_ctx.try_insert_procedure(meta),
444 "Procedure {}-{} submit an existing procedure {}-{}",
445 self.procedure.type_name(),
446 self.meta.id,
447 runner.procedure.type_name(),
448 procedure_id,
449 );
450
451 self.meta.push_child(procedure_id);
453
454 let _handle = common_runtime::spawn_global(async move {
455 runner.run().await
457 });
458 }
459
460 async fn wait_on_err(&mut self, d: Duration, i: u64) {
462 info!(
463 "Procedure {}-{} retry for the {} times after {} millis",
464 self.procedure.type_name(),
465 self.meta.id,
466 i,
467 d.as_millis(),
468 );
469 time::sleep(d).await;
470 }
471
472 async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
473 let has_child = !subprocedures.is_empty();
474 for subprocedure in subprocedures {
475 info!(
476 "Procedure {}-{} submit subprocedure {}-{}",
477 self.procedure.type_name(),
478 self.meta.id,
479 subprocedure.procedure.type_name(),
480 subprocedure.id,
481 );
482
483 self.submit_subprocedure(
484 subprocedure.id,
485 ProcedureState::Running,
486 subprocedure.procedure,
487 );
488 }
489
490 info!(
491 "Procedure {}-{} is waiting for subprocedures",
492 self.procedure.type_name(),
493 self.meta.id,
494 );
495
496 if has_child {
498 self.meta.child_notify.notified().await;
499
500 info!(
501 "Procedure {}-{} is waked up",
502 self.procedure.type_name(),
503 self.meta.id,
504 );
505 }
506 }
507
508 async fn persist_procedure(&mut self) -> Result<()> {
509 let type_name = self.procedure.type_name().to_string();
510 let data = self.procedure.dump()?;
511
512 self.store
513 .store_procedure(
514 self.meta.id,
515 self.step,
516 type_name,
517 data,
518 self.meta.parent_id,
519 )
520 .await
521 .map_err(|e| {
522 error!(
523 e; "Failed to persist procedure {}-{}",
524 self.procedure.type_name(),
525 self.meta.id
526 );
527 e
528 })?;
529 self.step += 1;
530 Ok(())
531 }
532
533 async fn commit_procedure(&mut self) -> Result<()> {
534 self.store
535 .commit_procedure(self.meta.id, self.step)
536 .await
537 .map_err(|e| {
538 error!(
539 e; "Failed to commit procedure {}-{}",
540 self.procedure.type_name(),
541 self.meta.id
542 );
543 e
544 })?;
545 self.step += 1;
546 Ok(())
547 }
548
549 async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
550 let type_name = self.procedure.type_name().to_string();
552 let data = self.procedure.dump()?;
553 let message = ProcedureMessage {
554 type_name,
555 data,
556 parent_id: self.meta.parent_id,
557 step: self.step,
558 error: Some(error),
559 };
560 self.store
561 .rollback_procedure(self.meta.id, message)
562 .await
563 .map_err(|e| {
564 error!(
565 e; "Failed to write rollback key for procedure {}-{}",
566 self.procedure.type_name(),
567 self.meta.id
568 );
569 e
570 })?;
571 self.step += 1;
572 Ok(())
573 }
574
575 fn done(&self, output: Option<Output>) {
576 info!(
578 "Procedure {}-{} done",
579 self.procedure.type_name(),
580 self.meta.id,
581 );
582
583 self.meta.set_state(ProcedureState::Done { output });
585 }
586}
587
588#[cfg(test)]
589mod tests {
590 use std::assert_matches::assert_matches;
591 use std::sync::atomic::{AtomicU64, Ordering};
592 use std::sync::Arc;
593
594 use async_trait::async_trait;
595 use common_error::ext::{ErrorExt, PlainError};
596 use common_error::mock::MockError;
597 use common_error::status_code::StatusCode;
598 use common_test_util::temp_dir::create_temp_dir;
599 use futures::future::join_all;
600 use futures_util::future::BoxFuture;
601 use futures_util::FutureExt;
602 use object_store::{EntryMode, ObjectStore};
603 use tokio::sync::mpsc;
604 use tokio::sync::watch::Receiver;
605
606 use super::*;
607 use crate::local::{test_util, DynamicKeyLockGuard};
608 use crate::procedure::PoisonKeys;
609 use crate::store::proc_path;
610 use crate::test_util::InMemoryPoisonStore;
611 use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
612
613 const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
614
615 fn new_runner(
616 meta: ProcedureMetaRef,
617 procedure: BoxedProcedure,
618 store: Arc<ProcedureStore>,
619 ) -> Runner {
620 Runner {
621 meta,
622 procedure,
623 manager_ctx: Arc::new(ManagerContext::new(
624 Arc::new(InMemoryPoisonStore::default()),
625 )),
626 step: 0,
627 exponential_builder: ExponentialBuilder::default(),
628 store,
629 rolling_back: false,
630 }
631 }
632
633 async fn check_files(
634 object_store: &ObjectStore,
635 procedure_store: &ProcedureStore,
636 procedure_id: ProcedureId,
637 files: &[&str],
638 ) {
639 let dir = proc_path!(procedure_store, "{procedure_id}/");
640 let lister = object_store.list(&dir).await.unwrap();
641 let mut files_in_dir: Vec<_> = lister
642 .into_iter()
643 .filter(|x| x.metadata().mode() == EntryMode::FILE)
644 .map(|de| de.name().to_string())
645 .collect();
646 files_in_dir.sort_unstable();
647 assert_eq!(files, files_in_dir);
648 }
649
650 fn context_with_provider(
651 procedure_id: ProcedureId,
652 provider: Arc<dyn ContextProvider>,
653 ) -> Context {
654 Context {
655 procedure_id,
656 provider,
657 }
658 }
659
660 fn context_without_provider(procedure_id: ProcedureId) -> Context {
661 struct MockProvider;
662
663 #[async_trait]
664 impl ContextProvider for MockProvider {
665 async fn procedure_state(
666 &self,
667 _procedure_id: ProcedureId,
668 ) -> Result<Option<ProcedureState>> {
669 unimplemented!()
670 }
671
672 async fn procedure_state_receiver(
673 &self,
674 _procedure_id: ProcedureId,
675 ) -> Result<Option<Receiver<ProcedureState>>> {
676 unimplemented!()
677 }
678
679 async fn try_put_poison(
680 &self,
681 _key: &PoisonKey,
682 _procedure_id: ProcedureId,
683 ) -> Result<()> {
684 unimplemented!()
685 }
686
687 async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
688 unimplemented!()
689 }
690 }
691
692 Context {
693 procedure_id,
694 provider: Arc::new(MockProvider),
695 }
696 }
697
698 type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
699
700 struct ProcedureAdapter<F> {
701 data: String,
702 lock_key: LockKey,
703 poison_keys: PoisonKeys,
704 exec_fn: F,
705 rollback_fn: Option<RollbackFn>,
706 }
707
708 impl<F> ProcedureAdapter<F> {
709 fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
710 let mut meta = test_util::procedure_meta_for_test();
711 meta.id = ProcedureId::parse_str(uuid).unwrap();
712 meta.lock_key = self.lock_key.clone();
713 meta.poison_keys = self.poison_keys.clone();
714
715 Arc::new(meta)
716 }
717 }
718
719 #[async_trait]
720 impl<F> Procedure for ProcedureAdapter<F>
721 where
722 F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
723 {
724 fn type_name(&self) -> &str {
725 "ProcedureAdapter"
726 }
727
728 async fn execute(&mut self, ctx: &Context) -> Result<Status> {
729 let f = (self.exec_fn)(ctx.clone());
730 f.await
731 }
732
733 async fn rollback(&mut self, ctx: &Context) -> Result<()> {
734 if let Some(f) = &mut self.rollback_fn {
735 return (f)(ctx.clone()).await;
736 }
737 Ok(())
738 }
739
740 fn rollback_supported(&self) -> bool {
741 self.rollback_fn.is_some()
742 }
743
744 fn dump(&self) -> Result<String> {
745 Ok(self.data.clone())
746 }
747
748 fn lock_key(&self) -> LockKey {
749 self.lock_key.clone()
750 }
751
752 fn poison_keys(&self) -> PoisonKeys {
753 self.poison_keys.clone()
754 }
755 }
756
757 async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
758 let mut times = 0;
759 let exec_fn = move |_| {
760 times += 1;
761 async move {
762 if times == 1 {
763 Ok(Status::executing(persist))
764 } else {
765 Ok(Status::done())
766 }
767 }
768 .boxed()
769 };
770 let normal = ProcedureAdapter {
771 data: "normal".to_string(),
772 lock_key: LockKey::single_exclusive("catalog.schema.table"),
773 poison_keys: PoisonKeys::default(),
774 exec_fn,
775 rollback_fn: None,
776 };
777
778 let dir = create_temp_dir("normal");
779 let meta = normal.new_meta(ROOT_ID);
780 let ctx = context_without_provider(meta.id);
781 let object_store = test_util::new_object_store(&dir);
782 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
783 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
784 runner.manager_ctx.start();
785
786 runner.execute_once(&ctx).await;
787 let state = runner.meta.state();
788 assert!(state.is_running(), "{state:?}");
789 check_files(
790 &object_store,
791 &procedure_store,
792 ctx.procedure_id,
793 first_files,
794 )
795 .await;
796
797 runner.execute_once(&ctx).await;
798 let state = runner.meta.state();
799 assert!(state.is_done(), "{state:?}");
800 check_files(
801 &object_store,
802 &procedure_store,
803 ctx.procedure_id,
804 second_files,
805 )
806 .await;
807 }
808
809 #[tokio::test]
810 async fn test_execute_once_normal() {
811 execute_once_normal(
812 true,
813 &["0000000000.step"],
814 &["0000000000.step", "0000000001.commit"],
815 )
816 .await;
817 }
818
819 #[tokio::test]
820 async fn test_execute_once_normal_skip_persist() {
821 execute_once_normal(false, &[], &["0000000000.commit"]).await;
822 }
823
824 #[tokio::test]
825 async fn test_on_suspend_empty() {
826 let exec_fn = move |_| {
827 async move {
828 Ok(Status::Suspended {
829 subprocedures: Vec::new(),
830 persist: false,
831 })
832 }
833 .boxed()
834 };
835 let suspend = ProcedureAdapter {
836 data: "suspend".to_string(),
837 lock_key: LockKey::single_exclusive("catalog.schema.table"),
838 poison_keys: PoisonKeys::default(),
839 exec_fn,
840 rollback_fn: None,
841 };
842
843 let dir = create_temp_dir("suspend");
844 let meta = suspend.new_meta(ROOT_ID);
845 let ctx = context_without_provider(meta.id);
846 let object_store = test_util::new_object_store(&dir);
847 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
848 let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
849 runner.manager_ctx.start();
850
851 runner.execute_once(&ctx).await;
852 let state = runner.meta.state();
853 assert!(state.is_running(), "{state:?}");
854 }
855
856 fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
857 let mut times = 0;
858 let exec_fn = move |_| {
859 times += 1;
860 async move {
861 if times == 1 {
862 time::sleep(Duration::from_millis(200)).await;
863 Ok(Status::executing(true))
864 } else {
865 Ok(Status::done())
866 }
867 }
868 .boxed()
869 };
870 let child = ProcedureAdapter {
871 data: "child".to_string(),
872 lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
873 poison_keys: PoisonKeys::default(),
874 exec_fn,
875 rollback_fn: None,
876 };
877
878 ProcedureWithId {
879 id: procedure_id,
880 procedure: Box::new(child),
881 }
882 }
883
884 #[tokio::test]
885 async fn test_on_suspend_by_subprocedures() {
886 let mut times = 0;
887 let children_ids = [ProcedureId::random(), ProcedureId::random()];
888 let keys = [
889 &[
890 "catalog.schema.table.region-0",
891 "catalog.schema.table.region-1",
892 ],
893 &[
894 "catalog.schema.table.region-2",
895 "catalog.schema.table.region-3",
896 ],
897 ];
898
899 let exec_fn = move |ctx: Context| {
900 times += 1;
901 async move {
902 if times == 1 {
903 Ok(Status::Suspended {
905 subprocedures: children_ids
906 .into_iter()
907 .zip(keys)
908 .map(|(id, key_slice)| new_child_procedure(id, key_slice))
909 .collect(),
910 persist: true,
911 })
912 } else {
913 let mut all_child_done = true;
915 for id in children_ids {
916 let is_not_done = ctx
917 .provider
918 .procedure_state(id)
919 .await
920 .unwrap()
921 .map(|s| !s.is_done())
922 .unwrap_or(true);
923 if is_not_done {
924 all_child_done = false;
925 }
926 }
927 if all_child_done {
928 Ok(Status::done())
929 } else {
930 Ok(Status::Suspended {
932 subprocedures: Vec::new(),
933 persist: false,
934 })
935 }
936 }
937 }
938 .boxed()
939 };
940 let parent = ProcedureAdapter {
941 data: "parent".to_string(),
942 lock_key: LockKey::single_exclusive("catalog.schema.table"),
943 poison_keys: PoisonKeys::default(),
944 exec_fn,
945 rollback_fn: None,
946 };
947
948 let dir = create_temp_dir("parent");
949 let meta = parent.new_meta(ROOT_ID);
950 let procedure_id = meta.id;
951
952 let object_store = test_util::new_object_store(&dir);
953 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
954 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
955 let poison_manager = Arc::new(InMemoryPoisonStore::default());
956 let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
957 manager_ctx.start();
958 assert!(manager_ctx.try_insert_procedure(meta));
960 runner.manager_ctx = manager_ctx.clone();
962
963 runner.run().await;
964 assert!(manager_ctx.key_lock.is_empty());
965
966 for child_id in children_ids {
968 let state = manager_ctx.state(child_id).unwrap();
969 assert!(state.is_done(), "{state:?}");
970 }
971 let state = manager_ctx.state(procedure_id).unwrap();
972 assert!(state.is_done(), "{state:?}");
973 check_files(&object_store, &procedure_store, procedure_id, &[]).await;
975
976 tokio::time::sleep(Duration::from_millis(5)).await;
977 manager_ctx.remove_outdated_meta(Duration::from_millis(1));
979 assert!(manager_ctx.state(procedure_id).is_none());
980 assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
981 for child_id in children_ids {
982 assert!(manager_ctx.state(child_id).is_none());
983 }
984 }
985
986 #[tokio::test]
987 async fn test_running_is_stopped() {
988 let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
989 let normal = ProcedureAdapter {
990 data: "normal".to_string(),
991 lock_key: LockKey::single_exclusive("catalog.schema.table"),
992 poison_keys: PoisonKeys::default(),
993 exec_fn,
994 rollback_fn: None,
995 };
996
997 let dir = create_temp_dir("test_running_is_stopped");
998 let meta = normal.new_meta(ROOT_ID);
999 let ctx = context_without_provider(meta.id);
1000 let object_store = test_util::new_object_store(&dir);
1001 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1002 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1003 runner.manager_ctx.start();
1004
1005 runner.execute_once(&ctx).await;
1006 let state = runner.meta.state();
1007 assert!(state.is_running(), "{state:?}");
1008 check_files(
1009 &object_store,
1010 &procedure_store,
1011 ctx.procedure_id,
1012 &["0000000000.step"],
1013 )
1014 .await;
1015
1016 runner.manager_ctx.stop();
1017 runner.execute_once(&ctx).await;
1018 let state = runner.meta.state();
1019 assert!(state.is_failed(), "{state:?}");
1020 check_files(
1022 &object_store,
1023 &procedure_store,
1024 ctx.procedure_id,
1025 &["0000000000.step"],
1026 )
1027 .await;
1028 }
1029
1030 #[tokio::test]
1031 async fn test_running_is_stopped_on_error() {
1032 let exec_fn =
1033 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1034 let normal = ProcedureAdapter {
1035 data: "fail".to_string(),
1036 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1037 poison_keys: PoisonKeys::default(),
1038 exec_fn,
1039 rollback_fn: None,
1040 };
1041
1042 let dir = create_temp_dir("test_running_is_stopped_on_error");
1043 let meta = normal.new_meta(ROOT_ID);
1044 let ctx = context_without_provider(meta.id);
1045 let object_store = test_util::new_object_store(&dir);
1046 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1047 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1048 runner.manager_ctx.stop();
1049
1050 runner.execute_once(&ctx).await;
1051 let state = runner.meta.state();
1052 assert!(state.is_failed(), "{state:?}");
1053 check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1055 }
1056
1057 #[tokio::test]
1058 async fn test_execute_on_error() {
1059 let exec_fn =
1060 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1061 let fail = ProcedureAdapter {
1062 data: "fail".to_string(),
1063 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1064 poison_keys: PoisonKeys::default(),
1065 exec_fn,
1066 rollback_fn: None,
1067 };
1068
1069 let dir = create_temp_dir("fail");
1070 let meta = fail.new_meta(ROOT_ID);
1071 let ctx = context_without_provider(meta.id);
1072 let object_store = test_util::new_object_store(&dir);
1073 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1074 let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1075 runner.manager_ctx.start();
1076
1077 runner.execute_once(&ctx).await;
1078 let state = runner.meta.state();
1079 assert!(state.is_prepare_rollback(), "{state:?}");
1080
1081 runner.execute_once(&ctx).await;
1082 let state = runner.meta.state();
1083 assert!(state.is_failed(), "{state:?}");
1084 check_files(
1085 &object_store,
1086 &procedure_store,
1087 ctx.procedure_id,
1088 &["0000000000.rollback"],
1089 )
1090 .await;
1091 }
1092
1093 #[tokio::test]
1094 async fn test_execute_with_rollback_on_error() {
1095 let exec_fn =
1096 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1097 let rollback_fn = move |_| async move { Ok(()) }.boxed();
1098 let fail = ProcedureAdapter {
1099 data: "fail".to_string(),
1100 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1101 poison_keys: PoisonKeys::default(),
1102 exec_fn,
1103 rollback_fn: Some(Box::new(rollback_fn)),
1104 };
1105
1106 let dir = create_temp_dir("fail");
1107 let meta = fail.new_meta(ROOT_ID);
1108 let ctx = context_without_provider(meta.id);
1109 let object_store = test_util::new_object_store(&dir);
1110 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1111 let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1112 runner.manager_ctx.start();
1113
1114 runner.execute_once(&ctx).await;
1115 let state = runner.meta.state();
1116 assert!(state.is_prepare_rollback(), "{state:?}");
1117
1118 runner.execute_once(&ctx).await;
1119 let state = runner.meta.state();
1120 assert!(state.is_rolling_back(), "{state:?}");
1121
1122 runner.execute_once(&ctx).await;
1123 let state = runner.meta.state();
1124 assert!(state.is_failed(), "{state:?}");
1125 check_files(
1126 &object_store,
1127 &procedure_store,
1128 ctx.procedure_id,
1129 &["0000000000.rollback"],
1130 )
1131 .await;
1132 }
1133
1134 #[tokio::test]
1135 async fn test_execute_on_retry_later_error() {
1136 let mut times = 0;
1137
1138 let exec_fn = move |_| {
1139 times += 1;
1140 async move {
1141 if times == 1 {
1142 Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1143 } else {
1144 Ok(Status::done())
1145 }
1146 }
1147 .boxed()
1148 };
1149
1150 let retry_later = ProcedureAdapter {
1151 data: "retry_later".to_string(),
1152 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1153 poison_keys: PoisonKeys::default(),
1154 exec_fn,
1155 rollback_fn: None,
1156 };
1157
1158 let dir = create_temp_dir("retry_later");
1159 let meta = retry_later.new_meta(ROOT_ID);
1160 let ctx = context_without_provider(meta.id);
1161 let object_store = test_util::new_object_store(&dir);
1162 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1163 let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1164 runner.manager_ctx.start();
1165 runner.execute_once(&ctx).await;
1166 let state = runner.meta.state();
1167 assert!(state.is_retrying(), "{state:?}");
1168
1169 runner.execute_once(&ctx).await;
1170 let state = runner.meta.state();
1171 assert!(state.is_done(), "{state:?}");
1172 assert!(meta.state().is_done());
1173 check_files(
1174 &object_store,
1175 &procedure_store,
1176 ctx.procedure_id,
1177 &["0000000000.commit"],
1178 )
1179 .await;
1180 }
1181
1182 #[tokio::test]
1183 async fn test_execute_exceed_max_retry_later() {
1184 let exec_fn =
1185 |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1186
1187 let exceed_max_retry_later = ProcedureAdapter {
1188 data: "exceed_max_retry_later".to_string(),
1189 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1190 poison_keys: PoisonKeys::default(),
1191 exec_fn,
1192 rollback_fn: None,
1193 };
1194
1195 let dir = create_temp_dir("exceed_max_retry_later");
1196 let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1197 let object_store = test_util::new_object_store(&dir);
1198 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1199 let mut runner = new_runner(
1200 meta.clone(),
1201 Box::new(exceed_max_retry_later),
1202 procedure_store,
1203 );
1204 runner.manager_ctx.start();
1205
1206 runner.exponential_builder = ExponentialBuilder::default()
1207 .with_min_delay(Duration::from_millis(1))
1208 .with_max_times(3);
1209
1210 runner.execute_procedure_in_loop().await;
1212 let err = meta.state().error().unwrap().to_string();
1213 assert!(err.contains("Procedure retry exceeded max times"));
1214 }
1215
1216 #[tokio::test]
1217 async fn test_rollback_exceed_max_retry_later() {
1218 let exec_fn =
1219 |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1220 let rollback_fn = move |_| {
1221 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1222 };
1223 let exceed_max_retry_later = ProcedureAdapter {
1224 data: "exceed_max_rollback".to_string(),
1225 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1226 poison_keys: PoisonKeys::default(),
1227 exec_fn,
1228 rollback_fn: Some(Box::new(rollback_fn)),
1229 };
1230
1231 let dir = create_temp_dir("exceed_max_rollback");
1232 let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1233 let object_store = test_util::new_object_store(&dir);
1234 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1235 let mut runner = new_runner(
1236 meta.clone(),
1237 Box::new(exceed_max_retry_later),
1238 procedure_store,
1239 );
1240 runner.manager_ctx.start();
1241 runner.exponential_builder = ExponentialBuilder::default()
1242 .with_min_delay(Duration::from_millis(1))
1243 .with_max_times(3);
1244
1245 runner.execute_procedure_in_loop().await;
1247 let err = meta.state().error().unwrap().to_string();
1248 assert!(err.contains("Procedure rollback exceeded max times"));
1249 }
1250
1251 #[tokio::test]
1252 async fn test_rollback_after_retry_fail() {
1253 let exec_fn = move |_| {
1254 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1255 };
1256
1257 let (tx, mut rx) = mpsc::channel(1);
1258 let rollback_fn = move |_| {
1259 let tx = tx.clone();
1260 async move {
1261 tx.send(()).await.unwrap();
1262 Ok(())
1263 }
1264 .boxed()
1265 };
1266 let retry_later = ProcedureAdapter {
1267 data: "rollback_after_retry_fail".to_string(),
1268 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1269 poison_keys: PoisonKeys::default(),
1270 exec_fn,
1271 rollback_fn: Some(Box::new(rollback_fn)),
1272 };
1273
1274 let dir = create_temp_dir("retry_later");
1275 let meta = retry_later.new_meta(ROOT_ID);
1276 let ctx = context_without_provider(meta.id);
1277 let object_store = test_util::new_object_store(&dir);
1278 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1279 let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1280 runner.manager_ctx.start();
1281 runner.exponential_builder = ExponentialBuilder::default()
1282 .with_min_delay(Duration::from_millis(1))
1283 .with_max_times(3);
1284 runner.execute_procedure_in_loop().await;
1286 rx.recv().await.unwrap();
1287 assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
1288 check_files(
1289 &object_store,
1290 &procedure_store,
1291 ctx.procedure_id,
1292 &["0000000000.rollback"],
1293 )
1294 .await;
1295 }
1296
1297 #[tokio::test]
1298 async fn test_child_error() {
1299 let mut times = 0;
1300 let child_id = ProcedureId::random();
1301
1302 let exec_fn = move |ctx: Context| {
1303 times += 1;
1304 async move {
1305 if times == 1 {
1306 let exec_fn = |_| {
1308 async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1309 .boxed()
1310 };
1311 let fail = ProcedureAdapter {
1312 data: "fail".to_string(),
1313 lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1314 poison_keys: PoisonKeys::default(),
1315 exec_fn,
1316 rollback_fn: None,
1317 };
1318
1319 Ok(Status::Suspended {
1320 subprocedures: vec![ProcedureWithId {
1321 id: child_id,
1322 procedure: Box::new(fail),
1323 }],
1324 persist: true,
1325 })
1326 } else {
1327 let state = ctx.provider.procedure_state(child_id).await.unwrap();
1329 let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
1330 if is_failed {
1331 Err(Error::from_error_ext(PlainError::new(
1333 "subprocedure failed".to_string(),
1334 StatusCode::Unexpected,
1335 )))
1336 } else {
1337 Ok(Status::Suspended {
1339 subprocedures: Vec::new(),
1340 persist: false,
1341 })
1342 }
1343 }
1344 }
1345 .boxed()
1346 };
1347 let parent = ProcedureAdapter {
1348 data: "parent".to_string(),
1349 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1350 poison_keys: PoisonKeys::default(),
1351 exec_fn,
1352 rollback_fn: None,
1353 };
1354
1355 let dir = create_temp_dir("child_err");
1356 let meta = parent.new_meta(ROOT_ID);
1357
1358 let object_store = test_util::new_object_store(&dir);
1359 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1360 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
1361 let poison_manager = Arc::new(InMemoryPoisonStore::default());
1362 let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
1363 manager_ctx.start();
1364 assert!(manager_ctx.try_insert_procedure(meta.clone()));
1366 runner.manager_ctx = manager_ctx.clone();
1368
1369 runner.run().await;
1371 assert!(manager_ctx.key_lock.is_empty());
1372 let err = meta.state().error().unwrap().output_msg();
1373 assert!(err.contains("subprocedure failed"), "{err}");
1374 }
1375
1376 #[tokio::test]
1377 async fn test_execute_with_clean_poisons() {
1378 common_telemetry::init_default_ut_logging();
1379 let mut times = 0;
1380 let poison_key = PoisonKey::new("table/1024");
1381 let moved_poison_key = poison_key.clone();
1382 let exec_fn = move |ctx: Context| {
1383 times += 1;
1384 let poison_key = moved_poison_key.clone();
1385 async move {
1386 if times == 1 {
1387 ctx.provider
1389 .try_put_poison(&poison_key, ctx.procedure_id)
1390 .await
1391 .unwrap();
1392
1393 Ok(Status::executing(true))
1394 } else {
1395 Ok(Status::executing_with_clean_poisons(true))
1396 }
1397 }
1398 .boxed()
1399 };
1400 let poison = ProcedureAdapter {
1401 data: "poison".to_string(),
1402 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1403 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1404 exec_fn,
1405 rollback_fn: None,
1406 };
1407
1408 let dir = create_temp_dir("clean_poisons");
1409 let meta = poison.new_meta(ROOT_ID);
1410
1411 let object_store = test_util::new_object_store(&dir);
1412 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1413 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1414
1415 let ctx = context_with_provider(
1417 meta.id,
1418 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1419 );
1420 runner
1422 .manager_ctx
1423 .procedures
1424 .write()
1425 .unwrap()
1426 .insert(meta.id, runner.meta.clone());
1427
1428 runner.manager_ctx.start();
1429 runner.execute_once(&ctx).await;
1430 let state = runner.meta.state();
1431 assert!(state.is_running(), "{state:?}");
1432
1433 let procedure_id = runner
1434 .manager_ctx
1435 .poison_manager
1436 .get_poison(&poison_key.to_string())
1437 .await
1438 .unwrap();
1439 assert!(procedure_id.is_some());
1441
1442 runner.execute_once(&ctx).await;
1443 let state = runner.meta.state();
1444 assert!(state.is_running(), "{state:?}");
1445
1446 let procedure_id = runner
1447 .manager_ctx
1448 .poison_manager
1449 .get_poison(&poison_key.to_string())
1450 .await
1451 .unwrap();
1452 assert!(procedure_id.is_none());
1454 }
1455
1456 #[tokio::test]
1457 async fn test_execute_error_with_clean_poisons() {
1458 common_telemetry::init_default_ut_logging();
1459 let mut times = 0;
1460 let poison_key = PoisonKey::new("table/1024");
1461 let moved_poison_key = poison_key.clone();
1462 let exec_fn = move |ctx: Context| {
1463 times += 1;
1464 let poison_key = moved_poison_key.clone();
1465 async move {
1466 if times == 1 {
1467 ctx.provider
1469 .try_put_poison(&poison_key, ctx.procedure_id)
1470 .await
1471 .unwrap();
1472
1473 Ok(Status::executing(true))
1474 } else {
1475 Err(Error::external_and_clean_poisons(MockError::new(
1476 StatusCode::Unexpected,
1477 )))
1478 }
1479 }
1480 .boxed()
1481 };
1482 let poison = ProcedureAdapter {
1483 data: "poison".to_string(),
1484 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1485 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1486 exec_fn,
1487 rollback_fn: None,
1488 };
1489
1490 let dir = create_temp_dir("error_with_clean_poisons");
1491 let meta = poison.new_meta(ROOT_ID);
1492
1493 let object_store = test_util::new_object_store(&dir);
1494 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1495 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1496
1497 let ctx = context_with_provider(
1499 meta.id,
1500 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1501 );
1502 runner
1504 .manager_ctx
1505 .procedures
1506 .write()
1507 .unwrap()
1508 .insert(meta.id, runner.meta.clone());
1509
1510 runner.manager_ctx.start();
1511 runner.execute_once(&ctx).await;
1512 let state = runner.meta.state();
1513 assert!(state.is_running(), "{state:?}");
1514
1515 let procedure_id = runner
1516 .manager_ctx
1517 .poison_manager
1518 .get_poison(&poison_key.to_string())
1519 .await
1520 .unwrap();
1521 assert!(procedure_id.is_some());
1523
1524 runner.execute_once(&ctx).await;
1525 let state = runner.meta.state();
1526 assert!(state.is_prepare_rollback(), "{state:?}");
1527
1528 let procedure_id = runner
1529 .manager_ctx
1530 .poison_manager
1531 .get_poison(&poison_key.to_string())
1532 .await
1533 .unwrap();
1534 assert!(procedure_id.is_none());
1536 }
1537
1538 #[tokio::test]
1539 async fn test_execute_failed_after_set_poison() {
1540 let mut times = 0;
1541 let poison_key = PoisonKey::new("table/1024");
1542 let moved_poison_key = poison_key.clone();
1543 let exec_fn = move |ctx: Context| {
1544 times += 1;
1545 let poison_key = moved_poison_key.clone();
1546 async move {
1547 if times == 1 {
1548 Ok(Status::executing(true))
1549 } else {
1550 ctx.provider
1552 .try_put_poison(&poison_key, ctx.procedure_id)
1553 .await
1554 .unwrap();
1555 Err(Error::external(MockError::new(StatusCode::Unexpected)))
1556 }
1557 }
1558 .boxed()
1559 };
1560 let poison = ProcedureAdapter {
1561 data: "poison".to_string(),
1562 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1563 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1564 exec_fn,
1565 rollback_fn: None,
1566 };
1567
1568 let dir = create_temp_dir("poison");
1569 let meta = poison.new_meta(ROOT_ID);
1570
1571 let object_store = test_util::new_object_store(&dir);
1572 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1573 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1574
1575 let ctx = context_with_provider(
1577 meta.id,
1578 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1579 );
1580 runner
1582 .manager_ctx
1583 .procedures
1584 .write()
1585 .unwrap()
1586 .insert(meta.id, runner.meta.clone());
1587
1588 runner.manager_ctx.start();
1589 runner.execute_once(&ctx).await;
1590 let state = runner.meta.state();
1591 assert!(state.is_running(), "{state:?}");
1592
1593 runner.execute_once(&ctx).await;
1594 let state = runner.meta.state();
1595 assert!(state.is_prepare_rollback(), "{state:?}");
1596 assert!(meta.state().is_prepare_rollback());
1597
1598 runner.execute_once(&ctx).await;
1599 let state = runner.meta.state();
1600 assert!(state.is_failed(), "{state:?}");
1601 assert!(meta.state().is_failed());
1602
1603 let procedure_id = runner
1605 .manager_ctx
1606 .poison_manager
1607 .get_poison(&poison_key.to_string())
1608 .await
1609 .unwrap()
1610 .unwrap();
1611
1612 assert_eq!(&procedure_id.to_string(), ROOT_ID);
1614 }
1615
1616 #[tokio::test]
1617 async fn test_execute_exceed_max_retry_after_set_poison() {
1618 common_telemetry::init_default_ut_logging();
1619 let mut times = 0;
1620 let poison_key = PoisonKey::new("table/1024");
1621 let moved_poison_key = poison_key.clone();
1622 let exec_fn = move |ctx: Context| {
1623 times += 1;
1624 let poison_key = moved_poison_key.clone();
1625 async move {
1626 if times == 1 {
1627 Ok(Status::executing(true))
1628 } else {
1629 ctx.provider
1631 .try_put_poison(&poison_key, ctx.procedure_id)
1632 .await
1633 .unwrap();
1634 Err(Error::retry_later_and_clean_poisons(MockError::new(
1635 StatusCode::Unexpected,
1636 )))
1637 }
1638 }
1639 .boxed()
1640 };
1641 let poison = ProcedureAdapter {
1642 data: "poison".to_string(),
1643 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1644 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1645 exec_fn,
1646 rollback_fn: None,
1647 };
1648
1649 let dir = create_temp_dir("exceed_max_after_set_poison");
1650 let meta = poison.new_meta(ROOT_ID);
1651 let object_store = test_util::new_object_store(&dir);
1652 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1653 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
1654 runner.manager_ctx.start();
1655 runner.exponential_builder = ExponentialBuilder::default()
1656 .with_min_delay(Duration::from_millis(1))
1657 .with_max_times(3);
1658 let ctx = context_with_provider(
1660 meta.id,
1661 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1662 );
1663 runner
1665 .manager_ctx
1666 .procedures
1667 .write()
1668 .unwrap()
1669 .insert(meta.id, runner.meta.clone());
1670 runner.execute_once_with_retry(&ctx).await;
1672 let err = meta.state().error().unwrap().clone();
1673 assert_matches!(&*err, Error::RetryTimesExceeded { .. });
1674
1675 let procedure_id = runner
1677 .manager_ctx
1678 .poison_manager
1679 .get_poison(&poison_key.to_string())
1680 .await
1681 .unwrap();
1682 assert_eq!(procedure_id, None);
1683 }
1684
1685 #[tokio::test]
1686 async fn test_execute_poisoned() {
1687 let mut times = 0;
1688 let poison_key = PoisonKey::new("table/1024");
1689 let moved_poison_key = poison_key.clone();
1690 let exec_fn = move |ctx: Context| {
1691 times += 1;
1692 let poison_key = moved_poison_key.clone();
1693 async move {
1694 if times == 1 {
1695 Ok(Status::executing(true))
1696 } else {
1697 ctx.provider
1699 .try_put_poison(&poison_key, ctx.procedure_id)
1700 .await
1701 .unwrap();
1702 Ok(Status::Poisoned {
1703 keys: PoisonKeys::new(vec![poison_key.clone()]),
1704 error: Error::external(MockError::new(StatusCode::Unexpected)),
1705 })
1706 }
1707 }
1708 .boxed()
1709 };
1710 let poison = ProcedureAdapter {
1711 data: "poison".to_string(),
1712 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1713 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1714 exec_fn,
1715 rollback_fn: None,
1716 };
1717
1718 let dir = create_temp_dir("poison");
1719 let meta = poison.new_meta(ROOT_ID);
1720
1721 let object_store = test_util::new_object_store(&dir);
1722 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1723 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1724
1725 let ctx = context_with_provider(
1727 meta.id,
1728 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1729 );
1730 runner
1732 .manager_ctx
1733 .procedures
1734 .write()
1735 .unwrap()
1736 .insert(meta.id, runner.meta.clone());
1737
1738 runner.manager_ctx.start();
1739 runner.execute_once(&ctx).await;
1740 let state = runner.meta.state();
1741 assert!(state.is_running(), "{state:?}");
1742
1743 runner.execute_once(&ctx).await;
1744 let state = runner.meta.state();
1745 assert!(state.is_poisoned(), "{state:?}");
1746 assert!(meta.state().is_poisoned());
1747 check_files(
1748 &object_store,
1749 &procedure_store,
1750 ctx.procedure_id,
1751 &["0000000000.step"],
1752 )
1753 .await;
1754
1755 let procedure_id = runner
1757 .manager_ctx
1758 .poison_manager
1759 .get_poison(&poison_key.to_string())
1760 .await
1761 .unwrap()
1762 .unwrap();
1763
1764 assert_eq!(procedure_id, ROOT_ID);
1766 }
1767
1768 fn test_procedure_with_dynamic_lock(
1769 shared_atomic_value: Arc<AtomicU64>,
1770 id: u64,
1771 ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
1772 let exec_fn = move |ctx: Context| {
1773 let moved_shared_atomic_value = shared_atomic_value.clone();
1774 let moved_ctx = ctx.clone();
1775 async move {
1776 debug!("Acquiring write lock, id: {}", id);
1777 let key = StringKey::Exclusive("test_lock".to_string());
1778 let guard = moved_ctx.provider.acquire_lock(&key).await;
1779 debug!("Acquired write lock, id: {}", id);
1780 let millis = rand::rng().random_range(10..=50);
1781 tokio::time::sleep(Duration::from_millis(millis)).await;
1782 let value = moved_shared_atomic_value.load(Ordering::Relaxed);
1783 moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
1784 debug!("Dropping write lock, id: {}", id);
1785 drop(guard);
1786
1787 Ok(Status::done())
1788 }
1789 .boxed()
1790 };
1791
1792 let adapter = ProcedureAdapter {
1793 data: "dynamic_lock".to_string(),
1794 lock_key: LockKey::new_exclusive([]),
1795 poison_keys: PoisonKeys::new([]),
1796 exec_fn,
1797 rollback_fn: None,
1798 };
1799 let meta = adapter.new_meta(ROOT_ID);
1800
1801 (Box::new(adapter), meta)
1802 }
1803
1804 #[tokio::test(flavor = "multi_thread")]
1805 async fn test_execute_with_dynamic_lock() {
1806 common_telemetry::init_default_ut_logging();
1807 let shared_atomic_value = Arc::new(AtomicU64::new(0));
1808 let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
1809 let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
1810
1811 let dir = create_temp_dir("dynamic_lock");
1812 let object_store = test_util::new_object_store(&dir);
1813 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1814 let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
1815 let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
1816 let ctx1 = context_with_provider(
1817 meta1.id,
1818 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1819 );
1820 let ctx2 = context_with_provider(
1821 meta2.id,
1822 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1824 );
1825 let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
1826 join_all(tasks).await;
1827 assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
1828 }
1829}