1use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_event_recorder::EventRecorderRef;
21use common_telemetry::tracing_context::{FutureExt, TracingContext};
22use common_telemetry::{debug, error, info, tracing};
23use rand::Rng;
24use snafu::ResultExt;
25use tokio::time;
26
27use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
28use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
29use crate::procedure::{Output, StringKey};
30use crate::rwlock::OwnedKeyRwLockGuard;
31use crate::store::{ProcedureMessage, ProcedureStore};
32use crate::{
33 BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
34};
35
/// Guards the lifecycle of a running procedure.
///
/// When dropped it marks the procedure as failed unless `finish` was set, notifies the parent
/// procedure (if any), releases the held key locks in reverse order and cleans up the lock keys
/// declared by the procedure.
struct ProcedureGuard {
    /// Metadata of the guarded procedure.
    meta: ProcedureMetaRef,
    /// Manager context used to notify the parent procedure and to clean lock keys.
    manager_ctx: Arc<ManagerContext>,
    /// Lock guards acquired by the runner, in acquisition order.
    key_guards: Vec<OwnedKeyRwLockGuard>,
    /// Set when the runner exits normally; otherwise `Drop` reports the procedure as panicked.
    finish: bool,
}
43
44impl ProcedureGuard {
45 fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
47 ProcedureGuard {
48 meta,
49 manager_ctx,
50 key_guards: vec![],
51 finish: false,
52 }
53 }
54
55 fn finish(mut self) {
57 self.finish = true;
58 }
59}
60
61impl Drop for ProcedureGuard {
62 fn drop(&mut self) {
63 if !self.finish {
64 error!("Procedure {} exits unexpectedly", self.meta.id);
65
66 let err = ProcedurePanicSnafu {
70 procedure_id: self.meta.id,
71 }
72 .build();
73 self.meta.set_state(ProcedureState::failed(Arc::new(err)));
74 }
75
76 if let Some(parent_id) = self.meta.parent_id {
78 self.manager_ctx.notify_by_subprocedure(parent_id);
79 }
80
81 while !self.key_guards.is_empty() {
83 self.key_guards.pop();
84 }
85
86 self.manager_ctx
88 .key_lock
89 .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
90 }
91}
92
/// Executes a single procedure (and submits its subprocedures) until it reaches a terminal state.
pub(crate) struct Runner {
    /// Shared metadata (id, state, lock/poison keys, parent) of the procedure.
    pub(crate) meta: ProcedureMetaRef,
    /// The procedure being executed.
    pub(crate) procedure: BoxedProcedure,
    /// Manager context shared with the procedure manager and other runners.
    pub(crate) manager_ctx: Arc<ManagerContext>,
    /// Step number used for the next write to the procedure store; incremented after every
    /// successful persist, commit or rollback write.
    pub(crate) step: u32,
    /// Builder for the exponential backoff used between retries and rollback attempts.
    pub(crate) exponential_builder: ExponentialBuilder,
    /// Store used to persist, commit, roll back and delete procedure data.
    pub(crate) store: Arc<ProcedureStore>,
    /// Reset to `false` when the execution loop starts.
    /// NOTE(review): not read anywhere in this file chunk; confirm whether it is still needed.
    pub(crate) rolling_back: bool,
    /// Optional event recorder, also handed to subprocedure metadata.
    pub(crate) event_recorder: Option<EventRecorderRef>,
}
103
104impl Runner {
105 pub(crate) fn running(&self) -> bool {
107 self.manager_ctx.running()
108 }
109
110 pub(crate) async fn run(mut self) {
112 let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
114
115 info!(
116 "Runner {}-{} starts",
117 self.procedure.type_name(),
118 self.meta.id
119 );
120
121 for key in self.meta.lock_key.keys_to_lock() {
124 let key_guard = match key {
126 StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
127 StringKey::Exclusive(key) => {
128 self.manager_ctx.key_lock.write(key.clone()).await.into()
129 }
130 };
131
132 guard.key_guards.push(key_guard);
133 }
134
135 self.meta.set_start_time_ms();
138 self.execute_procedure_in_loop().await;
139 self.meta.set_end_time_ms();
140
141 guard.finish();
148
149 if self.meta.parent_id.is_none() {
151 let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
152 self.manager_ctx.on_procedures_finish(&procedure_ids);
154
155 if !self.running() {
157 return;
158 }
159
160 for id in procedure_ids {
161 if let Err(e) = self.store.delete_procedure(id).await {
162 error!(
163 e;
164 "Runner {}-{} failed to delete procedure {}",
165 self.procedure.type_name(),
166 self.meta.id,
167 id,
168 );
169 }
170 }
171 }
172
173 info!(
174 "Runner {}-{} exits",
175 self.procedure.type_name(),
176 self.meta.id
177 );
178 }
179
180 async fn execute_procedure_in_loop(&mut self) {
181 let ctx = Context {
182 procedure_id: self.meta.id,
183 provider: self.manager_ctx.clone(),
184 };
185
186 self.rolling_back = false;
187 self.execute_once_with_retry(&ctx).await;
188 }
189
190 async fn execute_once_with_retry(&mut self, ctx: &Context) {
191 let mut retry = self.exponential_builder.build();
192 let mut retry_times = 0;
193
194 let mut rollback = self.exponential_builder.build();
195 let mut rollback_times = 0;
196
197 loop {
198 if !self.running() {
200 self.meta.set_state(ProcedureState::failed(Arc::new(
201 error::ManagerNotStartSnafu {}.build(),
202 )));
203 return;
204 }
205 let state = self.meta.state();
206 match state {
207 ProcedureState::Running => {}
208 ProcedureState::Retrying { error } => {
209 retry_times += 1;
210 if let Some(d) = retry.next() {
211 let millis = d.as_millis() as u64;
212 let noise = rand::rng().random_range(0..(millis / 4) + 1);
214 let d = d.add(Duration::from_millis(noise));
215
216 self.wait_on_err(d, retry_times).await;
217 } else {
218 self.meta
219 .set_state(ProcedureState::prepare_rollback(Arc::new(
220 Error::RetryTimesExceeded {
221 source: error.clone(),
222 procedure_id: self.meta.id,
223 },
224 )));
225 }
226 }
227 ProcedureState::PrepareRollback { error }
228 | ProcedureState::RollingBack { error } => {
229 rollback_times += 1;
230 if let Some(d) = rollback.next() {
231 self.wait_on_err(d, rollback_times).await;
232 } else {
233 let err = Err::<(), Arc<Error>>(error)
234 .context(RollbackTimesExceededSnafu {
235 procedure_id: self.meta.id,
236 })
237 .unwrap_err();
238 self.meta.set_state(ProcedureState::failed(Arc::new(err)));
239 return;
240 }
241 }
242 ProcedureState::Done { .. } => return,
243 ProcedureState::Failed { .. } => return,
244 ProcedureState::Poisoned { .. } => return,
245 }
246 self.execute_once(ctx).await;
247 }
248 }
249
250 async fn clean_poisons(&mut self) -> Result<()> {
251 let mut error = None;
252 for key in self.meta.poison_keys.iter() {
253 let key = key.to_string();
254 if let Err(e) = self
255 .manager_ctx
256 .poison_manager
257 .delete_poison(key, self.meta.id.to_string())
258 .await
259 {
260 error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
261 error = Some(e);
262 }
263 }
264
265 if let Some(e) = error {
267 return Err(e);
268 }
269 Ok(())
270 }
271
272 async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
273 if self.procedure.rollback_supported()
274 && let Err(e) = self.procedure.rollback(ctx).await
275 {
276 self.meta
277 .set_state(ProcedureState::rolling_back(Arc::new(e)));
278 return;
279 }
280 self.meta.set_state(ProcedureState::failed(err));
281 }
282
283 async fn prepare_rollback(&mut self, err: Arc<Error>) {
284 if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
285 self.meta
286 .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
287 return;
288 }
289 if self.procedure.rollback_supported() {
290 self.meta.set_state(ProcedureState::rolling_back(err));
291 } else {
292 self.meta.set_state(ProcedureState::failed(err));
293 }
294 }
295
296 async fn execute_once(&mut self, ctx: &Context) {
297 match self.meta.state() {
298 ProcedureState::Running | ProcedureState::Retrying { .. } => {
299 match self.procedure.execute(ctx).await {
300 Ok(status) => {
301 debug!(
302 "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
303 self.procedure.type_name(),
304 self.meta.id,
305 status,
306 status.need_persist(),
307 );
308
309 if !self.running() {
311 self.meta.set_state(ProcedureState::failed(Arc::new(
312 error::ManagerNotStartSnafu {}.build(),
313 )));
314 return;
315 }
316
317 if status.need_clean_poisons()
319 && let Err(e) = self.clean_poisons().await
320 {
321 error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
322 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
323 return;
324 }
325
326 if status.need_persist()
327 && let Err(e) = self.persist_procedure().await
328 {
329 error!(e; "Failed to persist procedure: {}", self.meta.id);
330 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
331 return;
332 }
333
334 match status {
335 Status::Executing { .. } => {
336 let prev_state = self.meta.state();
337 if !matches!(prev_state, ProcedureState::Running) {
338 info!(
339 "Set Procedure {}-{} state to running, prev_state: {:?}",
340 self.procedure.type_name(),
341 self.meta.id,
342 prev_state
343 );
344 self.meta.set_state(ProcedureState::Running);
345 }
346 }
347 Status::Suspended { subprocedures, .. } => {
348 let prev_state = self.meta.state();
349 if !matches!(prev_state, ProcedureState::Running) {
350 info!(
351 "Set Procedure {}-{} state to running, prev_state: {:?}",
352 self.procedure.type_name(),
353 self.meta.id,
354 prev_state
355 );
356 self.meta.set_state(ProcedureState::Running);
357 }
358 self.on_suspended(subprocedures).await;
359 }
360 Status::Done { output } => {
361 if let Err(e) = self.commit_procedure().await {
362 error!(e; "Failed to commit procedure: {}", self.meta.id);
363 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
364 return;
365 }
366
367 self.done(output);
368 }
369 Status::Poisoned { error, keys } => {
370 error!(
371 error;
372 "Procedure {}-{} is poisoned, keys: {:?}",
373 self.procedure.type_name(),
374 self.meta.id,
375 keys,
376 );
377 self.meta
378 .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
379 }
380 }
381 }
382 Err(e) => {
383 error!(
384 e;
385 "Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
386 self.procedure.type_name(),
387 self.meta.id,
388 e.is_retry_later(),
389 e.need_clean_poisons(),
390 );
391
392 if !self.running() {
394 self.meta.set_state(ProcedureState::failed(Arc::new(
395 error::ManagerNotStartSnafu {}.build(),
396 )));
397 return;
398 }
399
400 if e.need_clean_poisons() {
401 if let Err(e) = self.clean_poisons().await {
402 error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
403 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
404 return;
405 }
406 debug!(
407 "Procedure {}-{} cleaned poisons",
408 self.procedure.type_name(),
409 self.meta.id,
410 );
411 }
412
413 if e.is_retry_later() {
414 self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
415 return;
416 }
417
418 if self.procedure.rollback_supported() {
419 self.meta
420 .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
421 } else {
422 self.meta.set_state(ProcedureState::failed(Arc::new(e)));
423 }
424 }
425 }
426 }
427 ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
428 ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
429 ProcedureState::Failed { .. }
430 | ProcedureState::Done { .. }
431 | ProcedureState::Poisoned { .. } => (),
432 }
433 }
434
435 fn submit_subprocedure(
437 &self,
438 procedure_id: ProcedureId,
439 procedure_state: ProcedureState,
440 procedure: BoxedProcedure,
441 ) {
442 if self.manager_ctx.contains_procedure(procedure_id) {
443 return;
445 }
446
447 let step = 0;
448
449 let meta = Arc::new(ProcedureMeta::new(
450 procedure_id,
451 procedure_state,
452 Some(self.meta.id),
453 procedure.lock_key(),
454 procedure.poison_keys(),
455 procedure.type_name(),
456 self.event_recorder.clone(),
457 procedure.user_metadata(),
458 ));
459 let runner = Runner {
460 meta: meta.clone(),
461 procedure,
462 manager_ctx: self.manager_ctx.clone(),
463 step,
464 exponential_builder: self.exponential_builder,
465 store: self.store.clone(),
466 rolling_back: false,
467 event_recorder: self.event_recorder.clone(),
468 };
469
470 assert!(
474 self.manager_ctx.try_insert_procedure(meta),
475 "Procedure {}-{} submit an existing procedure {}-{}",
476 self.procedure.type_name(),
477 self.meta.id,
478 runner.procedure.type_name(),
479 procedure_id,
480 );
481
482 self.meta.push_child(procedure_id);
484 let parent_id = self.meta.id;
485
486 let tracing_context = TracingContext::from_current_span();
487 let _handle = common_runtime::spawn_global(async move {
488 let span = tracing_context.attach(tracing::info_span!(
489 "LocalManager::submit_subprocedure",
490 procedure_name = %runner.meta.type_name,
491 procedure_id = %runner.meta.id,
492 parent_id = %parent_id,
493 ));
494 runner.run().trace(span).await
498 });
499 }
500
501 async fn wait_on_err(&mut self, d: Duration, i: u64) {
503 info!(
504 "Procedure {}-{} retry for the {} times after {} millis",
505 self.procedure.type_name(),
506 self.meta.id,
507 i,
508 d.as_millis(),
509 );
510 time::sleep(d).await;
511 }
512
513 async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
514 let has_child = !subprocedures.is_empty();
515 for subprocedure in subprocedures {
516 info!(
517 "Procedure {}-{} submit subprocedure {}-{}",
518 self.procedure.type_name(),
519 self.meta.id,
520 subprocedure.procedure.type_name(),
521 subprocedure.id,
522 );
523
524 self.submit_subprocedure(
525 subprocedure.id,
526 ProcedureState::Running,
527 subprocedure.procedure,
528 );
529 }
530
531 info!(
532 "Procedure {}-{} is waiting for subprocedures",
533 self.procedure.type_name(),
534 self.meta.id,
535 );
536
537 if has_child {
539 self.meta.child_notify.notified().await;
540
541 info!(
542 "Procedure {}-{} is waked up",
543 self.procedure.type_name(),
544 self.meta.id,
545 );
546 }
547 }
548
549 async fn persist_procedure(&mut self) -> Result<()> {
550 let type_name = self.procedure.type_name().to_string();
551 let data = self.procedure.dump()?;
552
553 self.store
554 .store_procedure(
555 self.meta.id,
556 self.step,
557 type_name,
558 data,
559 self.meta.parent_id,
560 )
561 .await
562 .map_err(|e| {
563 error!(
564 e; "Failed to persist procedure {}-{}",
565 self.procedure.type_name(),
566 self.meta.id
567 );
568 e
569 })?;
570 self.step += 1;
571 Ok(())
572 }
573
574 async fn commit_procedure(&mut self) -> Result<()> {
575 self.store
576 .commit_procedure(self.meta.id, self.step)
577 .await
578 .map_err(|e| {
579 error!(
580 e; "Failed to commit procedure {}-{}",
581 self.procedure.type_name(),
582 self.meta.id
583 );
584 e
585 })?;
586 self.step += 1;
587 Ok(())
588 }
589
590 async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
591 let type_name = self.procedure.type_name().to_string();
593 let data = self.procedure.dump()?;
594 let message = ProcedureMessage {
595 type_name,
596 data,
597 parent_id: self.meta.parent_id,
598 step: self.step,
599 error: Some(error),
600 };
601 self.store
602 .rollback_procedure(self.meta.id, message)
603 .await
604 .map_err(|e| {
605 error!(
606 e; "Failed to write rollback key for procedure {}-{}",
607 self.procedure.type_name(),
608 self.meta.id
609 );
610 e
611 })?;
612 self.step += 1;
613 Ok(())
614 }
615
616 fn done(&self, output: Option<Output>) {
617 info!(
619 "Procedure {}-{} done",
620 self.procedure.type_name(),
621 self.meta.id,
622 );
623
624 self.meta.set_state(ProcedureState::Done { output });
626 }
627}
628
629#[cfg(test)]
630mod tests {
631 use std::assert_matches::assert_matches;
632 use std::sync::Arc;
633 use std::sync::atomic::{AtomicU64, Ordering};
634
635 use async_trait::async_trait;
636 use common_error::ext::{ErrorExt, PlainError};
637 use common_error::mock::MockError;
638 use common_error::status_code::StatusCode;
639 use common_test_util::temp_dir::create_temp_dir;
640 use futures::future::join_all;
641 use futures_util::FutureExt;
642 use futures_util::future::BoxFuture;
643 use object_store::{EntryMode, ObjectStore};
644 use tokio::sync::mpsc;
645 use tokio::sync::watch::Receiver;
646
647 use super::*;
648 use crate::local::{DynamicKeyLockGuard, test_util};
649 use crate::procedure::PoisonKeys;
650 use crate::store::proc_path;
651 use crate::test_util::InMemoryPoisonStore;
652 use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
653
    /// Fixed procedure id used for the (root) procedures created by these tests.
    const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
655
656 fn new_runner(
657 meta: ProcedureMetaRef,
658 procedure: BoxedProcedure,
659 store: Arc<ProcedureStore>,
660 ) -> Runner {
661 Runner {
662 meta,
663 procedure,
664 manager_ctx: Arc::new(ManagerContext::new(
665 Arc::new(InMemoryPoisonStore::default()),
666 )),
667 step: 0,
668 exponential_builder: ExponentialBuilder::default(),
669 store,
670 rolling_back: false,
671 event_recorder: None,
672 }
673 }
674
675 async fn check_files(
676 object_store: &ObjectStore,
677 procedure_store: &ProcedureStore,
678 procedure_id: ProcedureId,
679 files: &[&str],
680 ) {
681 let dir = proc_path!(procedure_store, "{procedure_id}/");
682 let lister = object_store.list(&dir).await.unwrap();
683 let mut files_in_dir: Vec<_> = lister
684 .into_iter()
685 .filter(|x| x.metadata().mode() == EntryMode::FILE)
686 .map(|de| de.name().to_string())
687 .collect();
688 files_in_dir.sort_unstable();
689 assert_eq!(files, files_in_dir);
690 }
691
692 fn context_with_provider(
693 procedure_id: ProcedureId,
694 provider: Arc<dyn ContextProvider>,
695 ) -> Context {
696 Context {
697 procedure_id,
698 provider,
699 }
700 }
701
702 fn context_without_provider(procedure_id: ProcedureId) -> Context {
703 struct MockProvider;
704
705 #[async_trait]
706 impl ContextProvider for MockProvider {
707 async fn procedure_state(
708 &self,
709 _procedure_id: ProcedureId,
710 ) -> Result<Option<ProcedureState>> {
711 unimplemented!()
712 }
713
714 async fn procedure_state_receiver(
715 &self,
716 _procedure_id: ProcedureId,
717 ) -> Result<Option<Receiver<ProcedureState>>> {
718 unimplemented!()
719 }
720
721 async fn try_put_poison(
722 &self,
723 _key: &PoisonKey,
724 _procedure_id: ProcedureId,
725 ) -> Result<()> {
726 unimplemented!()
727 }
728
729 async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
730 unimplemented!()
731 }
732 }
733
734 Context {
735 procedure_id,
736 provider: Arc::new(MockProvider),
737 }
738 }
739
    /// Boxed rollback callback of a test procedure, invoked with a clone of the context.
    type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
