common_procedure/local/
runner.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_telemetry::{debug, error, info};
21use rand::Rng;
22use snafu::ResultExt;
23use tokio::time;
24
25use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
26use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
27use crate::procedure::{Output, StringKey};
28use crate::rwlock::OwnedKeyRwLockGuard;
29use crate::store::{ProcedureMessage, ProcedureStore};
30use crate::{
31    BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
32};
33
34/// A guard to cleanup procedure state.
35struct ProcedureGuard {
36    meta: ProcedureMetaRef,
37    manager_ctx: Arc<ManagerContext>,
38    key_guards: Vec<OwnedKeyRwLockGuard>,
39    finish: bool,
40}
41
42impl ProcedureGuard {
43    /// Returns a new [ProcedureGuard].
44    fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
45        ProcedureGuard {
46            meta,
47            manager_ctx,
48            key_guards: vec![],
49            finish: false,
50        }
51    }
52
53    /// The procedure is finished successfully.
54    fn finish(mut self) {
55        self.finish = true;
56    }
57}
58
59impl Drop for ProcedureGuard {
60    fn drop(&mut self) {
61        if !self.finish {
62            error!("Procedure {} exits unexpectedly", self.meta.id);
63
64            // Set state to failed. This is useful in test as runtime may not abort when the runner task panics.
65            // See https://github.com/tokio-rs/tokio/issues/2002 .
66            // We set set_panic_hook() in the application's main function. But our tests don't have this panic hook.
67            let err = ProcedurePanicSnafu {
68                procedure_id: self.meta.id,
69            }
70            .build();
71            self.meta.set_state(ProcedureState::failed(Arc::new(err)));
72        }
73
74        // Notify parent procedure.
75        if let Some(parent_id) = self.meta.parent_id {
76            self.manager_ctx.notify_by_subprocedure(parent_id);
77        }
78
79        // Drops the key guards in the reverse order.
80        while !self.key_guards.is_empty() {
81            self.key_guards.pop();
82        }
83
84        // Clean the staled locks.
85        self.manager_ctx
86            .key_lock
87            .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
88    }
89}
90
91pub(crate) struct Runner {
92    pub(crate) meta: ProcedureMetaRef,
93    pub(crate) procedure: BoxedProcedure,
94    pub(crate) manager_ctx: Arc<ManagerContext>,
95    pub(crate) step: u32,
96    pub(crate) exponential_builder: ExponentialBuilder,
97    pub(crate) store: Arc<ProcedureStore>,
98    pub(crate) rolling_back: bool,
99}
100
101impl Runner {
102    /// Return `ProcedureManager` is running.
103    pub(crate) fn running(&self) -> bool {
104        self.manager_ctx.running()
105    }
106
107    /// Run the procedure.
108    pub(crate) async fn run(mut self) {
109        // Ensure we can update the procedure state.
110        let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
111
112        info!(
113            "Runner {}-{} starts",
114            self.procedure.type_name(),
115            self.meta.id
116        );
117
118        // TODO(yingwen): Detect recursive locking (and deadlock) if possible. Maybe we could detect
119        // recursive locking by adding a root procedure id to the meta.
120        for key in self.meta.lock_key.keys_to_lock() {
121            // Acquire lock for each key.
122            let key_guard = match key {
123                StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
124                StringKey::Exclusive(key) => {
125                    self.manager_ctx.key_lock.write(key.clone()).await.into()
126                }
127            };
128
129            guard.key_guards.push(key_guard);
130        }
131
132        // Execute the procedure. We need to release the lock whenever the execution
133        // is successful or fail.
134        self.meta.set_start_time_ms();
135        self.execute_procedure_in_loop().await;
136        self.meta.set_end_time_ms();
137
138        // We can't remove the metadata of the procedure now as users and its parent might
139        // need to query its state.
140        // TODO(yingwen): 1. Add TTL to the metadata; 2. Only keep state in the procedure store
141        // so we don't need to always store the metadata in memory after the procedure is done.
142
143        // Release locks and notify parent procedure.
144        guard.finish();
145
146        // If this is the root procedure, clean up message cache.
147        if self.meta.parent_id.is_none() {
148            let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
149            // Clean resources.
150            self.manager_ctx.on_procedures_finish(&procedure_ids);
151
152            // If `ProcedureManager` is stopped, it stops the current task immediately without deleting the procedure.
153            if !self.running() {
154                return;
155            }
156
157            for id in procedure_ids {
158                if let Err(e) = self.store.delete_procedure(id).await {
159                    error!(
160                        e;
161                        "Runner {}-{} failed to delete procedure {}",
162                        self.procedure.type_name(),
163                        self.meta.id,
164                        id,
165                    );
166                }
167            }
168        }
169
170        info!(
171            "Runner {}-{} exits",
172            self.procedure.type_name(),
173            self.meta.id
174        );
175    }
176
177    async fn execute_procedure_in_loop(&mut self) {
178        let ctx = Context {
179            procedure_id: self.meta.id,
180            provider: self.manager_ctx.clone(),
181        };
182
183        self.rolling_back = false;
184        self.execute_once_with_retry(&ctx).await;
185    }
186
187    async fn execute_once_with_retry(&mut self, ctx: &Context) {
188        let mut retry = self.exponential_builder.build();
189        let mut retry_times = 0;
190
191        let mut rollback = self.exponential_builder.build();
192        let mut rollback_times = 0;
193
194        loop {
195            // Don't store state if `ProcedureManager` is stopped.
196            if !self.running() {
197                self.meta.set_state(ProcedureState::failed(Arc::new(
198                    error::ManagerNotStartSnafu {}.build(),
199                )));
200                return;
201            }
202            let state = self.meta.state();
203            match state {
204                ProcedureState::Running => {}
205                ProcedureState::Retrying { error } => {
206                    retry_times += 1;
207                    if let Some(d) = retry.next() {
208                        let millis = d.as_millis() as u64;
209                        // Add random noise to the retry delay to avoid retry storms.
210                        let noise = rand::rng().random_range(0..(millis / 4) + 1);
211                        let d = d.add(Duration::from_millis(noise));
212
213                        self.wait_on_err(d, retry_times).await;
214                    } else {
215                        self.meta
216                            .set_state(ProcedureState::prepare_rollback(Arc::new(
217                                Error::RetryTimesExceeded {
218                                    source: error.clone(),
219                                    procedure_id: self.meta.id,
220                                },
221                            )));
222                    }
223                }
224                ProcedureState::PrepareRollback { error }
225                | ProcedureState::RollingBack { error } => {
226                    rollback_times += 1;
227                    if let Some(d) = rollback.next() {
228                        self.wait_on_err(d, rollback_times).await;
229                    } else {
230                        let err = Err::<(), Arc<Error>>(error)
231                            .context(RollbackTimesExceededSnafu {
232                                procedure_id: self.meta.id,
233                            })
234                            .unwrap_err();
235                        self.meta.set_state(ProcedureState::failed(Arc::new(err)));
236                        return;
237                    }
238                }
239                ProcedureState::Done { .. } => return,
240                ProcedureState::Failed { .. } => return,
241                ProcedureState::Poisoned { .. } => return,
242            }
243            self.execute_once(ctx).await;
244        }
245    }
246
247    async fn clean_poisons(&mut self) -> Result<()> {
248        let mut error = None;
249        for key in self.meta.poison_keys.iter() {
250            let key = key.to_string();
251            if let Err(e) = self
252                .manager_ctx
253                .poison_manager
254                .delete_poison(key, self.meta.id.to_string())
255                .await
256            {
257                error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
258                error = Some(e);
259            }
260        }
261
262        // returns the last error if any.
263        if let Some(e) = error {
264            return Err(e);
265        }
266        Ok(())
267    }
268
269    async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
270        if self.procedure.rollback_supported() {
271            if let Err(e) = self.procedure.rollback(ctx).await {
272                self.meta
273                    .set_state(ProcedureState::rolling_back(Arc::new(e)));
274                return;
275            }
276        }
277        self.meta.set_state(ProcedureState::failed(err));
278    }
279
280    async fn prepare_rollback(&mut self, err: Arc<Error>) {
281        if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
282            self.meta
283                .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
284            return;
285        }
286        if self.procedure.rollback_supported() {
287            self.meta.set_state(ProcedureState::rolling_back(err));
288        } else {
289            self.meta.set_state(ProcedureState::failed(err));
290        }
291    }
292
293    async fn execute_once(&mut self, ctx: &Context) {
294        match self.meta.state() {
295            ProcedureState::Running | ProcedureState::Retrying { .. } => {
296                match self.procedure.execute(ctx).await {
297                    Ok(status) => {
298                        debug!(
299                            "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
300                            self.procedure.type_name(),
301                            self.meta.id,
302                            status,
303                            status.need_persist(),
304                        );
305
306                        // Don't store state if `ProcedureManager` is stopped.
307                        if !self.running() {
308                            self.meta.set_state(ProcedureState::failed(Arc::new(
309                                error::ManagerNotStartSnafu {}.build(),
310                            )));
311                            return;
312                        }
313
314                        // Cleans poisons before persist.
315                        if status.need_clean_poisons() {
316                            if let Err(e) = self.clean_poisons().await {
317                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
318                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
319                                return;
320                            }
321                        }
322
323                        if status.need_persist() {
324                            if let Err(e) = self.persist_procedure().await {
325                                error!(e; "Failed to persist procedure: {}", self.meta.id);
326                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
327                                return;
328                            }
329                        }
330
331                        match status {
332                            Status::Executing { .. } => {}
333                            Status::Suspended { subprocedures, .. } => {
334                                self.on_suspended(subprocedures).await;
335                            }
336                            Status::Done { output } => {
337                                if let Err(e) = self.commit_procedure().await {
338                                    error!(e; "Failed to commit procedure: {}", self.meta.id);
339                                    self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
340                                    return;
341                                }
342
343                                self.done(output);
344                            }
345                            Status::Poisoned { error, keys } => {
346                                error!(
347                                    error;
348                                    "Procedure {}-{} is poisoned, keys: {:?}",
349                                    self.procedure.type_name(),
350                                    self.meta.id,
351                                    keys,
352                                );
353                                self.meta
354                                    .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
355                            }
356                        }
357                    }
358                    Err(e) => {
359                        error!(
360                            e;
361                            "Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
362                            self.procedure.type_name(),
363                            self.meta.id,
364                            e.is_retry_later(),
365                            e.need_clean_poisons(),
366                        );
367
368                        // Don't store state if `ProcedureManager` is stopped.
369                        if !self.running() {
370                            self.meta.set_state(ProcedureState::failed(Arc::new(
371                                error::ManagerNotStartSnafu {}.build(),
372                            )));
373                            return;
374                        }
375
376                        if e.need_clean_poisons() {
377                            if let Err(e) = self.clean_poisons().await {
378                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
379                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
380                                return;
381                            }
382                            debug!(
383                                "Procedure {}-{} cleaned poisons",
384                                self.procedure.type_name(),
385                                self.meta.id,
386                            );
387                        }
388
389                        if e.is_retry_later() {
390                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
391                            return;
392                        }
393
394                        self.meta
395                            .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
396                    }
397                }
398            }
399            ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
400            ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
401            ProcedureState::Failed { .. }
402            | ProcedureState::Done { .. }
403            | ProcedureState::Poisoned { .. } => (),
404        }
405    }
406
407    /// Submit a subprocedure with specific `procedure_id`.
408    fn submit_subprocedure(
409        &self,
410        procedure_id: ProcedureId,
411        procedure_state: ProcedureState,
412        procedure: BoxedProcedure,
413    ) {
414        if self.manager_ctx.contains_procedure(procedure_id) {
415            // If the parent has already submitted this procedure, don't submit it again.
416            return;
417        }
418
419        let step = 0;
420
421        let meta = Arc::new(ProcedureMeta::new(
422            procedure_id,
423            procedure_state,
424            Some(self.meta.id),
425            procedure.lock_key(),
426            procedure.poison_keys(),
427            procedure.type_name(),
428        ));
429        let runner = Runner {
430            meta: meta.clone(),
431            procedure,
432            manager_ctx: self.manager_ctx.clone(),
433            step,
434            exponential_builder: self.exponential_builder,
435            store: self.store.clone(),
436            rolling_back: false,
437        };
438
439        // Insert the procedure. We already check the procedure existence before inserting
440        // so we add an assertion to ensure the procedure id is unique and no other procedures
441        // using the same procedure id.
442        assert!(
443            self.manager_ctx.try_insert_procedure(meta),
444            "Procedure {}-{} submit an existing procedure {}-{}",
445            self.procedure.type_name(),
446            self.meta.id,
447            runner.procedure.type_name(),
448            procedure_id,
449        );
450
451        // Add the id of the subprocedure to the metadata.
452        self.meta.push_child(procedure_id);
453
454        let _handle = common_runtime::spawn_global(async move {
455            // Run the root procedure.
456            runner.run().await
457        });
458    }
459
460    /// Extend the retry time to wait for the next retry.
461    async fn wait_on_err(&mut self, d: Duration, i: u64) {
462        info!(
463            "Procedure {}-{} retry for the {} times after {} millis",
464            self.procedure.type_name(),
465            self.meta.id,
466            i,
467            d.as_millis(),
468        );
469        time::sleep(d).await;
470    }
471
472    async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
473        let has_child = !subprocedures.is_empty();
474        for subprocedure in subprocedures {
475            info!(
476                "Procedure {}-{} submit subprocedure {}-{}",
477                self.procedure.type_name(),
478                self.meta.id,
479                subprocedure.procedure.type_name(),
480                subprocedure.id,
481            );
482
483            self.submit_subprocedure(
484                subprocedure.id,
485                ProcedureState::Running,
486                subprocedure.procedure,
487            );
488        }
489
490        info!(
491            "Procedure {}-{} is waiting for subprocedures",
492            self.procedure.type_name(),
493            self.meta.id,
494        );
495
496        // Wait for subprocedures.
497        if has_child {
498            self.meta.child_notify.notified().await;
499
500            info!(
501                "Procedure {}-{} is waked up",
502                self.procedure.type_name(),
503                self.meta.id,
504            );
505        }
506    }
507
508    async fn persist_procedure(&mut self) -> Result<()> {
509        let type_name = self.procedure.type_name().to_string();
510        let data = self.procedure.dump()?;
511
512        self.store
513            .store_procedure(
514                self.meta.id,
515                self.step,
516                type_name,
517                data,
518                self.meta.parent_id,
519            )
520            .await
521            .map_err(|e| {
522                error!(
523                    e; "Failed to persist procedure {}-{}",
524                    self.procedure.type_name(),
525                    self.meta.id
526                );
527                e
528            })?;
529        self.step += 1;
530        Ok(())
531    }
532
533    async fn commit_procedure(&mut self) -> Result<()> {
534        self.store
535            .commit_procedure(self.meta.id, self.step)
536            .await
537            .map_err(|e| {
538                error!(
539                    e; "Failed to commit procedure {}-{}",
540                    self.procedure.type_name(),
541                    self.meta.id
542                );
543                e
544            })?;
545        self.step += 1;
546        Ok(())
547    }
548
549    async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
550        // Persists procedure state
551        let type_name = self.procedure.type_name().to_string();
552        let data = self.procedure.dump()?;
553        let message = ProcedureMessage {
554            type_name,
555            data,
556            parent_id: self.meta.parent_id,
557            step: self.step,
558            error: Some(error),
559        };
560        self.store
561            .rollback_procedure(self.meta.id, message)
562            .await
563            .map_err(|e| {
564                error!(
565                    e; "Failed to write rollback key for procedure {}-{}",
566                    self.procedure.type_name(),
567                    self.meta.id
568                );
569                e
570            })?;
571        self.step += 1;
572        Ok(())
573    }
574
575    fn done(&self, output: Option<Output>) {
576        // TODO(yingwen): Add files to remove list.
577        info!(
578            "Procedure {}-{} done",
579            self.procedure.type_name(),
580            self.meta.id,
581        );
582
583        // Mark the state of this procedure to done.
584        self.meta.set_state(ProcedureState::Done { output });
585    }
586}
587
588#[cfg(test)]
589mod tests {
590    use std::assert_matches::assert_matches;
591    use std::sync::atomic::{AtomicU64, Ordering};
592    use std::sync::Arc;
593
594    use async_trait::async_trait;
595    use common_error::ext::{ErrorExt, PlainError};
596    use common_error::mock::MockError;
597    use common_error::status_code::StatusCode;
598    use common_test_util::temp_dir::create_temp_dir;
599    use futures::future::join_all;
600    use futures_util::future::BoxFuture;
601    use futures_util::FutureExt;
602    use object_store::{EntryMode, ObjectStore};
603    use tokio::sync::mpsc;
604
605    use super::*;
606    use crate::local::{test_util, DynamicKeyLockGuard};
607    use crate::procedure::PoisonKeys;
608    use crate::store::proc_path;
609    use crate::test_util::InMemoryPoisonStore;
610    use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
611
612    const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
613
614    fn new_runner(
615        meta: ProcedureMetaRef,
616        procedure: BoxedProcedure,
617        store: Arc<ProcedureStore>,
618    ) -> Runner {
619        Runner {
620            meta,
621            procedure,
622            manager_ctx: Arc::new(ManagerContext::new(
623                Arc::new(InMemoryPoisonStore::default()),
624            )),
625            step: 0,
626            exponential_builder: ExponentialBuilder::default(),
627            store,
628            rolling_back: false,
629        }
630    }
631
632    async fn check_files(
633        object_store: &ObjectStore,
634        procedure_store: &ProcedureStore,
635        procedure_id: ProcedureId,
636        files: &[&str],
637    ) {
638        let dir = proc_path!(procedure_store, "{procedure_id}/");
639        let lister = object_store.list(&dir).await.unwrap();
640        let mut files_in_dir: Vec<_> = lister
641            .into_iter()
642            .filter(|x| x.metadata().mode() == EntryMode::FILE)
643            .map(|de| de.name().to_string())
644            .collect();
645        files_in_dir.sort_unstable();
646        assert_eq!(files, files_in_dir);
647    }
648
649    fn context_with_provider(
650        procedure_id: ProcedureId,
651        provider: Arc<dyn ContextProvider>,
652    ) -> Context {
653        Context {
654            procedure_id,
655            provider,
656        }
657    }
658
659    fn context_without_provider(procedure_id: ProcedureId) -> Context {
660        struct MockProvider;
661
662        #[async_trait]
663        impl ContextProvider for MockProvider {
664            async fn procedure_state(
665                &self,
666                _procedure_id: ProcedureId,
667            ) -> Result<Option<ProcedureState>> {
668                unimplemented!()
669            }
670
671            async fn try_put_poison(
672                &self,
673                _key: &PoisonKey,
674                _procedure_id: ProcedureId,
675            ) -> Result<()> {
676                unimplemented!()
677            }
678
679            async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
680                unimplemented!()
681            }
682        }
683
684        Context {
685            procedure_id,
686            provider: Arc::new(MockProvider),
687        }
688    }
689
690    type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
691
692    struct ProcedureAdapter<F> {
693        data: String,
694        lock_key: LockKey,
695        poison_keys: PoisonKeys,
696        exec_fn: F,
697        rollback_fn: Option<RollbackFn>,
698    }
699
700    impl<F> ProcedureAdapter<F> {
701        fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
702            let mut meta = test_util::procedure_meta_for_test();
703            meta.id = ProcedureId::parse_str(uuid).unwrap();
704            meta.lock_key = self.lock_key.clone();
705            meta.poison_keys = self.poison_keys.clone();
706
707            Arc::new(meta)
708        }
709    }
710
711    #[async_trait]
712    impl<F> Procedure for ProcedureAdapter<F>
713    where
714        F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
715    {
716        fn type_name(&self) -> &str {
717            "ProcedureAdapter"
718        }
719
720        async fn execute(&mut self, ctx: &Context) -> Result<Status> {
721            let f = (self.exec_fn)(ctx.clone());
722            f.await
723        }
724
725        async fn rollback(&mut self, ctx: &Context) -> Result<()> {
726            if let Some(f) = &mut self.rollback_fn {
727                return (f)(ctx.clone()).await;
728            }
729            Ok(())
730        }
731
732        fn rollback_supported(&self) -> bool {
733            self.rollback_fn.is_some()
734        }
735
736        fn dump(&self) -> Result<String> {
737            Ok(self.data.clone())
738        }
739
740        fn lock_key(&self) -> LockKey {
741            self.lock_key.clone()
742        }
743
744        fn poison_keys(&self) -> PoisonKeys {
745            self.poison_keys.clone()
746        }
747    }
748
749    async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
750        let mut times = 0;
751        let exec_fn = move |_| {
752            times += 1;
753            async move {
754                if times == 1 {
755                    Ok(Status::executing(persist))
756                } else {
757                    Ok(Status::done())
758                }
759            }
760            .boxed()
761        };
762        let normal = ProcedureAdapter {
763            data: "normal".to_string(),
764            lock_key: LockKey::single_exclusive("catalog.schema.table"),
765            poison_keys: PoisonKeys::default(),
766            exec_fn,
767            rollback_fn: None,
768        };
769
770        let dir = create_temp_dir("normal");
771        let meta = normal.new_meta(ROOT_ID);
772        let ctx = context_without_provider(meta.id);
773        let object_store = test_util::new_object_store(&dir);
774        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
775        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
776        runner.manager_ctx.start();
777
778        runner.execute_once(&ctx).await;
779        let state = runner.meta.state();
780        assert!(state.is_running(), "{state:?}");
781        check_files(
782            &object_store,
783            &procedure_store,
784            ctx.procedure_id,
785            first_files,
786        )
787        .await;
788
789        runner.execute_once(&ctx).await;
790        let state = runner.meta.state();
791        assert!(state.is_done(), "{state:?}");
792        check_files(
793            &object_store,
794            &procedure_store,
795            ctx.procedure_id,
796            second_files,
797        )
798        .await;
799    }
800
801    #[tokio::test]
802    async fn test_execute_once_normal() {
803        execute_once_normal(
804            true,
805            &["0000000000.step"],
806            &["0000000000.step", "0000000001.commit"],
807        )
808        .await;
809    }
810
811    #[tokio::test]
812    async fn test_execute_once_normal_skip_persist() {
813        execute_once_normal(false, &[], &["0000000000.commit"]).await;
814    }
815
816    #[tokio::test]
817    async fn test_on_suspend_empty() {
818        let exec_fn = move |_| {
819            async move {
820                Ok(Status::Suspended {
821                    subprocedures: Vec::new(),
822                    persist: false,
823                })
824            }
825            .boxed()
826        };
827        let suspend = ProcedureAdapter {
828            data: "suspend".to_string(),
829            lock_key: LockKey::single_exclusive("catalog.schema.table"),
830            poison_keys: PoisonKeys::default(),
831            exec_fn,
832            rollback_fn: None,
833        };
834
835        let dir = create_temp_dir("suspend");
836        let meta = suspend.new_meta(ROOT_ID);
837        let ctx = context_without_provider(meta.id);
838        let object_store = test_util::new_object_store(&dir);
839        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
840        let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
841        runner.manager_ctx.start();
842
843        runner.execute_once(&ctx).await;
844        let state = runner.meta.state();
845        assert!(state.is_running(), "{state:?}");
846    }
847
848    fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
849        let mut times = 0;
850        let exec_fn = move |_| {
851            times += 1;
852            async move {
853                if times == 1 {
854                    time::sleep(Duration::from_millis(200)).await;
855                    Ok(Status::executing(true))
856                } else {
857                    Ok(Status::done())
858                }
859            }
860            .boxed()
861        };
862        let child = ProcedureAdapter {
863            data: "child".to_string(),
864            lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
865            poison_keys: PoisonKeys::default(),
866            exec_fn,
867            rollback_fn: None,
868        };
869
870        ProcedureWithId {
871            id: procedure_id,
872            procedure: Box::new(child),
873        }
874    }
875
    /// End-to-end check of a parent procedure that suspends on subprocedures.
    ///
    /// The parent submits two children (built by `new_child_procedure`), each
    /// locking two region keys. It then keeps suspending until both children
    /// report done. The test then verifies that:
    /// - all locks are released;
    /// - parent and children end up done;
    /// - the parent's files are removed;
    /// - `remove_outdated_meta` evicts every finished procedure.
    #[tokio::test]
    async fn test_on_suspend_by_subprocedures() {
        // Counts parent executions; the first one submits the children.
        let mut times = 0;
        let children_ids = [ProcedureId::random(), ProcedureId::random()];
        // Lock keys for each child, paired with `children_ids` by position.
        let keys = [
            &[
                "catalog.schema.table.region-0",
                "catalog.schema.table.region-1",
            ],
            &[
                "catalog.schema.table.region-2",
                "catalog.schema.table.region-3",
            ],
        ];

        let exec_fn = move |ctx: Context| {
            times += 1;
            async move {
                if times == 1 {
                    // Submit subprocedures.
                    Ok(Status::Suspended {
                        subprocedures: children_ids
                            .into_iter()
                            .zip(keys)
                            .map(|(id, key_slice)| new_child_procedure(id, key_slice))
                            .collect(),
                        persist: true,
                    })
                } else {
                    // Wait for subprocedures.
                    let mut all_child_done = true;
                    for id in children_ids {
                        // A child whose state is unknown (`None`) is treated as not done.
                        let is_not_done = ctx
                            .provider
                            .procedure_state(id)
                            .await
                            .unwrap()
                            .map(|s| !s.is_done())
                            .unwrap_or(true);
                        if is_not_done {
                            all_child_done = false;
                        }
                    }
                    if all_child_done {
                        Ok(Status::done())
                    } else {
                        // Return suspended (without persisting) to wait for notify.
                        Ok(Status::Suspended {
                            subprocedures: Vec::new(),
                            persist: false,
                        })
                    }
                }
            }
            .boxed()
        };
        let parent = ProcedureAdapter {
            data: "parent".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("parent");
        let meta = parent.new_meta(ROOT_ID);
        let procedure_id = meta.id;

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
        manager_ctx.start();
        // Manually add this procedure to the manager ctx.
        assert!(manager_ctx.try_insert_procedure(meta));
        // Replace the manager ctx so the runner uses the one that knows about the parent.
        runner.manager_ctx = manager_ctx.clone();

        runner.run().await;
        // Every key lock taken by parent and children must have been released.
        assert!(manager_ctx.key_lock.is_empty());

        // Check child procedures.
        for child_id in children_ids {
            let state = manager_ctx.state(child_id).unwrap();
            assert!(state.is_done(), "{state:?}");
        }
        let state = manager_ctx.state(procedure_id).unwrap();
        assert!(state.is_done(), "{state:?}");
        // Files are removed.
        check_files(&object_store, &procedure_store, procedure_id, &[]).await;

        // Sleep past the 1ms TTL used below so every finished procedure counts as outdated.
        tokio::time::sleep(Duration::from_millis(5)).await;
        // Clean outdated meta.
        manager_ctx.remove_outdated_meta(Duration::from_millis(1));
        assert!(manager_ctx.state(procedure_id).is_none());
        assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
        for child_id in children_ids {
            assert!(manager_ctx.state(child_id).is_none());
        }
    }
977
978    #[tokio::test]
979    async fn test_running_is_stopped() {
980        let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
981        let normal = ProcedureAdapter {
982            data: "normal".to_string(),
983            lock_key: LockKey::single_exclusive("catalog.schema.table"),
984            poison_keys: PoisonKeys::default(),
985            exec_fn,
986            rollback_fn: None,
987        };
988
989        let dir = create_temp_dir("test_running_is_stopped");
990        let meta = normal.new_meta(ROOT_ID);
991        let ctx = context_without_provider(meta.id);
992        let object_store = test_util::new_object_store(&dir);
993        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
994        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
995        runner.manager_ctx.start();
996
997        runner.execute_once(&ctx).await;
998        let state = runner.meta.state();
999        assert!(state.is_running(), "{state:?}");
1000        check_files(
1001            &object_store,
1002            &procedure_store,
1003            ctx.procedure_id,
1004            &["0000000000.step"],
1005        )
1006        .await;
1007
1008        runner.manager_ctx.stop();
1009        runner.execute_once(&ctx).await;
1010        let state = runner.meta.state();
1011        assert!(state.is_failed(), "{state:?}");
1012        // Shouldn't write any files
1013        check_files(
1014            &object_store,
1015            &procedure_store,
1016            ctx.procedure_id,
1017            &["0000000000.step"],
1018        )
1019        .await;
1020    }
1021
1022    #[tokio::test]
1023    async fn test_running_is_stopped_on_error() {
1024        let exec_fn =
1025            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1026        let normal = ProcedureAdapter {
1027            data: "fail".to_string(),
1028            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1029            poison_keys: PoisonKeys::default(),
1030            exec_fn,
1031            rollback_fn: None,
1032        };
1033
1034        let dir = create_temp_dir("test_running_is_stopped_on_error");
1035        let meta = normal.new_meta(ROOT_ID);
1036        let ctx = context_without_provider(meta.id);
1037        let object_store = test_util::new_object_store(&dir);
1038        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1039        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1040        runner.manager_ctx.stop();
1041
1042        runner.execute_once(&ctx).await;
1043        let state = runner.meta.state();
1044        assert!(state.is_failed(), "{state:?}");
1045        // Shouldn't write any files
1046        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1047    }
1048
1049    #[tokio::test]
1050    async fn test_execute_on_error() {
1051        let exec_fn =
1052            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1053        let fail = ProcedureAdapter {
1054            data: "fail".to_string(),
1055            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1056            poison_keys: PoisonKeys::default(),
1057            exec_fn,
1058            rollback_fn: None,
1059        };
1060
1061        let dir = create_temp_dir("fail");
1062        let meta = fail.new_meta(ROOT_ID);
1063        let ctx = context_without_provider(meta.id);
1064        let object_store = test_util::new_object_store(&dir);
1065        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1066        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1067        runner.manager_ctx.start();
1068
1069        runner.execute_once(&ctx).await;
1070        let state = runner.meta.state();
1071        assert!(state.is_prepare_rollback(), "{state:?}");
1072
1073        runner.execute_once(&ctx).await;
1074        let state = runner.meta.state();
1075        assert!(state.is_failed(), "{state:?}");
1076        check_files(
1077            &object_store,
1078            &procedure_store,
1079            ctx.procedure_id,
1080            &["0000000000.rollback"],
1081        )
1082        .await;
1083    }
1084
1085    #[tokio::test]
1086    async fn test_execute_with_rollback_on_error() {
1087        let exec_fn =
1088            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1089        let rollback_fn = move |_| async move { Ok(()) }.boxed();
1090        let fail = ProcedureAdapter {
1091            data: "fail".to_string(),
1092            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1093            poison_keys: PoisonKeys::default(),
1094            exec_fn,
1095            rollback_fn: Some(Box::new(rollback_fn)),
1096        };
1097
1098        let dir = create_temp_dir("fail");
1099        let meta = fail.new_meta(ROOT_ID);
1100        let ctx = context_without_provider(meta.id);
1101        let object_store = test_util::new_object_store(&dir);
1102        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1103        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1104        runner.manager_ctx.start();
1105
1106        runner.execute_once(&ctx).await;
1107        let state = runner.meta.state();
1108        assert!(state.is_prepare_rollback(), "{state:?}");
1109
1110        runner.execute_once(&ctx).await;
1111        let state = runner.meta.state();
1112        assert!(state.is_rolling_back(), "{state:?}");
1113
1114        runner.execute_once(&ctx).await;
1115        let state = runner.meta.state();
1116        assert!(state.is_failed(), "{state:?}");
1117        check_files(
1118            &object_store,
1119            &procedure_store,
1120            ctx.procedure_id,
1121            &["0000000000.rollback"],
1122        )
1123        .await;
1124    }
1125
1126    #[tokio::test]
1127    async fn test_execute_on_retry_later_error() {
1128        let mut times = 0;
1129
1130        let exec_fn = move |_| {
1131            times += 1;
1132            async move {
1133                if times == 1 {
1134                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1135                } else {
1136                    Ok(Status::done())
1137                }
1138            }
1139            .boxed()
1140        };
1141
1142        let retry_later = ProcedureAdapter {
1143            data: "retry_later".to_string(),
1144            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1145            poison_keys: PoisonKeys::default(),
1146            exec_fn,
1147            rollback_fn: None,
1148        };
1149
1150        let dir = create_temp_dir("retry_later");
1151        let meta = retry_later.new_meta(ROOT_ID);
1152        let ctx = context_without_provider(meta.id);
1153        let object_store = test_util::new_object_store(&dir);
1154        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1155        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1156        runner.manager_ctx.start();
1157        runner.execute_once(&ctx).await;
1158        let state = runner.meta.state();
1159        assert!(state.is_retrying(), "{state:?}");
1160
1161        runner.execute_once(&ctx).await;
1162        let state = runner.meta.state();
1163        assert!(state.is_done(), "{state:?}");
1164        assert!(meta.state().is_done());
1165        check_files(
1166            &object_store,
1167            &procedure_store,
1168            ctx.procedure_id,
1169            &["0000000000.commit"],
1170        )
1171        .await;
1172    }
1173
1174    #[tokio::test]
1175    async fn test_execute_exceed_max_retry_later() {
1176        let exec_fn =
1177            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1178
1179        let exceed_max_retry_later = ProcedureAdapter {
1180            data: "exceed_max_retry_later".to_string(),
1181            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1182            poison_keys: PoisonKeys::default(),
1183            exec_fn,
1184            rollback_fn: None,
1185        };
1186
1187        let dir = create_temp_dir("exceed_max_retry_later");
1188        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1189        let object_store = test_util::new_object_store(&dir);
1190        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1191        let mut runner = new_runner(
1192            meta.clone(),
1193            Box::new(exceed_max_retry_later),
1194            procedure_store,
1195        );
1196        runner.manager_ctx.start();
1197
1198        runner.exponential_builder = ExponentialBuilder::default()
1199            .with_min_delay(Duration::from_millis(1))
1200            .with_max_times(3);
1201
1202        // Run the runner and execute the procedure.
1203        runner.execute_procedure_in_loop().await;
1204        let err = meta.state().error().unwrap().to_string();
1205        assert!(err.contains("Procedure retry exceeded max times"));
1206    }
1207
1208    #[tokio::test]
1209    async fn test_rollback_exceed_max_retry_later() {
1210        let exec_fn =
1211            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1212        let rollback_fn = move |_| {
1213            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1214        };
1215        let exceed_max_retry_later = ProcedureAdapter {
1216            data: "exceed_max_rollback".to_string(),
1217            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1218            poison_keys: PoisonKeys::default(),
1219            exec_fn,
1220            rollback_fn: Some(Box::new(rollback_fn)),
1221        };
1222
1223        let dir = create_temp_dir("exceed_max_rollback");
1224        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1225        let object_store = test_util::new_object_store(&dir);
1226        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1227        let mut runner = new_runner(
1228            meta.clone(),
1229            Box::new(exceed_max_retry_later),
1230            procedure_store,
1231        );
1232        runner.manager_ctx.start();
1233        runner.exponential_builder = ExponentialBuilder::default()
1234            .with_min_delay(Duration::from_millis(1))
1235            .with_max_times(3);
1236
1237        // Run the runner and execute the procedure.
1238        runner.execute_procedure_in_loop().await;
1239        let err = meta.state().error().unwrap().to_string();
1240        assert!(err.contains("Procedure rollback exceeded max times"));
1241    }
1242
    /// Once retries of a `retry_later` execution are exhausted, the rollback
    /// hook must run exactly once.
    ///
    /// `check_files` is then expected to find only the rollback file.
    #[tokio::test]
    async fn test_rollback_after_retry_fail() {
        // Execution always asks to be retried later.
        let exec_fn = move |_| {
            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
        };

        // Each rollback invocation sends one `()` through this channel, letting
        // the test count how many times rollback ran.
        let (tx, mut rx) = mpsc::channel(1);
        let rollback_fn = move |_| {
            let tx = tx.clone();
            async move {
                tx.send(()).await.unwrap();
                Ok(())
            }
            .boxed()
        };
        let retry_later = ProcedureAdapter {
            data: "rollback_after_retry_fail".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: Some(Box::new(rollback_fn)),
        };

        let dir = create_temp_dir("retry_later");
        let meta = retry_later.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
        runner.manager_ctx.start();
        // Tiny delays and at most 3 retries keep the test fast.
        runner.exponential_builder = ExponentialBuilder::default()
            .with_min_delay(Duration::from_millis(1))
            .with_max_times(3);
        // Run the runner and execute the procedure.
        runner.execute_procedure_in_loop().await;
        // Exactly one rollback message: the first `recv` succeeds and the queue is
        // then empty. It reports `Empty` rather than `Disconnected` because `tx` is
        // still owned by the rollback closure inside `runner`.
        rx.recv().await.unwrap();
        assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.rollback"],
        )
        .await;
    }
