1use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_event_recorder::EventRecorderRef;
21use common_telemetry::{debug, error, info};
22use rand::Rng;
23use snafu::ResultExt;
24use tokio::time;
25
26use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
27use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
28use crate::procedure::{Output, StringKey};
29use crate::rwlock::OwnedKeyRwLockGuard;
30use crate::store::{ProcedureMessage, ProcedureStore};
31use crate::{
32 BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
33};
34
/// Guard that finalizes a procedure when the runner is done with it.
///
/// Dropping the guard (see the `Drop` impl for `ProcedureGuard`) notifies the parent
/// procedure, releases the acquired key locks and cleans the lock-key indexes. If the guard
/// is dropped without [`ProcedureGuard::finish`] having been called, the procedure is also
/// marked as failed.
struct ProcedureGuard {
    /// Metadata of the guarded procedure; its state is set to failed on an unexpected exit.
    meta: ProcedureMetaRef,
    /// Manager context used to notify the parent procedure and to clean lock-key indexes.
    manager_ctx: Arc<ManagerContext>,
    /// Key lock guards acquired by the runner; popped (released) in reverse order on drop.
    key_guards: Vec<OwnedKeyRwLockGuard>,
    /// Set to `true` by [`ProcedureGuard::finish`] to signal a normal exit.
    finish: bool,
}
42
43impl ProcedureGuard {
44 fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
46 ProcedureGuard {
47 meta,
48 manager_ctx,
49 key_guards: vec![],
50 finish: false,
51 }
52 }
53
54 fn finish(mut self) {
56 self.finish = true;
57 }
58}
59
60impl Drop for ProcedureGuard {
61 fn drop(&mut self) {
62 if !self.finish {
63 error!("Procedure {} exits unexpectedly", self.meta.id);
64
65 let err = ProcedurePanicSnafu {
69 procedure_id: self.meta.id,
70 }
71 .build();
72 self.meta.set_state(ProcedureState::failed(Arc::new(err)));
73 }
74
75 if let Some(parent_id) = self.meta.parent_id {
77 self.manager_ctx.notify_by_subprocedure(parent_id);
78 }
79
80 while !self.key_guards.is_empty() {
82 self.key_guards.pop();
83 }
84
85 self.manager_ctx
87 .key_lock
88 .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
89 }
90}
91
/// Executes a single procedure and spawns runners for the subprocedures it submits.
pub(crate) struct Runner {
    /// Shared metadata of the procedure (id, state, parent id, lock and poison keys, ...).
    pub(crate) meta: ProcedureMetaRef,
    /// The procedure being executed.
    pub(crate) procedure: BoxedProcedure,
    /// Manager context, used for the running flag, key locks, the poison manager and
    /// procedure bookkeeping.
    pub(crate) manager_ctx: Arc<ManagerContext>,
    /// Step number passed to the procedure store on the next write. It is incremented after
    /// each successful persist, commit or rollback-state write.
    pub(crate) step: u32,
    /// Backoff configuration used to build both the retry and the rollback backoff.
    pub(crate) exponential_builder: ExponentialBuilder,
    /// Store in which the procedure's state is persisted.
    pub(crate) store: Arc<ProcedureStore>,
    /// Reset to `false` when execution starts.
    // NOTE(review): not read anywhere in the visible code of this file; confirm it is still needed.
    pub(crate) rolling_back: bool,
    /// Optional event recorder, also handed to the metadata of submitted subprocedures.
    pub(crate) event_recorder: Option<EventRecorderRef>,
}
102
impl Runner {
    /// Returns whether the procedure manager is running, as reported by the manager context.
    pub(crate) fn running(&self) -> bool {
        self.manager_ctx.running()
    }

    /// Runs the procedure until it reaches a terminal state, consuming the runner.
    ///
    /// 1. Acquires the procedure's lock keys (shared keys take a read lock, exclusive keys a
    ///    write lock).
    /// 2. Executes the procedure via [`Runner::execute_procedure_in_loop`], recording the
    ///    start and end times around it.
    /// 3. Finishes the [`ProcedureGuard`], which releases the locks and notifies the parent.
    /// 4. For a root procedure (no parent), informs the manager context that every procedure
    ///    in the tree has finished. If the manager is still running, it then deletes their
    ///    persisted data from the store.
    pub(crate) async fn run(mut self) {
        // Created first so that, if this task exits without reaching `guard.finish()`, the
        // guard's `Drop` still marks the procedure as failed and releases the acquired locks.
        let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());

        info!(
            "Runner {}-{} starts",
            self.procedure.type_name(),
            self.meta.id
        );

        // Acquire the locks one by one, in the order yielded by `keys_to_lock()`. Each guard is
        // handed to `ProcedureGuard`, which releases them in reverse order when dropped.
        for key in self.meta.lock_key.keys_to_lock() {
            let key_guard = match key {
                StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
                StringKey::Exclusive(key) => {
                    self.manager_ctx.key_lock.write(key.clone()).await.into()
                }
            };

            guard.key_guards.push(key_guard);
        }

        self.meta.set_start_time_ms();
        self.execute_procedure_in_loop().await;
        self.meta.set_end_time_ms();

        // Normal exit: drops the guard without marking the procedure as failed. This releases
        // the locks and notifies the parent procedure, if any.
        guard.finish();

        // Only the root procedure cleans up for the whole procedure tree.
        if self.meta.parent_id.is_none() {
            let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
            self.manager_ctx.on_procedures_finish(&procedure_ids);

            // The manager has been stopped: exit without deleting the persisted procedures
            // (and without logging the "exits" message below).
            if !self.running() {
                return;
            }

            // Best-effort deletion: a failure is logged and the remaining ids are still processed.
            for id in procedure_ids {
                if let Err(e) = self.store.delete_procedure(id).await {
                    error!(
                        e;
                        "Runner {}-{} failed to delete procedure {}",
                        self.procedure.type_name(),
                        self.meta.id,
                        id,
                    );
                }
            }
        }