741
    /// Test procedure whose behavior is supplied by closures.
    struct ProcedureAdapter<F> {
        /// Data returned by `dump`.
        data: String,
        /// Lock keys returned by `lock_key`.
        lock_key: LockKey,
        /// Poison keys returned by `poison_keys`.
        poison_keys: PoisonKeys,
        /// Called on every `execute` with a clone of the context.
        exec_fn: F,
        /// Rollback callback; rollback is supported only when this is `Some`.
        rollback_fn: Option<RollbackFn>,
    }
749
750 impl<F> ProcedureAdapter<F> {
751 fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
752 let mut meta = test_util::procedure_meta_for_test();
753 meta.id = ProcedureId::parse_str(uuid).unwrap();
754 meta.lock_key = self.lock_key.clone();
755 meta.poison_keys = self.poison_keys.clone();
756
757 Arc::new(meta)
758 }
759 }
760
761 #[async_trait]
762 impl<F> Procedure for ProcedureAdapter<F>
763 where
764 F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
765 {
766 fn type_name(&self) -> &str {
767 "ProcedureAdapter"
768 }
769
770 async fn execute(&mut self, ctx: &Context) -> Result<Status> {
771 let f = (self.exec_fn)(ctx.clone());
772 f.await
773 }
774
775 async fn rollback(&mut self, ctx: &Context) -> Result<()> {
776 if let Some(f) = &mut self.rollback_fn {
777 return (f)(ctx.clone()).await;
778 }
779 Ok(())
780 }
781
782 fn rollback_supported(&self) -> bool {
783 self.rollback_fn.is_some()
784 }
785
786 fn dump(&self) -> Result<String> {
787 Ok(self.data.clone())
788 }
789
790 fn lock_key(&self) -> LockKey {
791 self.lock_key.clone()
792 }
793
794 fn poison_keys(&self) -> PoisonKeys {
795 self.poison_keys.clone()
796 }
797 }
798
    /// Runs a procedure that returns `Status::executing(persist)` on its first call and
    /// `Status::done()` afterwards.
    ///
    /// After each `execute_once` it checks the state and that the procedure directory holds
    /// `first_files` / `second_files` respectively.
    async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
        // Counts calls to `exec_fn` to switch behavior after the first step.
        let mut times = 0;
        let exec_fn = move |_| {
            times += 1;
            async move {
                if times == 1 {
                    Ok(Status::executing(persist))
                } else {
                    Ok(Status::done())
                }
            }
            .boxed()
        };
        let normal = ProcedureAdapter {
            data: "normal".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("normal");
        let meta = normal.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
        runner.manager_ctx.start();

        // First step: still running.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            first_files,
        )
        .await;

        // Second step: done and committed.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_done(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            second_files,
        )
        .await;
    }