1288
    /// A parent procedure whose subprocedure fails must fail itself.
    ///
    /// The child always returns an external error. Once the parent observes
    /// (through the context provider) that the child has failed, the parent
    /// returns a "subprocedure failed" error. The test then checks that every
    /// key lock has been released.
    #[tokio::test]
    async fn test_child_error() {
        // Counts parent executions; the first one submits the child.
        let mut times = 0;
        let child_id = ProcedureId::random();

        let exec_fn = move |ctx: Context| {
            times += 1;
            async move {
                if times == 1 {
                    // Submit subprocedures.
                    // The child fails on every execution and has no rollback hook.
                    let exec_fn = |_| {
                        async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
                            .boxed()
                    };
                    let fail = ProcedureAdapter {
                        data: "fail".to_string(),
                        lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
                        poison_keys: PoisonKeys::default(),
                        exec_fn,
                        rollback_fn: None,
                    };

                    Ok(Status::Suspended {
                        subprocedures: vec![ProcedureWithId {
                            id: child_id,
                            procedure: Box::new(fail),
                        }],
                        persist: true,
                    })
                } else {
                    // Wait for subprocedures.
                    // An unknown child state (`None`) is treated as "not failed yet".
                    let state = ctx.provider.procedure_state(child_id).await.unwrap();
                    let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
                    if is_failed {
                        // The parent aborts itself once the child procedure has failed.
                        Err(Error::from_error_ext(PlainError::new(
                            "subprocedure failed".to_string(),
                            StatusCode::Unexpected,
                        )))
                    } else {
                        // Return suspended to wait for notify.
                        Ok(Status::Suspended {
                            subprocedures: Vec::new(),
                            persist: false,
                        })
                    }
                }
            }
            .boxed()
        };
        let parent = ProcedureAdapter {
            data: "parent".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("child_err");
        let meta = parent.new_meta(ROOT_ID);

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
        manager_ctx.start();
        // Manually add this procedure to the manager ctx.
        assert!(manager_ctx.try_insert_procedure(meta.clone()));
        // Replace the manager ctx so the runner uses the one that knows about the parent.
        runner.manager_ctx = manager_ctx.clone();

        // Run the runner and execute the procedure.
        runner.run().await;
        // All key locks must have been released even though the procedures failed.
        assert!(manager_ctx.key_lock.is_empty());
        let err = meta.state().error().unwrap().output_msg();
        assert!(err.contains("subprocedure failed"), "{err}");
    }
1367
    /// A procedure that returns `Status::executing_with_clean_poisons` must
    /// have its previously-set poison key removed from the poison manager.
    ///
    /// The first execution puts the poison; the second returns the
    /// clean-poisons status. The procedure stays running after both.
    #[tokio::test]
    async fn test_execute_with_clean_poisons() {
        common_telemetry::init_default_ut_logging();
        // Counts executions; behavior differs between the first and later ones.
        let mut times = 0;
        let poison_key = PoisonKey::new("table/1024");
        let moved_poison_key = poison_key.clone();
        let exec_fn = move |ctx: Context| {
            times += 1;
            let poison_key = moved_poison_key.clone();
            async move {
                if times == 1 {
                    // Put the poison through the context provider.
                    ctx.provider
                        .try_put_poison(&poison_key, ctx.procedure_id)
                        .await
                        .unwrap();

                    Ok(Status::executing(true))
                } else {
                    Ok(Status::executing_with_clean_poisons(true))
                }
            }
            .boxed()
        };
        let poison = ProcedureAdapter {
            data: "poison".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("clean_poisons");
        let meta = poison.new_meta(ROOT_ID);

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());

        // Use the manager ctx as the context provider.
        let ctx = context_with_provider(
            meta.id,
            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
        );
        // Manually add this procedure to the manager ctx.
        runner
            .manager_ctx
            .procedures
            .write()
            .unwrap()
            .insert(meta.id, runner.meta.clone());

        runner.manager_ctx.start();
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        let procedure_id = runner
            .manager_ctx
            .poison_manager
            .get_poison(&poison_key.to_string())
            .await
            .unwrap();
        // The poison key should exist.
        assert!(procedure_id.is_some());

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        let procedure_id = runner
            .manager_ctx
            .poison_manager
            .get_poison(&poison_key.to_string())
            .await
            .unwrap();
        // The poison key should have been deleted.
        assert!(procedure_id.is_none());
    }
1447
    /// A procedure that fails with `Error::external_and_clean_poisons` must
    /// move to the prepare-rollback state.
    ///
    /// Its previously-set poison key must also be removed from the poison
    /// manager.
    #[tokio::test]
    async fn test_execute_error_with_clean_poisons() {
        common_telemetry::init_default_ut_logging();
        // Counts executions; behavior differs between the first and later ones.
        let mut times = 0;
        let poison_key = PoisonKey::new("table/1024");
        let moved_poison_key = poison_key.clone();
        let exec_fn = move |ctx: Context| {
            times += 1;
            let poison_key = moved_poison_key.clone();
            async move {
                if times == 1 {
                    // Put the poison through the context provider.
                    ctx.provider
                        .try_put_poison(&poison_key, ctx.procedure_id)
                        .await
                        .unwrap();

                    Ok(Status::executing(true))
                } else {
                    // Fail and ask the runner to clean up this procedure's poisons.
                    Err(Error::external_and_clean_poisons(MockError::new(
                        StatusCode::Unexpected,
                    )))
                }
            }
            .boxed()
        };
        let poison = ProcedureAdapter {
            data: "poison".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("error_with_clean_poisons");
        let meta = poison.new_meta(ROOT_ID);

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());

        // Use the manager ctx as the context provider.
        let ctx = context_with_provider(
            meta.id,
            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
        );
        // Manually add this procedure to the manager ctx.
        runner
            .manager_ctx
            .procedures
            .write()
            .unwrap()
            .insert(meta.id, runner.meta.clone());

        runner.manager_ctx.start();
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        let procedure_id = runner
            .manager_ctx
            .poison_manager
            .get_poison(&poison_key.to_string())
            .await
            .unwrap();
        // The poison key should exist.
        assert!(procedure_id.is_some());

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_prepare_rollback(), "{state:?}");

        let procedure_id = runner
            .manager_ctx
            .poison_manager
            .get_poison(&poison_key.to_string())
            .await
            .unwrap();
        // The poison key should have been deleted.
        assert!(procedure_id.is_none());
    }