        info!(
            "Runner {}-{} exits",
            self.procedure.type_name(),
            self.meta.id
        );
    }

    /// Builds the execution [`Context`] and drives the procedure through
    /// [`Runner::execute_once_with_retry`].
    ///
    /// The context's provider is the manager context.
    async fn execute_procedure_in_loop(&mut self) {
        let ctx = Context {
            procedure_id: self.meta.id,
            provider: self.manager_ctx.clone(),
        };

        // NOTE(review): `rolling_back` is reset here but is not read anywhere in the visible
        // code of this file; confirm whether it is still needed.
        self.rolling_back = false;
        self.execute_once_with_retry(&ctx).await;
    }

    /// Repeatedly executes one step of the procedure until it reaches a terminal state.
    ///
    /// Before each step, the current state decides what happens:
    /// - `Running`: execute immediately.
    /// - `Retrying`: sleep for the next retry backoff delay plus a random jitter of
    ///   `0..=delay_ms / 4` milliseconds, then execute again. Once the retry backoff is
    ///   exhausted, the state moves to `PrepareRollback` with a `RetryTimesExceeded` error.
    /// - `PrepareRollback` / `RollingBack`: sleep for the next rollback backoff delay, then
    ///   execute the rollback step. Once the rollback backoff is exhausted, the procedure fails
    ///   with an error built by `RollbackTimesExceededSnafu` that wraps the last error.
    /// - `Done` / `Failed` / `Poisoned`: return.
    ///
    /// If the manager is no longer running, the procedure is marked as failed with an error
    /// built by `ManagerNotStartSnafu`, and the loop returns.
    ///
    /// Both backoff iterators are built once per call from `exponential_builder`. The retry
    /// and rollback budgets are therefore shared across the whole execution and are not reset
    /// when a later step succeeds.
    async fn execute_once_with_retry(&mut self, ctx: &Context) {
        let mut retry = self.exponential_builder.build();
        let mut retry_times = 0;

        let mut rollback = self.exponential_builder.build();
        let mut rollback_times = 0;

        loop {
            if !self.running() {
                self.meta.set_state(ProcedureState::failed(Arc::new(
                    error::ManagerNotStartSnafu {}.build(),
                )));
                return;
            }
            let state = self.meta.state();
            match state {
                ProcedureState::Running => {}
                ProcedureState::Retrying { error } => {
                    retry_times += 1;
                    if let Some(d) = retry.next() {
                        let millis = d.as_millis() as u64;
                        // Random jitter of up to a quarter of the backoff delay (inclusive).
                        let noise = rand::rng().random_range(0..(millis / 4) + 1);
                        let d = d.add(Duration::from_millis(noise));

                        self.wait_on_err(d, retry_times).await;
                    } else {
                        // Retry budget exhausted. The `execute_once` call at the end of this
                        // iteration then runs the `PrepareRollback` step.
                        self.meta
                            .set_state(ProcedureState::prepare_rollback(Arc::new(
                                Error::RetryTimesExceeded {
                                    source: error.clone(),
                                    procedure_id: self.meta.id,
                                },
                            )));
                    }
                }
                ProcedureState::PrepareRollback { error }
                | ProcedureState::RollingBack { error } => {
                    // Note: this also delays the very first rollback attempt after entering
                    // `PrepareRollback`.
                    rollback_times += 1;
                    if let Some(d) = rollback.next() {
                        self.wait_on_err(d, rollback_times).await;
                    } else {
                        // Wrap the last error as the source of a "rollback times exceeded" error.
                        let err = Err::<(), Arc<Error>>(error)
                            .context(RollbackTimesExceededSnafu {
                                procedure_id: self.meta.id,
                            })
                            .unwrap_err();
                        self.meta.set_state(ProcedureState::failed(Arc::new(err)));
                        return;
                    }
                }
                ProcedureState::Done { .. } => return,
                ProcedureState::Failed { .. } => return,
                ProcedureState::Poisoned { .. } => return,
            }
            self.execute_once(ctx).await;
        }
    }

    /// Deletes every poison key of this procedure from the poison manager.
    ///
    /// All keys are attempted even if some deletions fail. Each failure is logged, and the
    /// last error encountered is returned.
    async fn clean_poisons(&mut self) -> Result<()> {
        let mut error = None;
        for key in self.meta.poison_keys.iter() {
            let key = key.to_string();
            if let Err(e) = self
                .manager_ctx
                .poison_manager
                .delete_poison(key, self.meta.id.to_string())
                .await
            {
                error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
                error = Some(e);
            }
        }

        if let Some(e) = error {
            return Err(e);
        }
        Ok(())
    }

    /// Executes the `RollingBack` step.
    ///
    /// If the procedure supports rollback and [`Procedure::rollback`] fails, the state stays
    /// `RollingBack`, now carrying the rollback error, so the retry loop tries again.
    /// Otherwise (the rollback succeeded or is unsupported), the procedure is marked as failed
    /// with `err`.
    ///
    /// NOTE(review): after a failed rollback attempt, `err` on the next call is the rollback
    /// error rather than the error that originally triggered the rollback. The final `Failed`
    /// state may therefore not report the original cause; confirm this is intended.
    async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
        if self.procedure.rollback_supported() {
            if let Err(e) = self.procedure.rollback(ctx).await {
                self.meta
                    .set_state(ProcedureState::rolling_back(Arc::new(e)));
                return;
            }
        }
        self.meta.set_state(ProcedureState::failed(err));
    }

    /// Executes the `PrepareRollback` step.
    ///
    /// Persists the rollback state, including `err`'s message, to the procedure store:
    /// - If the write fails, the state stays `PrepareRollback`, now carrying the store error,
    ///   so the retry loop tries again.
    /// - On success, the state becomes `RollingBack` when the procedure supports rollback,
    ///   and `Failed` with `err` otherwise.
    ///
    /// NOTE(review): as with `rollback`, a failed write replaces the original error with the
    /// store error for subsequent attempts.
    async fn prepare_rollback(&mut self, err: Arc<Error>) {
        if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
            self.meta
                .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
            return;
        }
        if self.procedure.rollback_supported() {
            self.meta.set_state(ProcedureState::rolling_back(err));
        } else {
            self.meta.set_state(ProcedureState::failed(err));
        }
    }

    /// Executes a single step of the procedure according to its current state.
    ///
    /// For `Running` / `Retrying`, calls [`Procedure::execute`] and handles the result.
    ///
    /// On `Ok(status)`:
    /// - If the manager has stopped, marks the procedure as failed and returns.
    /// - Otherwise, cleans poisons and then persists the procedure, each only when the status
    ///   requests it. A failure of either moves the state to `Retrying`.
    /// - Then handles the status:
    ///   - `Executing`: nothing more to do.
    ///   - `Suspended`: submit the subprocedures and wait for them.
    ///   - `Done`: commit, then mark the procedure as done (a commit failure moves the state
    ///     to `Retrying`).
    ///   - `Poisoned`: mark the procedure as poisoned.
    ///
    /// On `Err(e)`:
    /// - If the manager has stopped, marks the procedure as failed and returns.
    /// - Otherwise, cleans poisons if the error requests it.
    /// - Then moves to `Retrying` for retry-later errors and to `PrepareRollback` otherwise.
    ///
    /// `PrepareRollback` and `RollingBack` delegate to [`Runner::prepare_rollback`] and
    /// [`Runner::rollback`]. Terminal states are a no-op.
    async fn execute_once(&mut self, ctx: &Context) {
        match self.meta.state() {
            ProcedureState::Running | ProcedureState::Retrying { .. } => {
                match self.procedure.execute(ctx).await {
                    Ok(status) => {
                        debug!(
                            "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
                            self.procedure.type_name(),
                            self.meta.id,
                            status,
                            status.need_persist(),
                        );

                        // Stop before touching the poison manager or the store if the
                        // manager has been stopped.
                        if !self.running() {
                            self.meta.set_state(ProcedureState::failed(Arc::new(
                                error::ManagerNotStartSnafu {}.build(),
                            )));
                            return;
                        }

                        // Poisons are cleaned before the new state is persisted.
                        if status.need_clean_poisons() {
                            if let Err(e) = self.clean_poisons().await {
                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                                return;
                            }
                        }

                        if status.need_persist() {
                            if let Err(e) = self.persist_procedure().await {
                                error!(e; "Failed to persist procedure: {}", self.meta.id);
                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                                return;
                            }
                        }

                        match status {
                            Status::Executing { .. } => {}
                            Status::Suspended { subprocedures, .. } => {
                                self.on_suspended(subprocedures).await;
                            }
                            Status::Done { output } => {
                                if let Err(e) = self.commit_procedure().await {
                                    error!(e; "Failed to commit procedure: {}", self.meta.id);
                                    self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                                    return;
                                }

                                self.done(output);
                            }
                            Status::Poisoned { error, keys } => {
                                error!(
                                    error;
                                    "Procedure {}-{} is poisoned, keys: {:?}",
                                    self.procedure.type_name(),
                                    self.meta.id,
                                    keys,
                                );
                                self.meta
                                    .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
                            }
                        }
                    }
                    Err(e) => {
                        error!(
                            e;
                            "Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
                            self.procedure.type_name(),
                            self.meta.id,
                            e.is_retry_later(),
                            e.need_clean_poisons(),
                        );

                        if !self.running() {
                            self.meta.set_state(ProcedureState::failed(Arc::new(
                                error::ManagerNotStartSnafu {}.build(),
                            )));
                            return;
                        }

                        if e.need_clean_poisons() {
                            if let Err(e) = self.clean_poisons().await {
                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                                return;
                            }
                            debug!(
                                "Procedure {}-{} cleaned poisons",
                                self.procedure.type_name(),
                                self.meta.id,
                            );
                        }

                        if e.is_retry_later() {
                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                            return;
                        }

                        // Not retryable: start rolling back.
                        self.meta
                            .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
                    }
                }
            }
            ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
            ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
            ProcedureState::Failed { .. }
            | ProcedureState::Done { .. }
            | ProcedureState::Poisoned { .. } => (),
        }
    }

    /// Registers a subprocedure with the manager and spawns a runner for it.
    ///
    /// Does nothing if the manager context already contains a procedure with `procedure_id`.
    /// The child runner:
    /// - starts from step 0 in `procedure_state`;
    /// - has this procedure as its parent;
    /// - shares this runner's manager context, store, backoff configuration and event recorder.
    ///
    /// # Panics
    ///
    /// Panics if `try_insert_procedure` returns `false`. Presumably this means a procedure with
    /// the same id was inserted after the existence check above.
    fn submit_subprocedure(
        &self,
        procedure_id: ProcedureId,
        procedure_state: ProcedureState,
        procedure: BoxedProcedure,
    ) {
        if self.manager_ctx.contains_procedure(procedure_id) {
            return;
        }

        let step = 0;

        let meta = Arc::new(ProcedureMeta::new(
            procedure_id,
            procedure_state,
            Some(self.meta.id),
            procedure.lock_key(),
            procedure.poison_keys(),
            procedure.type_name(),
            self.event_recorder.clone(),
            procedure.user_metadata(),
        ));
        let runner = Runner {
            meta: meta.clone(),
            procedure,
            manager_ctx: self.manager_ctx.clone(),
            step,
            exponential_builder: self.exponential_builder,
            store: self.store.clone(),
            rolling_back: false,
            event_recorder: self.event_recorder.clone(),
        };

        // `assert!` is always evaluated (unlike `debug_assert!`), so the insertion also
        // happens in release builds.
        assert!(
            self.manager_ctx.try_insert_procedure(meta),
            "Procedure {}-{} submit an existing procedure {}-{}",
            self.procedure.type_name(),
            self.meta.id,
            runner.procedure.type_name(),
            procedure_id,
        );

        // Record the subprocedure as a child of this procedure.
        self.meta.push_child(procedure_id);

        // The join handle is dropped at the end of this function; the child task is not awaited here.
        let _handle = common_runtime::spawn_global(async move {
            runner.run().await
        });
    }

    /// Logs the `i`-th retry (or rollback) attempt and sleeps for `d` before it runs.
    async fn wait_on_err(&mut self, d: Duration, i: u64) {
        info!(
            "Procedure {}-{} retry for the {} times after {} millis",
            self.procedure.type_name(),
            self.meta.id,
            i,
            d.as_millis(),
        );
        time::sleep(d).await;
    }

    /// Handles a `Suspended` status.
    ///
    /// Submits every subprocedure; ids the manager already knows are skipped by
    /// [`Runner::submit_subprocedure`]. If the list was non-empty, it then waits once on
    /// `child_notify`. The wake-up is presumably triggered by `notify_by_subprocedure` when a
    /// child's guard is dropped.
    ///
    /// With an empty list this returns immediately, and the caller's loop executes the
    /// procedure again.
    async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
        let has_child = !subprocedures.is_empty();
        for subprocedure in subprocedures {
            info!(
                "Procedure {}-{} submit subprocedure {}-{}",
                self.procedure.type_name(),
                self.meta.id,
                subprocedure.procedure.type_name(),
                subprocedure.id,
            );

            self.submit_subprocedure(
                subprocedure.id,
                ProcedureState::Running,
                subprocedure.procedure,
            );
        }

        info!(
            "Procedure {}-{} is waiting for subprocedures",
            self.procedure.type_name(),
            self.meta.id,
        );

        if has_child {
            self.meta.child_notify.notified().await;

            info!(
                "Procedure {}-{} is waked up",
                self.procedure.type_name(),
                self.meta.id,
            );
        }
    }

    /// Dumps the procedure and stores it in the procedure store at the current `step`,
    /// incrementing `step` on success.
    ///
    /// Store errors are logged before being returned. Dump errors are returned as-is.
    async fn persist_procedure(&mut self) -> Result<()> {
        let type_name = self.procedure.type_name().to_string();
        let data = self.procedure.dump()?;

        self.store
            .store_procedure(
                self.meta.id,
                self.step,
                type_name,
                data,
                self.meta.parent_id,
            )
            .await
            .map_err(|e| {
                error!(
                    e; "Failed to persist procedure {}-{}",
                    self.procedure.type_name(),
                    self.meta.id
                );
                e
            })?;
        self.step += 1;
        Ok(())
    }

    /// Writes the commit record for the procedure at the current `step`, incrementing `step`
    /// on success. Store errors are logged before being returned.
    async fn commit_procedure(&mut self) -> Result<()> {
        self.store
            .commit_procedure(self.meta.id, self.step)
            .await
            .map_err(|e| {
                error!(
                    e; "Failed to commit procedure {}-{}",
                    self.procedure.type_name(),
                    self.meta.id
                );
                e
            })?;
        self.step += 1;
        Ok(())
    }

    /// Writes a rollback message for the procedure and increments `step` on success.
    ///
    /// The message contains the procedure dump, the parent id, the current `step` and
    /// `error`. Store errors are logged before being returned.
    async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
        let type_name = self.procedure.type_name().to_string();
        let data = self.procedure.dump()?;
        let message = ProcedureMessage {
            type_name,
            data,
            parent_id: self.meta.parent_id,
            step: self.step,
            error: Some(error),
        };
        self.store
            .rollback_procedure(self.meta.id, message)
            .await
            .map_err(|e| {
                error!(
                    e; "Failed to write rollback key for procedure {}-{}",
                    self.procedure.type_name(),
                    self.meta.id
                );
                e
            })?;
        self.step += 1;
        Ok(())
    }

    /// Logs completion and sets the procedure state to `Done` with the given output.
    fn done(&self, output: Option<Output>) {
        info!(
            "Procedure {}-{} done",
            self.procedure.type_name(),
            self.meta.id,
        );

        self.meta.set_state(ProcedureState::Done { output });
    }
}
592
593#[cfg(test)]
594mod tests {
595 use std::assert_matches::assert_matches;
596 use std::sync::atomic::{AtomicU64, Ordering};
597 use std::sync::Arc;
598
599 use async_trait::async_trait;
600 use common_error::ext::{ErrorExt, PlainError};
601 use common_error::mock::MockError;
602 use common_error::status_code::StatusCode;
603 use common_test_util::temp_dir::create_temp_dir;
604 use futures::future::join_all;
605 use futures_util::future::BoxFuture;
606 use futures_util::FutureExt;
607 use object_store::{EntryMode, ObjectStore};
608 use tokio::sync::mpsc;
609 use tokio::sync::watch::Receiver;
610
611 use super::*;
612 use crate::local::{test_util, DynamicKeyLockGuard};
613 use crate::procedure::PoisonKeys;
614 use crate::store::proc_path;
615 use crate::test_util::InMemoryPoisonStore;
616 use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
617
618 const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
619
620 fn new_runner(
621 meta: ProcedureMetaRef,
622 procedure: BoxedProcedure,
623 store: Arc<ProcedureStore>,
624 ) -> Runner {
625 Runner {
626 meta,
627 procedure,
628 manager_ctx: Arc::new(ManagerContext::new(
629 Arc::new(InMemoryPoisonStore::default()),
630 )),
631 step: 0,
632 exponential_builder: ExponentialBuilder::default(),
633 store,
634 rolling_back: false,
635 event_recorder: None,
636 }
637 }
638
639 async fn check_files(
640 object_store: &ObjectStore,
641 procedure_store: &ProcedureStore,
642 procedure_id: ProcedureId,
643 files: &[&str],
644 ) {
645 let dir = proc_path!(procedure_store, "{procedure_id}/");
646 let lister = object_store.list(&dir).await.unwrap();
647 let mut files_in_dir: Vec<_> = lister
648 .into_iter()
649 .filter(|x| x.metadata().mode() == EntryMode::FILE)
650 .map(|de| de.name().to_string())
651 .collect();
652 files_in_dir.sort_unstable();
653 assert_eq!(files, files_in_dir);
654 }
655
656 fn context_with_provider(
657 procedure_id: ProcedureId,
658 provider: Arc<dyn ContextProvider>,
659 ) -> Context {
660 Context {
661 procedure_id,
662 provider,
663 }
664 }
665
666 fn context_without_provider(procedure_id: ProcedureId) -> Context {
667 struct MockProvider;
668
669 #[async_trait]
670 impl ContextProvider for MockProvider {
671 async fn procedure_state(
672 &self,
673 _procedure_id: ProcedureId,
674 ) -> Result<Option<ProcedureState>> {
675 unimplemented!()
676 }
677
678 async fn procedure_state_receiver(
679 &self,
680 _procedure_id: ProcedureId,
681 ) -> Result<Option<Receiver<ProcedureState>>> {
682 unimplemented!()
683 }
684
685 async fn try_put_poison(
686 &self,
687 _key: &PoisonKey,
688 _procedure_id: ProcedureId,
689 ) -> Result<()> {
690 unimplemented!()
691 }
692
693 async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
694 unimplemented!()
695 }
696 }
697
698 Context {
699 procedure_id,
700 provider: Arc::new(MockProvider),
701 }
702 }
703
704 type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
705
706 struct ProcedureAdapter<F> {
707 data: String,
708 lock_key: LockKey,
709 poison_keys: PoisonKeys,
710 exec_fn: F,
711 rollback_fn: Option<RollbackFn>,
712 }
713
714 impl<F> ProcedureAdapter<F> {
715 fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
716 let mut meta = test_util::procedure_meta_for_test();
717 meta.id = ProcedureId::parse_str(uuid).unwrap();
718 meta.lock_key = self.lock_key.clone();
719 meta.poison_keys = self.poison_keys.clone();
720
721 Arc::new(meta)
722 }
723 }
724
725 #[async_trait]
726 impl<F> Procedure for ProcedureAdapter<F>
727 where
728 F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
729 {
730 fn type_name(&self) -> &str {
731 "ProcedureAdapter"
732 }
733
734 async fn execute(&mut self, ctx: &Context) -> Result<Status> {
735 let f = (self.exec_fn)(ctx.clone());
736 f.await
737 }
738
739 async fn rollback(&mut self, ctx: &Context) -> Result<()> {
740 if let Some(f) = &mut self.rollback_fn {
741 return (f)(ctx.clone()).await;
742 }
743 Ok(())
744 }
745
746 fn rollback_supported(&self) -> bool {
747 self.rollback_fn.is_some()
748 }
749
750 fn dump(&self) -> Result<String> {
751 Ok(self.data.clone())
752 }
753
754 fn lock_key(&self) -> LockKey {
755 self.lock_key.clone()
756 }
757
758 fn poison_keys(&self) -> PoisonKeys {
759 self.poison_keys.clone()
760 }
761 }
762
763 async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
764 let mut times = 0;
765 let exec_fn = move |_| {
766 times += 1;
767 async move {
768 if times == 1 {
769 Ok(Status::executing(persist))
770 } else {
771 Ok(Status::done())
772 }
773 }
774 .boxed()
775 };
776 let normal = ProcedureAdapter {
777 data: "normal".to_string(),
778 lock_key: LockKey::single_exclusive("catalog.schema.table"),
779 poison_keys: PoisonKeys::default(),
780 exec_fn,
781 rollback_fn: None,
782 };
783
784 let dir = create_temp_dir("normal");
785 let meta = normal.new_meta(ROOT_ID);
786 let ctx = context_without_provider(meta.id);
787 let object_store = test_util::new_object_store(&dir);
788 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
789 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
790 runner.manager_ctx.start();
791
792 runner.execute_once(&ctx).await;
793 let state = runner.meta.state();
794 assert!(state.is_running(), "{state:?}");
795 check_files(
796 &object_store,
797 &procedure_store,
798 ctx.procedure_id,
799 first_files,
800 )
801 .await;
802
803 runner.execute_once(&ctx).await;
804 let state = runner.meta.state();
805 assert!(state.is_done(), "{state:?}");
806 check_files(
807 &object_store,
808 &procedure_store,
809 ctx.procedure_id,
810 second_files,
811 )
812 .await;
813 }
814
815 #[tokio::test]
816 async fn test_execute_once_normal() {
817 execute_once_normal(
818 true,
819 &["0000000000.step"],
820 &["0000000000.step", "0000000001.commit"],
821 )
822 .await;
823 }
824
825 #[tokio::test]
826 async fn test_execute_once_normal_skip_persist() {
827 execute_once_normal(false, &[], &["0000000000.commit"]).await;
828 }
829
830 #[tokio::test]
831 async fn test_on_suspend_empty() {
832 let exec_fn = move |_| {
833 async move {
834 Ok(Status::Suspended {
835 subprocedures: Vec::new(),
836 persist: false,
837 })
838 }
839 .boxed()
840 };
841 let suspend = ProcedureAdapter {
842 data: "suspend".to_string(),
843 lock_key: LockKey::single_exclusive("catalog.schema.table"),
844 poison_keys: PoisonKeys::default(),
845 exec_fn,
846 rollback_fn: None,
847 };
848
849 let dir = create_temp_dir("suspend");
850 let meta = suspend.new_meta(ROOT_ID);
851 let ctx = context_without_provider(meta.id);
852 let object_store = test_util::new_object_store(&dir);
853 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
854 let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
855 runner.manager_ctx.start();
856
857 runner.execute_once(&ctx).await;
858 let state = runner.meta.state();
859 assert!(state.is_running(), "{state:?}");
860 }
861
862 fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
863 let mut times = 0;
864 let exec_fn = move |_| {
865 times += 1;
866 async move {
867 if times == 1 {
868 time::sleep(Duration::from_millis(200)).await;
869 Ok(Status::executing(true))
870 } else {
871 Ok(Status::done())
872 }
873 }
874 .boxed()
875 };
876 let child = ProcedureAdapter {
877 data: "child".to_string(),
878 lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
879 poison_keys: PoisonKeys::default(),
880 exec_fn,
881 rollback_fn: None,
882 };
883
884 ProcedureWithId {
885 id: procedure_id,
886 procedure: Box::new(child),
887 }
888 }
889
890 #[tokio::test]
891 async fn test_on_suspend_by_subprocedures() {
892 let mut times = 0;
893 let children_ids = [ProcedureId::random(), ProcedureId::random()];
894 let keys = [
895 &[
896 "catalog.schema.table.region-0",
897 "catalog.schema.table.region-1",
898 ],
899 &[
900 "catalog.schema.table.region-2",
901 "catalog.schema.table.region-3",
902 ],
903 ];
904
905 let exec_fn = move |ctx: Context| {
906 times += 1;
907 async move {
908 if times == 1 {
909 Ok(Status::Suspended {
911 subprocedures: children_ids
912 .into_iter()
913 .zip(keys)
914 .map(|(id, key_slice)| new_child_procedure(id, key_slice))
915 .collect(),
916 persist: true,
917 })
918 } else {
919 let mut all_child_done = true;
921 for id in children_ids {
922 let is_not_done = ctx
923 .provider
924 .procedure_state(id)
925 .await
926 .unwrap()
927 .map(|s| !s.is_done())
928 .unwrap_or(true);
929 if is_not_done {
930 all_child_done = false;
931 }
932 }
933 if all_child_done {
934 Ok(Status::done())
935 } else {
936 Ok(Status::Suspended {
938 subprocedures: Vec::new(),
939 persist: false,
940 })
941 }
942 }
943 }
944 .boxed()
945 };
946 let parent = ProcedureAdapter {
947 data: "parent".to_string(),
948 lock_key: LockKey::single_exclusive("catalog.schema.table"),
949 poison_keys: PoisonKeys::default(),
950 exec_fn,
951 rollback_fn: None,
952 };
953
954 let dir = create_temp_dir("parent");
955 let meta = parent.new_meta(ROOT_ID);
956 let procedure_id = meta.id;
957
958 let object_store = test_util::new_object_store(&dir);
959 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
960 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
961 let poison_manager = Arc::new(InMemoryPoisonStore::default());
962 let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
963 manager_ctx.start();
964 assert!(manager_ctx.try_insert_procedure(meta));
966 runner.manager_ctx = manager_ctx.clone();
968
969 runner.run().await;
970 assert!(manager_ctx.key_lock.is_empty());
971
972 for child_id in children_ids {
974 let state = manager_ctx.state(child_id).unwrap();
975 assert!(state.is_done(), "{state:?}");
976 }
977 let state = manager_ctx.state(procedure_id).unwrap();
978 assert!(state.is_done(), "{state:?}");
979 check_files(&object_store, &procedure_store, procedure_id, &[]).await;
981
982 tokio::time::sleep(Duration::from_millis(5)).await;
983 manager_ctx.remove_outdated_meta(Duration::from_millis(1));
985 assert!(manager_ctx.state(procedure_id).is_none());
986 assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
987 for child_id in children_ids {
988 assert!(manager_ctx.state(child_id).is_none());
989 }
990 }
991
992 #[tokio::test]
993 async fn test_running_is_stopped() {
994 let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
995 let normal = ProcedureAdapter {
996 data: "normal".to_string(),
997 lock_key: LockKey::single_exclusive("catalog.schema.table"),
998 poison_keys: PoisonKeys::default(),
999 exec_fn,
1000 rollback_fn: None,
1001 };
1002
1003 let dir = create_temp_dir("test_running_is_stopped");
1004 let meta = normal.new_meta(ROOT_ID);
1005 let ctx = context_without_provider(meta.id);
1006 let object_store = test_util::new_object_store(&dir);
1007 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1008 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1009 runner.manager_ctx.start();
1010
1011 runner.execute_once(&ctx).await;
1012 let state = runner.meta.state();
1013 assert!(state.is_running(), "{state:?}");
1014 check_files(
1015 &object_store,
1016 &procedure_store,
1017 ctx.procedure_id,
1018 &["0000000000.step"],
1019 )
1020 .await;
1021
1022 runner.manager_ctx.stop();
1023 runner.execute_once(&ctx).await;
1024 let state = runner.meta.state();
1025 assert!(state.is_failed(), "{state:?}");
1026 check_files(
1028 &object_store,
1029 &procedure_store,
1030 ctx.procedure_id,
1031 &["0000000000.step"],
1032 )
1033 .await;
1034 }
1035
1036 #[tokio::test]
1037 async fn test_running_is_stopped_on_error() {
1038 let exec_fn =
1039 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1040 let normal = ProcedureAdapter {
1041 data: "fail".to_string(),
1042 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1043 poison_keys: PoisonKeys::default(),
1044 exec_fn,
1045 rollback_fn: None,
1046 };
1047
1048 let dir = create_temp_dir("test_running_is_stopped_on_error");
1049 let meta = normal.new_meta(ROOT_ID);
1050 let ctx = context_without_provider(meta.id);
1051 let object_store = test_util::new_object_store(&dir);
1052 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1053 let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1054 runner.manager_ctx.stop();
1055
1056 runner.execute_once(&ctx).await;
1057 let state = runner.meta.state();
1058 assert!(state.is_failed(), "{state:?}");
1059 check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1061 }
1062
1063 #[tokio::test]
1064 async fn test_execute_on_error() {
1065 let exec_fn =
1066 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1067 let fail = ProcedureAdapter {
1068 data: "fail".to_string(),
1069 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1070 poison_keys: PoisonKeys::default(),
1071 exec_fn,
1072 rollback_fn: None,
1073 };
1074
1075 let dir = create_temp_dir("fail");
1076 let meta = fail.new_meta(ROOT_ID);
1077 let ctx = context_without_provider(meta.id);
1078 let object_store = test_util::new_object_store(&dir);
1079 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1080 let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1081 runner.manager_ctx.start();
1082
1083 runner.execute_once(&ctx).await;
1084 let state = runner.meta.state();
1085 assert!(state.is_prepare_rollback(), "{state:?}");
1086
1087 runner.execute_once(&ctx).await;
1088 let state = runner.meta.state();
1089 assert!(state.is_failed(), "{state:?}");
1090 check_files(
1091 &object_store,
1092 &procedure_store,
1093 ctx.procedure_id,
1094 &["0000000000.rollback"],
1095 )
1096 .await;
1097 }
1098
1099 #[tokio::test]
1100 async fn test_execute_with_rollback_on_error() {
1101 let exec_fn =
1102 |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1103 let rollback_fn = move |_| async move { Ok(()) }.boxed();
1104 let fail = ProcedureAdapter {
1105 data: "fail".to_string(),
1106 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1107 poison_keys: PoisonKeys::default(),
1108 exec_fn,
1109 rollback_fn: Some(Box::new(rollback_fn)),
1110 };
1111
1112 let dir = create_temp_dir("fail");
1113 let meta = fail.new_meta(ROOT_ID);
1114 let ctx = context_without_provider(meta.id);
1115 let object_store = test_util::new_object_store(&dir);
1116 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1117 let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1118 runner.manager_ctx.start();
1119
1120 runner.execute_once(&ctx).await;
1121 let state = runner.meta.state();
1122 assert!(state.is_prepare_rollback(), "{state:?}");
1123
1124 runner.execute_once(&ctx).await;
1125 let state = runner.meta.state();
1126 assert!(state.is_rolling_back(), "{state:?}");
1127
1128 runner.execute_once(&ctx).await;
1129 let state = runner.meta.state();
1130 assert!(state.is_failed(), "{state:?}");
1131 check_files(
1132 &object_store,
1133 &procedure_store,
1134 ctx.procedure_id,
1135 &["0000000000.rollback"],
1136 )
1137 .await;
1138 }
1139
1140 #[tokio::test]
1141 async fn test_execute_on_retry_later_error() {
1142 let mut times = 0;
1143
1144 let exec_fn = move |_| {
1145 times += 1;
1146 async move {
1147 if times == 1 {
1148 Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1149 } else {
1150 Ok(Status::done())
1151 }
1152 }
1153 .boxed()
1154 };
1155
1156 let retry_later = ProcedureAdapter {
1157 data: "retry_later".to_string(),
1158 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1159 poison_keys: PoisonKeys::default(),
1160 exec_fn,
1161 rollback_fn: None,
1162 };
1163
1164 let dir = create_temp_dir("retry_later");
1165 let meta = retry_later.new_meta(ROOT_ID);
1166 let ctx = context_without_provider(meta.id);
1167 let object_store = test_util::new_object_store(&dir);
1168 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1169 let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1170 runner.manager_ctx.start();
1171 runner.execute_once(&ctx).await;
1172 let state = runner.meta.state();
1173 assert!(state.is_retrying(), "{state:?}");
1174
1175 runner.execute_once(&ctx).await;
1176 let state = runner.meta.state();
1177 assert!(state.is_done(), "{state:?}");
1178 assert!(meta.state().is_done());
1179 check_files(
1180 &object_store,
1181 &procedure_store,
1182 ctx.procedure_id,
1183 &["0000000000.commit"],
1184 )
1185 .await;
1186 }
1187
1188 #[tokio::test]
1189 async fn test_execute_exceed_max_retry_later() {
1190 let exec_fn =
1191 |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1192
1193 let exceed_max_retry_later = ProcedureAdapter {
1194 data: "exceed_max_retry_later".to_string(),
1195 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1196 poison_keys: PoisonKeys::default(),
1197 exec_fn,
1198 rollback_fn: None,
1199 };
1200
1201 let dir = create_temp_dir("exceed_max_retry_later");
1202 let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1203 let object_store = test_util::new_object_store(&dir);
1204 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1205 let mut runner = new_runner(
1206 meta.clone(),
1207 Box::new(exceed_max_retry_later),
1208 procedure_store,
1209 );
1210 runner.manager_ctx.start();
1211
1212 runner.exponential_builder = ExponentialBuilder::default()
1213 .with_min_delay(Duration::from_millis(1))
1214 .with_max_times(3);
1215
1216 runner.execute_procedure_in_loop().await;
1218 let err = meta.state().error().unwrap().to_string();
1219 assert!(err.contains("Procedure retry exceeded max times"));
1220 }
1221
1222 #[tokio::test]
1223 async fn test_rollback_exceed_max_retry_later() {
1224 let exec_fn =
1225 |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1226 let rollback_fn = move |_| {
1227 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1228 };
1229 let exceed_max_retry_later = ProcedureAdapter {
1230 data: "exceed_max_rollback".to_string(),
1231 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1232 poison_keys: PoisonKeys::default(),
1233 exec_fn,
1234 rollback_fn: Some(Box::new(rollback_fn)),
1235 };
1236
1237 let dir = create_temp_dir("exceed_max_rollback");
1238 let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1239 let object_store = test_util::new_object_store(&dir);
1240 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1241 let mut runner = new_runner(
1242 meta.clone(),
1243 Box::new(exceed_max_retry_later),
1244 procedure_store,
1245 );
1246 runner.manager_ctx.start();
1247 runner.exponential_builder = ExponentialBuilder::default()
1248 .with_min_delay(Duration::from_millis(1))
1249 .with_max_times(3);
1250
1251 runner.execute_procedure_in_loop().await;
1253 let err = meta.state().error().unwrap().to_string();
1254 assert!(err.contains("Procedure rollback exceeded max times"));
1255 }
1256
1257 #[tokio::test]
1258 async fn test_rollback_after_retry_fail() {
1259 let exec_fn = move |_| {
1260 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1261 };
1262
1263 let (tx, mut rx) = mpsc::channel(1);
1264 let rollback_fn = move |_| {
1265 let tx = tx.clone();
1266 async move {
1267 tx.send(()).await.unwrap();
1268 Ok(())
1269 }
1270 .boxed()
1271 };
1272 let retry_later = ProcedureAdapter {
1273 data: "rollback_after_retry_fail".to_string(),
1274 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1275 poison_keys: PoisonKeys::default(),
1276 exec_fn,
1277 rollback_fn: Some(Box::new(rollback_fn)),
1278 };
1279
1280 let dir = create_temp_dir("retry_later");
1281 let meta = retry_later.new_meta(ROOT_ID);
1282 let ctx = context_without_provider(meta.id);
1283 let object_store = test_util::new_object_store(&dir);
1284 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1285 let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1286 runner.manager_ctx.start();
1287 runner.exponential_builder = ExponentialBuilder::default()
1288 .with_min_delay(Duration::from_millis(1))
1289 .with_max_times(3);
1290 runner.execute_procedure_in_loop().await;
1292 rx.recv().await.unwrap();
1293 assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
1294 check_files(
1295 &object_store,
1296 &procedure_store,
1297 ctx.procedure_id,
1298 &["0000000000.rollback"],
1299 )
1300 .await;
1301 }
1302
1303 #[tokio::test]
1304 async fn test_child_error() {
1305 let mut times = 0;
1306 let child_id = ProcedureId::random();
1307
1308 let exec_fn = move |ctx: Context| {
1309 times += 1;
1310 async move {
1311 if times == 1 {
1312 let exec_fn = |_| {
1314 async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1315 .boxed()
1316 };
1317 let fail = ProcedureAdapter {
1318 data: "fail".to_string(),
1319 lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1320 poison_keys: PoisonKeys::default(),
1321 exec_fn,
1322 rollback_fn: None,
1323 };
1324
1325 Ok(Status::Suspended {
1326 subprocedures: vec![ProcedureWithId {
1327 id: child_id,
1328 procedure: Box::new(fail),
1329 }],
1330 persist: true,
1331 })
1332 } else {
1333 let state = ctx.provider.procedure_state(child_id).await.unwrap();
1335 let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
1336 if is_failed {
1337 Err(Error::from_error_ext(PlainError::new(
1339 "subprocedure failed".to_string(),
1340 StatusCode::Unexpected,
1341 )))
1342 } else {
1343 Ok(Status::Suspended {
1345 subprocedures: Vec::new(),
1346 persist: false,
1347 })
1348 }
1349 }
1350 }
1351 .boxed()
1352 };
1353 let parent = ProcedureAdapter {
1354 data: "parent".to_string(),
1355 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1356 poison_keys: PoisonKeys::default(),
1357 exec_fn,
1358 rollback_fn: None,
1359 };
1360
1361 let dir = create_temp_dir("child_err");
1362 let meta = parent.new_meta(ROOT_ID);
1363
1364 let object_store = test_util::new_object_store(&dir);
1365 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1366 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
1367 let poison_manager = Arc::new(InMemoryPoisonStore::default());
1368 let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
1369 manager_ctx.start();
1370 assert!(manager_ctx.try_insert_procedure(meta.clone()));
1372 runner.manager_ctx = manager_ctx.clone();
1374
1375 runner.run().await;
1377 assert!(manager_ctx.key_lock.is_empty());
1378 let err = meta.state().error().unwrap().output_msg();
1379 assert!(err.contains("subprocedure failed"), "{err}");
1380 }
1381
1382 #[tokio::test]
1383 async fn test_execute_with_clean_poisons() {
1384 common_telemetry::init_default_ut_logging();
1385 let mut times = 0;
1386 let poison_key = PoisonKey::new("table/1024");
1387 let moved_poison_key = poison_key.clone();
1388 let exec_fn = move |ctx: Context| {
1389 times += 1;
1390 let poison_key = moved_poison_key.clone();
1391 async move {
1392 if times == 1 {
1393 ctx.provider
1395 .try_put_poison(&poison_key, ctx.procedure_id)
1396 .await
1397 .unwrap();
1398
1399 Ok(Status::executing(true))
1400 } else {
1401 Ok(Status::executing_with_clean_poisons(true))
1402 }
1403 }
1404 .boxed()
1405 };
1406 let poison = ProcedureAdapter {
1407 data: "poison".to_string(),
1408 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1409 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1410 exec_fn,
1411 rollback_fn: None,
1412 };
1413
1414 let dir = create_temp_dir("clean_poisons");
1415 let meta = poison.new_meta(ROOT_ID);
1416
1417 let object_store = test_util::new_object_store(&dir);
1418 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1419 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1420
1421 let ctx = context_with_provider(
1423 meta.id,
1424 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1425 );
1426 runner
1428 .manager_ctx
1429 .procedures
1430 .write()
1431 .unwrap()
1432 .insert(meta.id, runner.meta.clone());
1433
1434 runner.manager_ctx.start();
1435 runner.execute_once(&ctx).await;
1436 let state = runner.meta.state();
1437 assert!(state.is_running(), "{state:?}");
1438
1439 let procedure_id = runner
1440 .manager_ctx
1441 .poison_manager
1442 .get_poison(&poison_key.to_string())
1443 .await
1444 .unwrap();
1445 assert!(procedure_id.is_some());
1447
1448 runner.execute_once(&ctx).await;
1449 let state = runner.meta.state();
1450 assert!(state.is_running(), "{state:?}");
1451
1452 let procedure_id = runner
1453 .manager_ctx
1454 .poison_manager
1455 .get_poison(&poison_key.to_string())
1456 .await
1457 .unwrap();
1458 assert!(procedure_id.is_none());
1460 }
1461
    /// The first step puts a poison and keeps running. The second step fails with
    /// an error built by `Error::external_and_clean_poisons`. The test expects the
    /// procedure to move to prepare-rollback and the poison to be gone afterwards.
    #[tokio::test]
    async fn test_execute_error_with_clean_poisons() {
        common_telemetry::init_default_ut_logging();
        // Counts invocations of `exec_fn` so each step can behave differently.
        let mut times = 0;
        let poison_key = PoisonKey::new("table/1024");
        let moved_poison_key = poison_key.clone();
        let exec_fn = move |ctx: Context| {
            times += 1;
            let poison_key = moved_poison_key.clone();
            async move {
                if times == 1 {
                    // First step: put the poison on behalf of this procedure.
                    ctx.provider
                        .try_put_poison(&poison_key, ctx.procedure_id)
                        .await
                        .unwrap();

                    Ok(Status::executing(true))
                } else {
                    // Later steps: fail with an error that also requests poison cleanup.
                    Err(Error::external_and_clean_poisons(MockError::new(
                        StatusCode::Unexpected,
                    )))
                }
            }
            .boxed()
        };
        let poison = ProcedureAdapter {
            data: "poison".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("error_with_clean_poisons");
        let meta = poison.new_meta(ROOT_ID);

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());

        // The step context uses the runner's own manager context as its provider.
        let ctx = context_with_provider(
            meta.id,
            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
        );
        // Register the procedure's meta in the manager's procedure table.
        runner
            .manager_ctx
            .procedures
            .write()
            .unwrap()
            .insert(meta.id, runner.meta.clone());

        runner.manager_ctx.start();
        // Step 1: succeeds, and the poison is now held.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        let procedure_id = runner
            .manager_ctx
            .poison_manager
            .get_poison(&poison_key.to_string())
            .await
            .unwrap();
        assert!(procedure_id.is_some());

        // Step 2: the error moves the procedure to prepare-rollback...
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_prepare_rollback(), "{state:?}");

        // ...and the poison must have been removed.
        let procedure_id = runner
            .manager_ctx
            .poison_manager
            .get_poison(&poison_key.to_string())
            .await
            .unwrap();
        assert!(procedure_id.is_none());
    }