850
    /// A persisted step writes a `.step` file, and completion writes the next-step `.commit` file.
    #[tokio::test]
    async fn test_execute_once_normal() {
        execute_once_normal(
            true,
            &["0000000000.step"],
            &["0000000000.step", "0000000001.commit"],
        )
        .await;
    }
860
    /// Without persistence no step file is written and the commit uses step 0.
    #[tokio::test]
    async fn test_execute_once_normal_skip_persist() {
        execute_once_normal(false, &[], &["0000000000.commit"]).await;
    }
865
    /// Suspending with no subprocedures does not block and leaves the procedure running.
    #[tokio::test]
    async fn test_on_suspend_empty() {
        let exec_fn = move |_| {
            async move {
                Ok(Status::Suspended {
                    subprocedures: Vec::new(),
                    persist: false,
                })
            }
            .boxed()
        };
        let suspend = ProcedureAdapter {
            data: "suspend".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("suspend");
        let meta = suspend.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
        runner.manager_ctx.start();

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");
    }
897
898 fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
899 let mut times = 0;
900 let exec_fn = move |_| {
901 times += 1;
902 async move {
903 if times == 1 {
904 time::sleep(Duration::from_millis(200)).await;
905 Ok(Status::executing(true))
906 } else {
907 Ok(Status::done())
908 }
909 }
910 .boxed()
911 };
912 let child = ProcedureAdapter {
913 data: "child".to_string(),
914 lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
915 poison_keys: PoisonKeys::default(),
916 exec_fn,
917 rollback_fn: None,
918 };
919
920 ProcedureWithId {
921 id: procedure_id,
922 procedure: Box::new(child),
923 }
924 }
925
    /// A parent suspends on two children with disjoint keys and finishes once both are done.
    ///
    /// Afterwards all locks are released, the persisted files are deleted and the outdated
    /// metadata can be removed.
    #[tokio::test]
    async fn test_on_suspend_by_subprocedures() {
        let mut times = 0;
        let children_ids = [ProcedureId::random(), ProcedureId::random()];
        let keys = [
            &[
                "catalog.schema.table.region-0",
                "catalog.schema.table.region-1",
            ],
            &[
                "catalog.schema.table.region-2",
                "catalog.schema.table.region-3",
            ],
        ];

        let exec_fn = move |ctx: Context| {
            times += 1;
            async move {
                if times == 1 {
                    // First call: submit both children and suspend.
                    Ok(Status::Suspended {
                        subprocedures: children_ids
                            .into_iter()
                            .zip(keys)
                            .map(|(id, key_slice)| new_child_procedure(id, key_slice))
                            .collect(),
                        persist: true,
                    })
                } else {
                    // Later calls: finish only when every child is done (unknown counts as
                    // not done), otherwise suspend again without new children.
                    let mut all_child_done = true;
                    for id in children_ids {
                        let is_not_done = ctx
                            .provider
                            .procedure_state(id)
                            .await
                            .unwrap()
                            .map(|s| !s.is_done())
                            .unwrap_or(true);
                        if is_not_done {
                            all_child_done = false;
                        }
                    }
                    if all_child_done {
                        Ok(Status::done())
                    } else {
                        Ok(Status::Suspended {
                            subprocedures: Vec::new(),
                            persist: false,
                        })
                    }
                }
            }
            .boxed()
        };
        let parent = ProcedureAdapter {
            data: "parent".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("parent");
        let meta = parent.new_meta(ROOT_ID);
        let procedure_id = meta.id;

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
        manager_ctx.start();
        // Register the parent so its metadata is known to this manager context.
        assert!(manager_ctx.try_insert_procedure(meta));
        // Run the parent with the context that knows about it.
        runner.manager_ctx = manager_ctx.clone();

        runner.run().await;
        assert!(manager_ctx.key_lock.is_empty());

        // Every child and the parent must be done.
        for child_id in children_ids {
            let state = manager_ctx.state(child_id).unwrap();
            assert!(state.is_done(), "{state:?}");
        }
        let state = manager_ctx.state(procedure_id).unwrap();
        assert!(state.is_done(), "{state:?}");
        // The root procedure deletes the persisted data of its tree once it finishes.
        check_files(&object_store, &procedure_store, procedure_id, &[]).await;

        tokio::time::sleep(Duration::from_millis(5)).await;
        // NOTE(review): presumably removes metadata older than the given duration; the
        // assertions below check that both parent and children are gone.
        manager_ctx.remove_outdated_meta(Duration::from_millis(1));
        assert!(manager_ctx.state(procedure_id).is_none());
        assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
        for child_id in children_ids {
            assert!(manager_ctx.state(child_id).is_none());
        }
    }
1027
    /// Once the manager is stopped, the next step fails the procedure and writes no new file.
    #[tokio::test]
    async fn test_running_is_stopped() {
        let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
        let normal = ProcedureAdapter {
            data: "normal".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("test_running_is_stopped");
        let meta = normal.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
        runner.manager_ctx.start();

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.step"],
        )
        .await;

        runner.manager_ctx.stop();
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        // No new step is persisted after the stop.
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.step"],
        )
        .await;
    }