1529
    /// A procedure that puts a poison and then fails with a plain external
    /// error goes running, then prepare-rollback, then failed.
    ///
    /// The poison key must be left in place, still owned by `ROOT_ID`.
    #[tokio::test]
    async fn test_execute_failed_after_set_poison() {
        // Counts executions; the second one sets the poison and fails.
        let mut times = 0;
        let poison_key = PoisonKey::new("table/1024");
        let moved_poison_key = poison_key.clone();
        let exec_fn = move |ctx: Context| {
            times += 1;
            let poison_key = moved_poison_key.clone();
            async move {
                if times == 1 {
                    Ok(Status::executing(true))
                } else {
                    // Put the poison through the context provider.
                    ctx.provider
                        .try_put_poison(&poison_key, ctx.procedure_id)
                        .await
                        .unwrap();
                    // A plain external error: no request to clean poisons.
                    Err(Error::external(MockError::new(StatusCode::Unexpected)))
                }
            }
            .boxed()
        };
        let poison = ProcedureAdapter {
            data: "poison".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("poison");
        let meta = poison.new_meta(ROOT_ID);

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());

        // Use the manager ctx as the context provider.
        let ctx = context_with_provider(
            meta.id,
            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
        );
        // Manually add this procedure to the manager ctx.
        runner
            .manager_ctx
            .procedures
            .write()
            .unwrap()
            .insert(meta.id, runner.meta.clone());

        runner.manager_ctx.start();
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_prepare_rollback(), "{state:?}");
        assert!(meta.state().is_prepare_rollback());

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        assert!(meta.state().is_failed());

        // Check the poison is set.
        let procedure_id = runner
            .manager_ctx
            .poison_manager
            .get_poison(&poison_key.to_string())
            .await
            .unwrap()
            .unwrap();

        // If the procedure is poisoned, the poison key shouldn't be deleted.
        assert_eq!(&procedure_id.to_string(), ROOT_ID);
    }