1543
1544 #[tokio::test]
1545 async fn test_execute_failed_after_set_poison() {
1546 let mut times = 0;
1547 let poison_key = PoisonKey::new("table/1024");
1548 let moved_poison_key = poison_key.clone();
1549 let exec_fn = move |ctx: Context| {
1550 times += 1;
1551 let poison_key = moved_poison_key.clone();
1552 async move {
1553 if times == 1 {
1554 Ok(Status::executing(true))
1555 } else {
1556 ctx.provider
1558 .try_put_poison(&poison_key, ctx.procedure_id)
1559 .await
1560 .unwrap();
1561 Err(Error::external(MockError::new(StatusCode::Unexpected)))
1562 }
1563 }
1564 .boxed()
1565 };
1566 let poison = ProcedureAdapter {
1567 data: "poison".to_string(),
1568 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1569 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1570 exec_fn,
1571 rollback_fn: None,
1572 };
1573
1574 let dir = create_temp_dir("poison");
1575 let meta = poison.new_meta(ROOT_ID);
1576
1577 let object_store = test_util::new_object_store(&dir);
1578 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1579 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1580
1581 let ctx = context_with_provider(
1583 meta.id,
1584 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1585 );
1586 runner
1588 .manager_ctx
1589 .procedures
1590 .write()
1591 .unwrap()
1592 .insert(meta.id, runner.meta.clone());
1593
1594 runner.manager_ctx.start();
1595 runner.execute_once(&ctx).await;
1596 let state = runner.meta.state();
1597 assert!(state.is_running(), "{state:?}");
1598
1599 runner.execute_once(&ctx).await;
1600 let state = runner.meta.state();
1601 assert!(state.is_prepare_rollback(), "{state:?}");
1602 assert!(meta.state().is_prepare_rollback());
1603
1604 runner.execute_once(&ctx).await;
1605 let state = runner.meta.state();
1606 assert!(state.is_failed(), "{state:?}");
1607 assert!(meta.state().is_failed());
1608
1609 let procedure_id = runner
1611 .manager_ctx
1612 .poison_manager
1613 .get_poison(&poison_key.to_string())
1614 .await
1615 .unwrap()
1616 .unwrap();
1617
1618 assert_eq!(&procedure_id.to_string(), ROOT_ID);
1620 }
1621
1622 #[tokio::test]
1623 async fn test_execute_exceed_max_retry_after_set_poison() {
1624 common_telemetry::init_default_ut_logging();
1625 let mut times = 0;
1626 let poison_key = PoisonKey::new("table/1024");
1627 let moved_poison_key = poison_key.clone();
1628 let exec_fn = move |ctx: Context| {
1629 times += 1;
1630 let poison_key = moved_poison_key.clone();
1631 async move {
1632 if times == 1 {
1633 Ok(Status::executing(true))
1634 } else {
1635 ctx.provider
1637 .try_put_poison(&poison_key, ctx.procedure_id)
1638 .await
1639 .unwrap();
1640 Err(Error::retry_later_and_clean_poisons(MockError::new(
1641 StatusCode::Unexpected,
1642 )))
1643 }
1644 }
1645 .boxed()
1646 };
1647 let poison = ProcedureAdapter {
1648 data: "poison".to_string(),
1649 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1650 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1651 exec_fn,
1652 rollback_fn: None,
1653 };
1654
1655 let dir = create_temp_dir("exceed_max_after_set_poison");
1656 let meta = poison.new_meta(ROOT_ID);
1657 let object_store = test_util::new_object_store(&dir);
1658 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1659 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
1660 runner.manager_ctx.start();
1661 runner.exponential_builder = ExponentialBuilder::default()
1662 .with_min_delay(Duration::from_millis(1))
1663 .with_max_times(3);
1664 let ctx = context_with_provider(
1666 meta.id,
1667 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1668 );
1669 runner
1671 .manager_ctx
1672 .procedures
1673 .write()
1674 .unwrap()
1675 .insert(meta.id, runner.meta.clone());
1676 runner.execute_once_with_retry(&ctx).await;
1678 let err = meta.state().error().unwrap().clone();
1679 assert_matches!(&*err, Error::RetryTimesExceeded { .. });
1680
1681 let procedure_id = runner
1683 .manager_ctx
1684 .poison_manager
1685 .get_poison(&poison_key.to_string())
1686 .await
1687 .unwrap();
1688 assert_eq!(procedure_id, None);
1689 }
1690
    /// The second step puts a poison and returns `Status::Poisoned`. The procedure
    /// must end in the poisoned state. Only the first step's `0000000000.step` file
    /// is expected in the store, and the poison stays owned by this procedure
    /// (`ROOT_ID`).
    #[tokio::test]
    async fn test_execute_poisoned() {
        // Counts invocations of `exec_fn` so each step can behave differently.
        let mut times = 0;
        let poison_key = PoisonKey::new("table/1024");
        let moved_poison_key = poison_key.clone();
        let exec_fn = move |ctx: Context| {
            times += 1;
            let poison_key = moved_poison_key.clone();
            async move {
                if times == 1 {
                    Ok(Status::executing(true))
                } else {
                    // Take the poison, then report the procedure as poisoned on that key.
                    ctx.provider
                        .try_put_poison(&poison_key, ctx.procedure_id)
                        .await
                        .unwrap();
                    Ok(Status::Poisoned {
                        keys: PoisonKeys::new(vec![poison_key.clone()]),
                        error: Error::external(MockError::new(StatusCode::Unexpected)),
                    })
                }
            }
            .boxed()
        };
        let poison = ProcedureAdapter {
            data: "poison".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("poison");
        let meta = poison.new_meta(ROOT_ID);

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());

        // The step context uses the runner's own manager context as its provider.
        let ctx = context_with_provider(
            meta.id,
            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
        );
        // Register the procedure's meta in the manager's procedure table.
        runner
            .manager_ctx
            .procedures
            .write()
            .unwrap()
            .insert(meta.id, runner.meta.clone());

        runner.manager_ctx.start();
        // Step 1: succeeds; the procedure keeps running.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        // Step 2: the procedure becomes poisoned; only the first step's file is on disk.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_poisoned(), "{state:?}");
        assert!(meta.state().is_poisoned());
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.step"],
        )
        .await;

        // The poison is still held, and by this procedure.
        let procedure_id = runner
            .manager_ctx
            .poison_manager
            .get_poison(&poison_key.to_string())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(procedure_id, ROOT_ID);
    }