1071
    /// A failing step on a stopped manager fails the procedure without writing anything.
    #[tokio::test]
    async fn test_running_is_stopped_on_error() {
        let exec_fn =
            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
        let normal = ProcedureAdapter {
            data: "fail".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("test_running_is_stopped_on_error");
        let meta = normal.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
        runner.manager_ctx.stop();

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
    }
1098
    /// A non-retryable error without rollback support fails the procedure immediately.
    #[tokio::test]
    async fn test_execute_on_error() {
        let exec_fn =
            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
        let fail = ProcedureAdapter {
            data: "fail".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("fail");
        let meta = fail.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
        runner.manager_ctx.start();

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
    }
1124
    /// With rollback support, an error goes PrepareRollback -> RollingBack -> Failed and
    /// leaves a `.rollback` file.
    #[tokio::test]
    async fn test_execute_with_rollback_on_error() {
        let exec_fn =
            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
        let rollback_fn = move |_| async move { Ok(()) }.boxed();
        let fail = ProcedureAdapter {
            data: "fail".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: Some(Box::new(rollback_fn)),
        };

        let dir = create_temp_dir("fail");
        let meta = fail.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
        runner.manager_ctx.start();

        // The execution error schedules a rollback.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_prepare_rollback(), "{state:?}");

        // Preparing the rollback writes the rollback state.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_rolling_back(), "{state:?}");

        // A successful rollback ends in the failed state.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.rollback"],
        )
        .await;
    }
1165
    /// A retry-later error puts the procedure into Retrying; the next successful step returns
    /// it to Running and it can then finish normally.
    #[tokio::test]
    async fn test_execute_on_retry_later_error() {
        let mut times = 0;

        let exec_fn = move |_| {
            times += 1;
            async move {
                if times == 1 {
                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
                } else if times == 2 {
                    Ok(Status::executing(false))
                } else {
                    Ok(Status::done())
                }
            }
            .boxed()
        };

        let retry_later = ProcedureAdapter {
            data: "retry_later".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("retry_later");
        let meta = retry_later.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
        runner.manager_ctx.start();
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_retrying(), "{state:?}");

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_done(), "{state:?}");
        assert!(meta.state().is_done());
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.commit"],
        )
        .await;
    }