1607
    /// A procedure puts a poison and then keeps failing with
    /// `Error::retry_later_and_clean_poisons`.
    ///
    /// Once the 3-retry budget is exhausted, it must end with
    /// `Error::RetryTimesExceeded`, and the poison key must have been removed.
    #[tokio::test]
    async fn test_execute_exceed_max_retry_after_set_poison() {
        common_telemetry::init_default_ut_logging();
        // Counts executions; every execution after the first sets the poison and fails.
        let mut times = 0;
        let poison_key = PoisonKey::new("table/1024");
        let moved_poison_key = poison_key.clone();
        let exec_fn = move |ctx: Context| {
            times += 1;
            let poison_key = moved_poison_key.clone();
            async move {
                if times == 1 {
                    Ok(Status::executing(true))
                } else {
                    // Put the poison through the context provider.
                    ctx.provider
                        .try_put_poison(&poison_key, ctx.procedure_id)
                        .await
                        .unwrap();
                    Err(Error::retry_later_and_clean_poisons(MockError::new(
                        StatusCode::Unexpected,
                    )))
                }
            }
            .boxed()
        };
        let poison = ProcedureAdapter {
            data: "poison".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("exceed_max_after_set_poison");
        let meta = poison.new_meta(ROOT_ID);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
        runner.manager_ctx.start();
        // Tiny delays and at most 3 retries keep the test fast.
        runner.exponential_builder = ExponentialBuilder::default()
            .with_min_delay(Duration::from_millis(1))
            .with_max_times(3);
        // Use the manager ctx as the context provider.
        let ctx = context_with_provider(
            meta.id,
            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
        );
        // Manually add this procedure to the manager ctx.
        runner
            .manager_ctx
            .procedures
            .write()
            .unwrap()
            .insert(meta.id, runner.meta.clone());
        // Run the runner and execute the procedure.
        runner.execute_once_with_retry(&ctx).await;
        let err = meta.state().error().unwrap().clone();
        assert_matches!(&*err, Error::RetryTimesExceeded { .. });

        // Check the poison is deleted.
        let procedure_id = runner
            .manager_ctx
            .poison_manager
            .get_poison(&poison_key.to_string())
            .await
            .unwrap();
        assert_eq!(procedure_id, None);
    }
1676
1677    #[tokio::test]
1678    async fn test_execute_poisoned() {
1679        let mut times = 0;
1680        let poison_key = PoisonKey::new("table/1024");
1681        let moved_poison_key = poison_key.clone();
1682        let exec_fn = move |ctx: Context| {
1683            times += 1;
1684            let poison_key = moved_poison_key.clone();
1685            async move {
1686                if times == 1 {
1687                    Ok(Status::executing(true))
1688                } else {
1689                    // Put the poison to the context.
1690                    ctx.provider
1691                        .try_put_poison(&poison_key, ctx.procedure_id)
1692                        .await
1693                        .unwrap();
1694                    Ok(Status::Poisoned {
1695                        keys: PoisonKeys::new(vec![poison_key.clone()]),
1696                        error: Error::external(MockError::new(StatusCode::Unexpected)),
1697                    })
1698                }
1699            }
1700            .boxed()
1701        };
1702        let poison = ProcedureAdapter {
1703            data: "poison".to_string(),
1704            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1705            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1706            exec_fn,
1707            rollback_fn: None,
1708        };
1709
1710        let dir = create_temp_dir("poison");
1711        let meta = poison.new_meta(ROOT_ID);
1712
1713        let object_store = test_util::new_object_store(&dir);
1714        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1715        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1716
1717        // Use the manager ctx as the context provider.
1718        let ctx = context_with_provider(
1719            meta.id,
1720            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1721        );
1722        // Manually add this procedure to the manager ctx.
1723        runner
1724            .manager_ctx
1725            .procedures
1726            .write()
1727            .unwrap()
1728            .insert(meta.id, runner.meta.clone());
1729
1730        runner.manager_ctx.start();
1731        runner.execute_once(&ctx).await;
1732        let state = runner.meta.state();
1733        assert!(state.is_running(), "{state:?}");
1734
1735        runner.execute_once(&ctx).await;
1736        let state = runner.meta.state();
1737        assert!(state.is_poisoned(), "{state:?}");
1738        assert!(meta.state().is_poisoned());
1739        check_files(
1740            &object_store,
1741            &procedure_store,
1742            ctx.procedure_id,
1743            &["0000000000.step"],
1744        )
1745        .await;
1746
1747        // Check the poison is set.
1748        let procedure_id = runner
1749            .manager_ctx
1750            .poison_manager
1751            .get_poison(&poison_key.to_string())
1752            .await
1753            .unwrap()
1754            .unwrap();
1755
1756        // If the procedure is poisoned, the poison key shouldn't be deleted.
1757        assert_eq!(procedure_id, ROOT_ID);
1758    }
1759
1760    fn test_procedure_with_dynamic_lock(
1761        shared_atomic_value: Arc<AtomicU64>,
1762        id: u64,
1763    ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
1764        let exec_fn = move |ctx: Context| {
1765            let moved_shared_atomic_value = shared_atomic_value.clone();
1766            let moved_ctx = ctx.clone();
1767            async move {
1768                debug!("Acquiring write lock, id: {}", id);
1769                let key = StringKey::Exclusive("test_lock".to_string());
1770                let guard = moved_ctx.provider.acquire_lock(&key).await;
1771                debug!("Acquired write lock, id: {}", id);
1772                let millis = rand::rng().random_range(10..=50);
1773                tokio::time::sleep(Duration::from_millis(millis)).await;
1774                let value = moved_shared_atomic_value.load(Ordering::Relaxed);
1775                moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
1776                debug!("Dropping write lock, id: {}", id);
1777                drop(guard);
1778
1779                Ok(Status::done())
1780            }
1781            .boxed()
1782        };
1783
1784        let adapter = ProcedureAdapter {
1785            data: "dynamic_lock".to_string(),
1786            lock_key: LockKey::new_exclusive([]),
1787            poison_keys: PoisonKeys::new([]),
1788            exec_fn,
1789            rollback_fn: None,
1790        };
1791        let meta = adapter.new_meta(ROOT_ID);
1792
1793        (Box::new(adapter), meta)
1794    }
1795
1796    #[tokio::test(flavor = "multi_thread")]
1797    async fn test_execute_with_dynamic_lock() {
1798        common_telemetry::init_default_ut_logging();
1799        let shared_atomic_value = Arc::new(AtomicU64::new(0));
1800        let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
1801        let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
1802
1803        let dir = create_temp_dir("dynamic_lock");
1804        let object_store = test_util::new_object_store(&dir);
1805        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1806        let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
1807        let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
1808        let ctx1 = context_with_provider(
1809            meta1.id,
1810            runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1811        );
1812        let ctx2 = context_with_provider(
1813            meta2.id,
1814            // use same manager ctx as runner1
1815            runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1816        );
1817        let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
1818        join_all(tasks).await;
1819        assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
1820    }
1821}