1773
1774 fn test_procedure_with_dynamic_lock(
1775 shared_atomic_value: Arc<AtomicU64>,
1776 id: u64,
1777 ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
1778 let exec_fn = move |ctx: Context| {
1779 let moved_shared_atomic_value = shared_atomic_value.clone();
1780 let moved_ctx = ctx.clone();
1781 async move {
1782 debug!("Acquiring write lock, id: {}", id);
1783 let key = StringKey::Exclusive("test_lock".to_string());
1784 let guard = moved_ctx.provider.acquire_lock(&key).await;
1785 debug!("Acquired write lock, id: {}", id);
1786 let millis = rand::rng().random_range(10..=50);
1787 tokio::time::sleep(Duration::from_millis(millis)).await;
1788 let value = moved_shared_atomic_value.load(Ordering::Relaxed);
1789 moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
1790 debug!("Dropping write lock, id: {}", id);
1791 drop(guard);
1792
1793 Ok(Status::done())
1794 }
1795 .boxed()
1796 };
1797
1798 let adapter = ProcedureAdapter {
1799 data: "dynamic_lock".to_string(),
1800 lock_key: LockKey::new_exclusive([]),
1801 poison_keys: PoisonKeys::new([]),
1802 exec_fn,
1803 rollback_fn: None,
1804 };
1805 let meta = adapter.new_meta(ROOT_ID);
1806
1807 (Box::new(adapter), meta)
1808 }
1809
1810 #[tokio::test(flavor = "multi_thread")]
1811 async fn test_execute_with_dynamic_lock() {
1812 common_telemetry::init_default_ut_logging();
1813 let shared_atomic_value = Arc::new(AtomicU64::new(0));
1814 let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
1815 let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
1816
1817 let dir = create_temp_dir("dynamic_lock");
1818 let object_store = test_util::new_object_store(&dir);
1819 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1820 let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
1821 let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
1822 let ctx1 = context_with_provider(
1823 meta1.id,
1824 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1825 );
1826 let ctx2 = context_with_provider(
1827 meta2.id,
1828 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1830 );
1831 let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
1832 join_all(tasks).await;
1833 assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
1834 }
1835}