1219
    /// After a retry-later error, a step that suspends on a (failing) child returns the parent
    /// to Running once it is woken up, and the parent can then finish.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_execute_on_retry_later_error_with_child() {
        common_telemetry::init_default_ut_logging();
        let mut times = 0;
        let child_id = ProcedureId::random();

        let exec_fn = move |_| {
            times += 1;
            async move {
                debug!("times: {}", times);
                if times == 1 {
                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
                } else if times == 2 {
                    // Submit a child that always fails.
                    let exec_fn = |_| {
                        async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
                            .boxed()
                    };
                    let fail = ProcedureAdapter {
                        data: "fail".to_string(),
                        lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
                        poison_keys: PoisonKeys::default(),
                        exec_fn,
                        rollback_fn: None,
                    };

                    Ok(Status::Suspended {
                        subprocedures: vec![ProcedureWithId {
                            id: child_id,
                            procedure: Box::new(fail),
                        }],
                        persist: true,
                    })
                } else {
                    Ok(Status::done())
                }
            }
            .boxed()
        };

        let retry_later = ProcedureAdapter {
            data: "retry_later".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("retry_later");
        let meta = retry_later.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
        runner.manager_ctx.start();
        debug!("execute_once 1");
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_retrying(), "{state:?}");

        // Notify the parent so its wait for subprocedures in the next step can complete.
        let moved_meta = meta.clone();
        tokio::spawn(async move {
            moved_meta.child_notify.notify_one();
        });
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_done(), "{state:?}");
        assert!(meta.state().is_done());
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.step", "0000000001.commit"],
        )
        .await;
    }
1299
1300 #[tokio::test]
1301 async fn test_execute_exceed_max_retry_later() {
1302 let exec_fn =
1303 |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1304
1305 let exceed_max_retry_later = ProcedureAdapter {
1306 data: "exceed_max_retry_later".to_string(),
1307 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1308 poison_keys: PoisonKeys::default(),
1309 exec_fn,
1310 rollback_fn: None,
1311 };
1312
1313 let dir = create_temp_dir("exceed_max_retry_later");
1314 let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1315 let object_store = test_util::new_object_store(&dir);
1316 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1317 let mut runner = new_runner(
1318 meta.clone(),
1319 Box::new(exceed_max_retry_later),
1320 procedure_store,
1321 );
1322 runner.manager_ctx.start();
1323
1324 runner.exponential_builder = ExponentialBuilder::default()
1325 .with_min_delay(Duration::from_millis(1))
1326 .with_max_times(3);
1327
1328 runner.execute_procedure_in_loop().await;
1330 let err = meta.state().error().unwrap().to_string();
1331 assert!(err.contains("Procedure retry exceeded max times"));
1332 }
1333
1334 #[tokio::test]
1335 async fn test_rollback_exceed_max_retry_later() {
1336 let exec_fn =
1337 |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1338 let rollback_fn = move |_| {
1339 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1340 };
1341 let exceed_max_retry_later = ProcedureAdapter {
1342 data: "exceed_max_rollback".to_string(),
1343 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1344 poison_keys: PoisonKeys::default(),
1345 exec_fn,
1346 rollback_fn: Some(Box::new(rollback_fn)),
1347 };
1348
1349 let dir = create_temp_dir("exceed_max_rollback");
1350 let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1351 let object_store = test_util::new_object_store(&dir);
1352 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1353 let mut runner = new_runner(
1354 meta.clone(),
1355 Box::new(exceed_max_retry_later),
1356 procedure_store,
1357 );
1358 runner.manager_ctx.start();
1359 runner.exponential_builder = ExponentialBuilder::default()
1360 .with_min_delay(Duration::from_millis(1))
1361 .with_max_times(3);
1362
1363 runner.execute_procedure_in_loop().await;
1365 let err = meta.state().error().unwrap().to_string();
1366 assert!(err.contains("Procedure rollback exceeded max times"));
1367 }
1368
1369 #[tokio::test]
1370 async fn test_rollback_after_retry_fail() {
1371 let exec_fn = move |_| {
1372 async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1373 };
1374
1375 let (tx, mut rx) = mpsc::channel(1);
1376 let rollback_fn = move |_| {
1377 let tx = tx.clone();
1378 async move {
1379 tx.send(()).await.unwrap();
1380 Ok(())
1381 }
1382 .boxed()
1383 };
1384 let retry_later = ProcedureAdapter {
1385 data: "rollback_after_retry_fail".to_string(),
1386 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1387 poison_keys: PoisonKeys::default(),
1388 exec_fn,
1389 rollback_fn: Some(Box::new(rollback_fn)),
1390 };
1391
1392 let dir = create_temp_dir("retry_later");
1393 let meta = retry_later.new_meta(ROOT_ID);
1394 let ctx = context_without_provider(meta.id);
1395 let object_store = test_util::new_object_store(&dir);
1396 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1397 let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1398 runner.manager_ctx.start();
1399 runner.exponential_builder = ExponentialBuilder::default()
1400 .with_min_delay(Duration::from_millis(1))
1401 .with_max_times(3);
1402 runner.execute_procedure_in_loop().await;
1404 rx.recv().await.unwrap();
1405 assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
1406 check_files(
1407 &object_store,
1408 &procedure_store,
1409 ctx.procedure_id,
1410 &["0000000000.rollback"],
1411 )
1412 .await;
1413 }
1414
1415 #[tokio::test]
1416 async fn test_child_error() {
1417 let mut times = 0;
1418 let child_id = ProcedureId::random();
1419 common_telemetry::init_default_ut_logging();
1420 let exec_fn = move |ctx: Context| {
1421 times += 1;
1422 async move {
1423 if times == 1 {
1424 let exec_fn = |_| {
1426 async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1427 .boxed()
1428 };
1429 let fail = ProcedureAdapter {
1430 data: "fail".to_string(),
1431 lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1432 poison_keys: PoisonKeys::default(),
1433 exec_fn,
1434 rollback_fn: None,
1435 };
1436
1437 Ok(Status::Suspended {
1438 subprocedures: vec![ProcedureWithId {
1439 id: child_id,
1440 procedure: Box::new(fail),
1441 }],
1442 persist: true,
1443 })
1444 } else {
1445 let state = ctx.provider.procedure_state(child_id).await.unwrap();
1447 let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
1448 if is_failed {
1449 Err(Error::from_error_ext(PlainError::new(
1451 "subprocedure failed".to_string(),
1452 StatusCode::Unexpected,
1453 )))
1454 } else {
1455 Ok(Status::Suspended {
1457 subprocedures: Vec::new(),
1458 persist: false,
1459 })
1460 }
1461 }
1462 }
1463 .boxed()
1464 };
1465 let parent = ProcedureAdapter {
1466 data: "parent".to_string(),
1467 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1468 poison_keys: PoisonKeys::default(),
1469 exec_fn,
1470 rollback_fn: None,
1471 };
1472
1473 let dir = create_temp_dir("child_err");
1474 let meta = parent.new_meta(ROOT_ID);
1475
1476 let object_store = test_util::new_object_store(&dir);
1477 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1478 let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
1479 let poison_manager = Arc::new(InMemoryPoisonStore::default());
1480 let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
1481 manager_ctx.start();
1482 assert!(manager_ctx.try_insert_procedure(meta.clone()));
1484 runner.manager_ctx = manager_ctx.clone();
1486
1487 runner.run().await;
1489 assert!(manager_ctx.key_lock.is_empty());
1490 let err = meta.state().error().unwrap().output_msg();
1491 assert!(err.contains("subprocedure failed"), "{err}");
1492 }
1493
1494 #[tokio::test]
1495 async fn test_execute_with_clean_poisons() {
1496 common_telemetry::init_default_ut_logging();
1497 let mut times = 0;
1498 let poison_key = PoisonKey::new("table/1024");
1499 let moved_poison_key = poison_key.clone();
1500 let exec_fn = move |ctx: Context| {
1501 times += 1;
1502 let poison_key = moved_poison_key.clone();
1503 async move {
1504 if times == 1 {
1505 ctx.provider
1507 .try_put_poison(&poison_key, ctx.procedure_id)
1508 .await
1509 .unwrap();
1510
1511 Ok(Status::executing(true))
1512 } else {
1513 Ok(Status::executing_with_clean_poisons(true))
1514 }
1515 }
1516 .boxed()
1517 };
1518 let poison = ProcedureAdapter {
1519 data: "poison".to_string(),
1520 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1521 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1522 exec_fn,
1523 rollback_fn: None,
1524 };
1525
1526 let dir = create_temp_dir("clean_poisons");
1527 let meta = poison.new_meta(ROOT_ID);
1528
1529 let object_store = test_util::new_object_store(&dir);
1530 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1531 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1532
1533 let ctx = context_with_provider(
1535 meta.id,
1536 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1537 );
1538 runner
1540 .manager_ctx
1541 .procedures
1542 .write()
1543 .unwrap()
1544 .insert(meta.id, runner.meta.clone());
1545
1546 runner.manager_ctx.start();
1547 runner.execute_once(&ctx).await;
1548 let state = runner.meta.state();
1549 assert!(state.is_running(), "{state:?}");
1550
1551 let procedure_id = runner
1552 .manager_ctx
1553 .poison_manager
1554 .get_poison(&poison_key.to_string())
1555 .await
1556 .unwrap();
1557 assert!(procedure_id.is_some());
1559
1560 runner.execute_once(&ctx).await;
1561 let state = runner.meta.state();
1562 assert!(state.is_running(), "{state:?}");
1563
1564 let procedure_id = runner
1565 .manager_ctx
1566 .poison_manager
1567 .get_poison(&poison_key.to_string())
1568 .await
1569 .unwrap();
1570 assert!(procedure_id.is_none());
1572 }
1573
1574 #[tokio::test]
1575 async fn test_execute_error_with_clean_poisons() {
1576 common_telemetry::init_default_ut_logging();
1577 let mut times = 0;
1578 let poison_key = PoisonKey::new("table/1024");
1579 let moved_poison_key = poison_key.clone();
1580 let exec_fn = move |ctx: Context| {
1581 times += 1;
1582 let poison_key = moved_poison_key.clone();
1583 async move {
1584 if times == 1 {
1585 ctx.provider
1587 .try_put_poison(&poison_key, ctx.procedure_id)
1588 .await
1589 .unwrap();
1590
1591 Ok(Status::executing(true))
1592 } else {
1593 Err(Error::external_and_clean_poisons(MockError::new(
1594 StatusCode::Unexpected,
1595 )))
1596 }
1597 }
1598 .boxed()
1599 };
1600 let poison = ProcedureAdapter {
1601 data: "poison".to_string(),
1602 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1603 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1604 exec_fn,
1605 rollback_fn: None,
1606 };
1607
1608 let dir = create_temp_dir("error_with_clean_poisons");
1609 let meta = poison.new_meta(ROOT_ID);
1610
1611 let object_store = test_util::new_object_store(&dir);
1612 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1613 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1614
1615 let ctx = context_with_provider(
1617 meta.id,
1618 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1619 );
1620 runner
1622 .manager_ctx
1623 .procedures
1624 .write()
1625 .unwrap()
1626 .insert(meta.id, runner.meta.clone());
1627
1628 runner.manager_ctx.start();
1629 runner.execute_once(&ctx).await;
1630 let state = runner.meta.state();
1631 assert!(state.is_running(), "{state:?}");
1632
1633 let procedure_id = runner
1634 .manager_ctx
1635 .poison_manager
1636 .get_poison(&poison_key.to_string())
1637 .await
1638 .unwrap();
1639 assert!(procedure_id.is_some());
1641
1642 runner.execute_once(&ctx).await;
1643 let state = runner.meta.state();
1644 assert!(state.is_failed(), "{state:?}");
1645
1646 let procedure_id = runner
1647 .manager_ctx
1648 .poison_manager
1649 .get_poison(&poison_key.to_string())
1650 .await
1651 .unwrap();
1652 assert!(procedure_id.is_none());
1654 }
1655
1656 #[tokio::test]
1657 async fn test_execute_failed_after_set_poison() {
1658 let mut times = 0;
1659 let poison_key = PoisonKey::new("table/1024");
1660 let moved_poison_key = poison_key.clone();
1661 let exec_fn = move |ctx: Context| {
1662 times += 1;
1663 let poison_key = moved_poison_key.clone();
1664 async move {
1665 if times == 1 {
1666 Ok(Status::executing(true))
1667 } else {
1668 ctx.provider
1670 .try_put_poison(&poison_key, ctx.procedure_id)
1671 .await
1672 .unwrap();
1673 Err(Error::external(MockError::new(StatusCode::Unexpected)))
1674 }
1675 }
1676 .boxed()
1677 };
1678 let poison = ProcedureAdapter {
1679 data: "poison".to_string(),
1680 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1681 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1682 exec_fn,
1683 rollback_fn: None,
1684 };
1685
1686 let dir = create_temp_dir("poison");
1687 let meta = poison.new_meta(ROOT_ID);
1688
1689 let object_store = test_util::new_object_store(&dir);
1690 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1691 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1692
1693 let ctx = context_with_provider(
1695 meta.id,
1696 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1697 );
1698 runner
1700 .manager_ctx
1701 .procedures
1702 .write()
1703 .unwrap()
1704 .insert(meta.id, runner.meta.clone());
1705
1706 runner.manager_ctx.start();
1707 runner.execute_once(&ctx).await;
1708 let state = runner.meta.state();
1709 assert!(state.is_running(), "{state:?}");
1710
1711 runner.execute_once(&ctx).await;
1712 let state = runner.meta.state();
1713 assert!(state.is_failed(), "{state:?}");
1714 assert!(meta.state().is_failed());
1715
1716 let procedure_id = runner
1718 .manager_ctx
1719 .poison_manager
1720 .get_poison(&poison_key.to_string())
1721 .await
1722 .unwrap()
1723 .unwrap();
1724
1725 assert_eq!(&procedure_id.clone(), ROOT_ID);
1727 }
1728
1729 #[tokio::test]
1730 async fn test_execute_exceed_max_retry_after_set_poison() {
1731 common_telemetry::init_default_ut_logging();
1732 let mut times = 0;
1733 let poison_key = PoisonKey::new("table/1024");
1734 let moved_poison_key = poison_key.clone();
1735 let exec_fn = move |ctx: Context| {
1736 times += 1;
1737 let poison_key = moved_poison_key.clone();
1738 async move {
1739 if times == 1 {
1740 Ok(Status::executing(true))
1741 } else {
1742 ctx.provider
1744 .try_put_poison(&poison_key, ctx.procedure_id)
1745 .await
1746 .unwrap();
1747 Err(Error::retry_later_and_clean_poisons(MockError::new(
1748 StatusCode::Unexpected,
1749 )))
1750 }
1751 }
1752 .boxed()
1753 };
1754 let poison = ProcedureAdapter {
1755 data: "poison".to_string(),
1756 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1757 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1758 exec_fn,
1759 rollback_fn: None,
1760 };
1761
1762 let dir = create_temp_dir("exceed_max_after_set_poison");
1763 let meta = poison.new_meta(ROOT_ID);
1764 let object_store = test_util::new_object_store(&dir);
1765 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1766 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
1767 runner.manager_ctx.start();
1768 runner.exponential_builder = ExponentialBuilder::default()
1769 .with_min_delay(Duration::from_millis(1))
1770 .with_max_times(3);
1771 let ctx = context_with_provider(
1773 meta.id,
1774 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1775 );
1776 runner
1778 .manager_ctx
1779 .procedures
1780 .write()
1781 .unwrap()
1782 .insert(meta.id, runner.meta.clone());
1783 runner.execute_once_with_retry(&ctx).await;
1785 let err = meta.state().error().unwrap().clone();
1786 assert_matches!(&*err, Error::RetryTimesExceeded { .. });
1787
1788 let procedure_id = runner
1790 .manager_ctx
1791 .poison_manager
1792 .get_poison(&poison_key.to_string())
1793 .await
1794 .unwrap();
1795 assert_eq!(procedure_id, None);
1796 }
1797
1798 #[tokio::test]
1799 async fn test_execute_poisoned() {
1800 let mut times = 0;
1801 let poison_key = PoisonKey::new("table/1024");
1802 let moved_poison_key = poison_key.clone();
1803 let exec_fn = move |ctx: Context| {
1804 times += 1;
1805 let poison_key = moved_poison_key.clone();
1806 async move {
1807 if times == 1 {
1808 Ok(Status::executing(true))
1809 } else {
1810 ctx.provider
1812 .try_put_poison(&poison_key, ctx.procedure_id)
1813 .await
1814 .unwrap();
1815 Ok(Status::Poisoned {
1816 keys: PoisonKeys::new(vec![poison_key.clone()]),
1817 error: Error::external(MockError::new(StatusCode::Unexpected)),
1818 })
1819 }
1820 }
1821 .boxed()
1822 };
1823 let poison = ProcedureAdapter {
1824 data: "poison".to_string(),
1825 lock_key: LockKey::single_exclusive("catalog.schema.table"),
1826 poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1827 exec_fn,
1828 rollback_fn: None,
1829 };
1830
1831 let dir = create_temp_dir("poison");
1832 let meta = poison.new_meta(ROOT_ID);
1833
1834 let object_store = test_util::new_object_store(&dir);
1835 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1836 let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1837
1838 let ctx = context_with_provider(
1840 meta.id,
1841 runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1842 );
1843 runner
1845 .manager_ctx
1846 .procedures
1847 .write()
1848 .unwrap()
1849 .insert(meta.id, runner.meta.clone());
1850
1851 runner.manager_ctx.start();
1852 runner.execute_once(&ctx).await;
1853 let state = runner.meta.state();
1854 assert!(state.is_running(), "{state:?}");
1855
1856 runner.execute_once(&ctx).await;
1857 let state = runner.meta.state();
1858 assert!(state.is_poisoned(), "{state:?}");
1859 assert!(meta.state().is_poisoned());
1860 check_files(
1861 &object_store,
1862 &procedure_store,
1863 ctx.procedure_id,
1864 &["0000000000.step"],
1865 )
1866 .await;
1867
1868 let procedure_id = runner
1870 .manager_ctx
1871 .poison_manager
1872 .get_poison(&poison_key.to_string())
1873 .await
1874 .unwrap()
1875 .unwrap();
1876
1877 assert_eq!(procedure_id, ROOT_ID);
1879 }
1880
1881 fn test_procedure_with_dynamic_lock(
1882 shared_atomic_value: Arc<AtomicU64>,
1883 id: u64,
1884 ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
1885 let exec_fn = move |ctx: Context| {
1886 let moved_shared_atomic_value = shared_atomic_value.clone();
1887 let moved_ctx = ctx.clone();
1888 async move {
1889 debug!("Acquiring write lock, id: {}", id);
1890 let key = StringKey::Exclusive("test_lock".to_string());
1891 let guard = moved_ctx.provider.acquire_lock(&key).await;
1892 debug!("Acquired write lock, id: {}", id);
1893 let millis = rand::rng().random_range(10..=50);
1894 tokio::time::sleep(Duration::from_millis(millis)).await;
1895 let value = moved_shared_atomic_value.load(Ordering::Relaxed);
1896 moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
1897 debug!("Dropping write lock, id: {}", id);
1898 drop(guard);
1899
1900 Ok(Status::done())
1901 }
1902 .boxed()
1903 };
1904
1905 let adapter = ProcedureAdapter {
1906 data: "dynamic_lock".to_string(),
1907 lock_key: LockKey::new_exclusive([]),
1908 poison_keys: PoisonKeys::new([]),
1909 exec_fn,
1910 rollback_fn: None,
1911 };
1912 let meta = adapter.new_meta(ROOT_ID);
1913
1914 (Box::new(adapter), meta)
1915 }
1916
1917 #[tokio::test(flavor = "multi_thread")]
1918 async fn test_execute_with_dynamic_lock() {
1919 common_telemetry::init_default_ut_logging();
1920 let shared_atomic_value = Arc::new(AtomicU64::new(0));
1921 let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
1922 let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
1923
1924 let dir = create_temp_dir("dynamic_lock");
1925 let object_store = test_util::new_object_store(&dir);
1926 let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1927 let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
1928 let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
1929 let ctx1 = context_with_provider(
1930 meta1.id,
1931 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1932 );
1933 let ctx2 = context_with_provider(
1934 meta2.id,
1935 runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1937 );
1938 let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
1939 join_all(tasks).await;
1940 assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
1941 }
1942}