common_procedure/local/
runner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_telemetry::{debug, error, info};
21use rand::Rng;
22use snafu::ResultExt;
23use tokio::time;
24
25use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
26use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
27use crate::procedure::{Output, StringKey};
28use crate::rwlock::OwnedKeyRwLockGuard;
29use crate::store::{ProcedureMessage, ProcedureStore};
30use crate::{
31    BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
32};
33
34/// A guard to cleanup procedure state.
35struct ProcedureGuard {
36    meta: ProcedureMetaRef,
37    manager_ctx: Arc<ManagerContext>,
38    key_guards: Vec<OwnedKeyRwLockGuard>,
39    finish: bool,
40}
41
42impl ProcedureGuard {
43    /// Returns a new [ProcedureGuard].
44    fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
45        ProcedureGuard {
46            meta,
47            manager_ctx,
48            key_guards: vec![],
49            finish: false,
50        }
51    }
52
53    /// The procedure is finished successfully.
54    fn finish(mut self) {
55        self.finish = true;
56    }
57}
58
59impl Drop for ProcedureGuard {
60    fn drop(&mut self) {
61        if !self.finish {
62            error!("Procedure {} exits unexpectedly", self.meta.id);
63
64            // Set state to failed. This is useful in test as runtime may not abort when the runner task panics.
65            // See https://github.com/tokio-rs/tokio/issues/2002 .
66            // We set set_panic_hook() in the application's main function. But our tests don't have this panic hook.
67            let err = ProcedurePanicSnafu {
68                procedure_id: self.meta.id,
69            }
70            .build();
71            self.meta.set_state(ProcedureState::failed(Arc::new(err)));
72        }
73
74        // Notify parent procedure.
75        if let Some(parent_id) = self.meta.parent_id {
76            self.manager_ctx.notify_by_subprocedure(parent_id);
77        }
78
79        // Drops the key guards in the reverse order.
80        while !self.key_guards.is_empty() {
81            self.key_guards.pop();
82        }
83
84        // Clean the staled locks.
85        self.manager_ctx
86            .key_lock
87            .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
88    }
89}
90
/// Executes a single procedure and spawns runners for the subprocedures it submits.
pub(crate) struct Runner {
    /// Metadata of the procedure, including its current state.
    pub(crate) meta: ProcedureMetaRef,
    /// The procedure being executed.
    pub(crate) procedure: BoxedProcedure,
    /// Manager context; cloned into the runners of subprocedures.
    pub(crate) manager_ctx: Arc<ManagerContext>,
    /// Step number passed to the store when persisting, committing or writing the
    /// rollback state; incremented after each successful write.
    pub(crate) step: u32,
    /// Builder of the backoff used between retries and between rollback attempts.
    pub(crate) exponential_builder: ExponentialBuilder,
    /// Store that persists the procedure data.
    pub(crate) store: Arc<ProcedureStore>,
    /// NOTE(review): the runner only resets this to `false` before executing; it is not
    /// read anywhere in the runner itself — confirm whether it is still needed.
    pub(crate) rolling_back: bool,
}
100
101impl Runner {
102    /// Return `ProcedureManager` is running.
103    pub(crate) fn running(&self) -> bool {
104        self.manager_ctx.running()
105    }
106
107    /// Run the procedure.
108    pub(crate) async fn run(mut self) {
109        // Ensure we can update the procedure state.
110        let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
111
112        info!(
113            "Runner {}-{} starts",
114            self.procedure.type_name(),
115            self.meta.id
116        );
117
118        // TODO(yingwen): Detect recursive locking (and deadlock) if possible. Maybe we could detect
119        // recursive locking by adding a root procedure id to the meta.
120        for key in self.meta.lock_key.keys_to_lock() {
121            // Acquire lock for each key.
122            let key_guard = match key {
123                StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
124                StringKey::Exclusive(key) => {
125                    self.manager_ctx.key_lock.write(key.clone()).await.into()
126                }
127            };
128
129            guard.key_guards.push(key_guard);
130        }
131
132        // Execute the procedure. We need to release the lock whenever the execution
133        // is successful or fail.
134        self.meta.set_start_time_ms();
135        self.execute_procedure_in_loop().await;
136        self.meta.set_end_time_ms();
137
138        // We can't remove the metadata of the procedure now as users and its parent might
139        // need to query its state.
140        // TODO(yingwen): 1. Add TTL to the metadata; 2. Only keep state in the procedure store
141        // so we don't need to always store the metadata in memory after the procedure is done.
142
143        // Release locks and notify parent procedure.
144        guard.finish();
145
146        // If this is the root procedure, clean up message cache.
147        if self.meta.parent_id.is_none() {
148            let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
149            // Clean resources.
150            self.manager_ctx.on_procedures_finish(&procedure_ids);
151
152            // If `ProcedureManager` is stopped, it stops the current task immediately without deleting the procedure.
153            if !self.running() {
154                return;
155            }
156
157            for id in procedure_ids {
158                if let Err(e) = self.store.delete_procedure(id).await {
159                    error!(
160                        e;
161                        "Runner {}-{} failed to delete procedure {}",
162                        self.procedure.type_name(),
163                        self.meta.id,
164                        id,
165                    );
166                }
167            }
168        }
169
170        info!(
171            "Runner {}-{} exits",
172            self.procedure.type_name(),
173            self.meta.id
174        );
175    }
176
177    async fn execute_procedure_in_loop(&mut self) {
178        let ctx = Context {
179            procedure_id: self.meta.id,
180            provider: self.manager_ctx.clone(),
181        };
182
183        self.rolling_back = false;
184        self.execute_once_with_retry(&ctx).await;
185    }
186
187    async fn execute_once_with_retry(&mut self, ctx: &Context) {
188        let mut retry = self.exponential_builder.build();
189        let mut retry_times = 0;
190
191        let mut rollback = self.exponential_builder.build();
192        let mut rollback_times = 0;
193
194        loop {
195            // Don't store state if `ProcedureManager` is stopped.
196            if !self.running() {
197                self.meta.set_state(ProcedureState::failed(Arc::new(
198                    error::ManagerNotStartSnafu {}.build(),
199                )));
200                return;
201            }
202            let state = self.meta.state();
203            match state {
204                ProcedureState::Running => {}
205                ProcedureState::Retrying { error } => {
206                    retry_times += 1;
207                    if let Some(d) = retry.next() {
208                        let millis = d.as_millis() as u64;
209                        // Add random noise to the retry delay to avoid retry storms.
210                        let noise = rand::rng().random_range(0..(millis / 4) + 1);
211                        let d = d.add(Duration::from_millis(noise));
212
213                        self.wait_on_err(d, retry_times).await;
214                    } else {
215                        self.meta
216                            .set_state(ProcedureState::prepare_rollback(Arc::new(
217                                Error::RetryTimesExceeded {
218                                    source: error.clone(),
219                                    procedure_id: self.meta.id,
220                                },
221                            )));
222                    }
223                }
224                ProcedureState::PrepareRollback { error }
225                | ProcedureState::RollingBack { error } => {
226                    rollback_times += 1;
227                    if let Some(d) = rollback.next() {
228                        self.wait_on_err(d, rollback_times).await;
229                    } else {
230                        let err = Err::<(), Arc<Error>>(error)
231                            .context(RollbackTimesExceededSnafu {
232                                procedure_id: self.meta.id,
233                            })
234                            .unwrap_err();
235                        self.meta.set_state(ProcedureState::failed(Arc::new(err)));
236                        return;
237                    }
238                }
239                ProcedureState::Done { .. } => return,
240                ProcedureState::Failed { .. } => return,
241                ProcedureState::Poisoned { .. } => return,
242            }
243            self.execute_once(ctx).await;
244        }
245    }
246
247    async fn clean_poisons(&mut self) -> Result<()> {
248        let mut error = None;
249        for key in self.meta.poison_keys.iter() {
250            let key = key.to_string();
251            if let Err(e) = self
252                .manager_ctx
253                .poison_manager
254                .delete_poison(key, self.meta.id.to_string())
255                .await
256            {
257                error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
258                error = Some(e);
259            }
260        }
261
262        // returns the last error if any.
263        if let Some(e) = error {
264            return Err(e);
265        }
266        Ok(())
267    }
268
269    async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
270        if self.procedure.rollback_supported() {
271            if let Err(e) = self.procedure.rollback(ctx).await {
272                self.meta
273                    .set_state(ProcedureState::rolling_back(Arc::new(e)));
274                return;
275            }
276        }
277        self.meta.set_state(ProcedureState::failed(err));
278    }
279
280    async fn prepare_rollback(&mut self, err: Arc<Error>) {
281        if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
282            self.meta
283                .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
284            return;
285        }
286        if self.procedure.rollback_supported() {
287            self.meta.set_state(ProcedureState::rolling_back(err));
288        } else {
289            self.meta.set_state(ProcedureState::failed(err));
290        }
291    }
292
293    async fn execute_once(&mut self, ctx: &Context) {
294        match self.meta.state() {
295            ProcedureState::Running | ProcedureState::Retrying { .. } => {
296                match self.procedure.execute(ctx).await {
297                    Ok(status) => {
298                        debug!(
299                            "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
300                            self.procedure.type_name(),
301                            self.meta.id,
302                            status,
303                            status.need_persist(),
304                        );
305
306                        // Don't store state if `ProcedureManager` is stopped.
307                        if !self.running() {
308                            self.meta.set_state(ProcedureState::failed(Arc::new(
309                                error::ManagerNotStartSnafu {}.build(),
310                            )));
311                            return;
312                        }
313
314                        // Cleans poisons before persist.
315                        if status.need_clean_poisons() {
316                            if let Err(e) = self.clean_poisons().await {
317                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
318                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
319                                return;
320                            }
321                        }
322
323                        if status.need_persist() {
324                            if let Err(e) = self.persist_procedure().await {
325                                error!(e; "Failed to persist procedure: {}", self.meta.id);
326                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
327                                return;
328                            }
329                        }
330
331                        match status {
332                            Status::Executing { .. } => {}
333                            Status::Suspended { subprocedures, .. } => {
334                                self.on_suspended(subprocedures).await;
335                            }
336                            Status::Done { output } => {
337                                if let Err(e) = self.commit_procedure().await {
338                                    error!(e; "Failed to commit procedure: {}", self.meta.id);
339                                    self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
340                                    return;
341                                }
342
343                                self.done(output);
344                            }
345                            Status::Poisoned { error, keys } => {
346                                error!(
347                                    error;
348                                    "Procedure {}-{} is poisoned, keys: {:?}",
349                                    self.procedure.type_name(),
350                                    self.meta.id,
351                                    keys,
352                                );
353                                self.meta
354                                    .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
355                            }
356                        }
357                    }
358                    Err(e) => {
359                        error!(
360                            e;
361                            "Failed to execute procedure {}-{}, retry: {}",
362                            self.procedure.type_name(),
363                            self.meta.id,
364                            e.is_retry_later(),
365                        );
366
367                        // Don't store state if `ProcedureManager` is stopped.
368                        if !self.running() {
369                            self.meta.set_state(ProcedureState::failed(Arc::new(
370                                error::ManagerNotStartSnafu {}.build(),
371                            )));
372                            return;
373                        }
374
375                        if e.need_clean_poisons() {
376                            if let Err(e) = self.clean_poisons().await {
377                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
378                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
379                                return;
380                            }
381                        }
382
383                        if e.is_retry_later() {
384                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
385                            return;
386                        }
387
388                        self.meta
389                            .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
390                    }
391                }
392            }
393            ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
394            ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
395            ProcedureState::Failed { .. }
396            | ProcedureState::Done { .. }
397            | ProcedureState::Poisoned { .. } => (),
398        }
399    }
400
401    /// Submit a subprocedure with specific `procedure_id`.
402    fn submit_subprocedure(
403        &self,
404        procedure_id: ProcedureId,
405        procedure_state: ProcedureState,
406        procedure: BoxedProcedure,
407    ) {
408        if self.manager_ctx.contains_procedure(procedure_id) {
409            // If the parent has already submitted this procedure, don't submit it again.
410            return;
411        }
412
413        let step = 0;
414
415        let meta = Arc::new(ProcedureMeta::new(
416            procedure_id,
417            procedure_state,
418            Some(self.meta.id),
419            procedure.lock_key(),
420            procedure.poison_keys(),
421            procedure.type_name(),
422        ));
423        let runner = Runner {
424            meta: meta.clone(),
425            procedure,
426            manager_ctx: self.manager_ctx.clone(),
427            step,
428            exponential_builder: self.exponential_builder,
429            store: self.store.clone(),
430            rolling_back: false,
431        };
432
433        // Insert the procedure. We already check the procedure existence before inserting
434        // so we add an assertion to ensure the procedure id is unique and no other procedures
435        // using the same procedure id.
436        assert!(
437            self.manager_ctx.try_insert_procedure(meta),
438            "Procedure {}-{} submit an existing procedure {}-{}",
439            self.procedure.type_name(),
440            self.meta.id,
441            runner.procedure.type_name(),
442            procedure_id,
443        );
444
445        // Add the id of the subprocedure to the metadata.
446        self.meta.push_child(procedure_id);
447
448        let _handle = common_runtime::spawn_global(async move {
449            // Run the root procedure.
450            runner.run().await
451        });
452    }
453
454    /// Extend the retry time to wait for the next retry.
455    async fn wait_on_err(&mut self, d: Duration, i: u64) {
456        info!(
457            "Procedure {}-{} retry for the {} times after {} millis",
458            self.procedure.type_name(),
459            self.meta.id,
460            i,
461            d.as_millis(),
462        );
463        time::sleep(d).await;
464    }
465
466    async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
467        let has_child = !subprocedures.is_empty();
468        for subprocedure in subprocedures {
469            info!(
470                "Procedure {}-{} submit subprocedure {}-{}",
471                self.procedure.type_name(),
472                self.meta.id,
473                subprocedure.procedure.type_name(),
474                subprocedure.id,
475            );
476
477            self.submit_subprocedure(
478                subprocedure.id,
479                ProcedureState::Running,
480                subprocedure.procedure,
481            );
482        }
483
484        info!(
485            "Procedure {}-{} is waiting for subprocedures",
486            self.procedure.type_name(),
487            self.meta.id,
488        );
489
490        // Wait for subprocedures.
491        if has_child {
492            self.meta.child_notify.notified().await;
493
494            info!(
495                "Procedure {}-{} is waked up",
496                self.procedure.type_name(),
497                self.meta.id,
498            );
499        }
500    }
501
502    async fn persist_procedure(&mut self) -> Result<()> {
503        let type_name = self.procedure.type_name().to_string();
504        let data = self.procedure.dump()?;
505
506        self.store
507            .store_procedure(
508                self.meta.id,
509                self.step,
510                type_name,
511                data,
512                self.meta.parent_id,
513            )
514            .await
515            .map_err(|e| {
516                error!(
517                    e; "Failed to persist procedure {}-{}",
518                    self.procedure.type_name(),
519                    self.meta.id
520                );
521                e
522            })?;
523        self.step += 1;
524        Ok(())
525    }
526
527    async fn commit_procedure(&mut self) -> Result<()> {
528        self.store
529            .commit_procedure(self.meta.id, self.step)
530            .await
531            .map_err(|e| {
532                error!(
533                    e; "Failed to commit procedure {}-{}",
534                    self.procedure.type_name(),
535                    self.meta.id
536                );
537                e
538            })?;
539        self.step += 1;
540        Ok(())
541    }
542
543    async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
544        // Persists procedure state
545        let type_name = self.procedure.type_name().to_string();
546        let data = self.procedure.dump()?;
547        let message = ProcedureMessage {
548            type_name,
549            data,
550            parent_id: self.meta.parent_id,
551            step: self.step,
552            error: Some(error),
553        };
554        self.store
555            .rollback_procedure(self.meta.id, message)
556            .await
557            .map_err(|e| {
558                error!(
559                    e; "Failed to write rollback key for procedure {}-{}",
560                    self.procedure.type_name(),
561                    self.meta.id
562                );
563                e
564            })?;
565        self.step += 1;
566        Ok(())
567    }
568
569    fn done(&self, output: Option<Output>) {
570        // TODO(yingwen): Add files to remove list.
571        info!(
572            "Procedure {}-{} done",
573            self.procedure.type_name(),
574            self.meta.id,
575        );
576
577        // Mark the state of this procedure to done.
578        self.meta.set_state(ProcedureState::Done { output });
579    }
580}
581
582#[cfg(test)]
583mod tests {
584    use std::sync::atomic::{AtomicU64, Ordering};
585    use std::sync::Arc;
586
587    use async_trait::async_trait;
588    use common_error::ext::{ErrorExt, PlainError};
589    use common_error::mock::MockError;
590    use common_error::status_code::StatusCode;
591    use common_test_util::temp_dir::create_temp_dir;
592    use futures::future::join_all;
593    use futures_util::future::BoxFuture;
594    use futures_util::FutureExt;
595    use object_store::{EntryMode, ObjectStore};
596    use tokio::sync::mpsc;
597
598    use super::*;
599    use crate::local::{test_util, DynamicKeyLockGuard};
600    use crate::procedure::PoisonKeys;
601    use crate::store::proc_path;
602    use crate::test_util::InMemoryPoisonStore;
603    use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
604
605    const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
606
607    fn new_runner(
608        meta: ProcedureMetaRef,
609        procedure: BoxedProcedure,
610        store: Arc<ProcedureStore>,
611    ) -> Runner {
612        Runner {
613            meta,
614            procedure,
615            manager_ctx: Arc::new(ManagerContext::new(
616                Arc::new(InMemoryPoisonStore::default()),
617            )),
618            step: 0,
619            exponential_builder: ExponentialBuilder::default(),
620            store,
621            rolling_back: false,
622        }
623    }
624
625    async fn check_files(
626        object_store: &ObjectStore,
627        procedure_store: &ProcedureStore,
628        procedure_id: ProcedureId,
629        files: &[&str],
630    ) {
631        let dir = proc_path!(procedure_store, "{procedure_id}/");
632        let lister = object_store.list(&dir).await.unwrap();
633        let mut files_in_dir: Vec<_> = lister
634            .into_iter()
635            .filter(|x| x.metadata().mode() == EntryMode::FILE)
636            .map(|de| de.name().to_string())
637            .collect();
638        files_in_dir.sort_unstable();
639        assert_eq!(files, files_in_dir);
640    }
641
642    fn context_with_provider(
643        procedure_id: ProcedureId,
644        provider: Arc<dyn ContextProvider>,
645    ) -> Context {
646        Context {
647            procedure_id,
648            provider,
649        }
650    }
651
652    fn context_without_provider(procedure_id: ProcedureId) -> Context {
653        struct MockProvider;
654
655        #[async_trait]
656        impl ContextProvider for MockProvider {
657            async fn procedure_state(
658                &self,
659                _procedure_id: ProcedureId,
660            ) -> Result<Option<ProcedureState>> {
661                unimplemented!()
662            }
663
664            async fn try_put_poison(
665                &self,
666                _key: &PoisonKey,
667                _procedure_id: ProcedureId,
668            ) -> Result<()> {
669                unimplemented!()
670            }
671
672            async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
673                unimplemented!()
674            }
675        }
676
677        Context {
678            procedure_id,
679            provider: Arc::new(MockProvider),
680        }
681    }
682
683    type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
684
685    struct ProcedureAdapter<F> {
686        data: String,
687        lock_key: LockKey,
688        poison_keys: PoisonKeys,
689        exec_fn: F,
690        rollback_fn: Option<RollbackFn>,
691    }
692
693    impl<F> ProcedureAdapter<F> {
694        fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
695            let mut meta = test_util::procedure_meta_for_test();
696            meta.id = ProcedureId::parse_str(uuid).unwrap();
697            meta.lock_key = self.lock_key.clone();
698            meta.poison_keys = self.poison_keys.clone();
699
700            Arc::new(meta)
701        }
702    }
703
704    #[async_trait]
705    impl<F> Procedure for ProcedureAdapter<F>
706    where
707        F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
708    {
709        fn type_name(&self) -> &str {
710            "ProcedureAdapter"
711        }
712
713        async fn execute(&mut self, ctx: &Context) -> Result<Status> {
714            let f = (self.exec_fn)(ctx.clone());
715            f.await
716        }
717
718        async fn rollback(&mut self, ctx: &Context) -> Result<()> {
719            if let Some(f) = &mut self.rollback_fn {
720                return (f)(ctx.clone()).await;
721            }
722            Ok(())
723        }
724
725        fn rollback_supported(&self) -> bool {
726            self.rollback_fn.is_some()
727        }
728
729        fn dump(&self) -> Result<String> {
730            Ok(self.data.clone())
731        }
732
733        fn lock_key(&self) -> LockKey {
734            self.lock_key.clone()
735        }
736
737        fn poison_keys(&self) -> PoisonKeys {
738            self.poison_keys.clone()
739        }
740    }
741
742    async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
743        let mut times = 0;
744        let exec_fn = move |_| {
745            times += 1;
746            async move {
747                if times == 1 {
748                    Ok(Status::executing(persist))
749                } else {
750                    Ok(Status::done())
751                }
752            }
753            .boxed()
754        };
755        let normal = ProcedureAdapter {
756            data: "normal".to_string(),
757            lock_key: LockKey::single_exclusive("catalog.schema.table"),
758            poison_keys: PoisonKeys::default(),
759            exec_fn,
760            rollback_fn: None,
761        };
762
763        let dir = create_temp_dir("normal");
764        let meta = normal.new_meta(ROOT_ID);
765        let ctx = context_without_provider(meta.id);
766        let object_store = test_util::new_object_store(&dir);
767        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
768        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
769        runner.manager_ctx.start();
770
771        runner.execute_once(&ctx).await;
772        let state = runner.meta.state();
773        assert!(state.is_running(), "{state:?}");
774        check_files(
775            &object_store,
776            &procedure_store,
777            ctx.procedure_id,
778            first_files,
779        )
780        .await;
781
782        runner.execute_once(&ctx).await;
783        let state = runner.meta.state();
784        assert!(state.is_done(), "{state:?}");
785        check_files(
786            &object_store,
787            &procedure_store,
788            ctx.procedure_id,
789            second_files,
790        )
791        .await;
792    }
793
794    #[tokio::test]
795    async fn test_execute_once_normal() {
796        execute_once_normal(
797            true,
798            &["0000000000.step"],
799            &["0000000000.step", "0000000001.commit"],
800        )
801        .await;
802    }
803
804    #[tokio::test]
805    async fn test_execute_once_normal_skip_persist() {
806        execute_once_normal(false, &[], &["0000000000.commit"]).await;
807    }
808
809    #[tokio::test]
810    async fn test_on_suspend_empty() {
811        let exec_fn = move |_| {
812            async move {
813                Ok(Status::Suspended {
814                    subprocedures: Vec::new(),
815                    persist: false,
816                })
817            }
818            .boxed()
819        };
820        let suspend = ProcedureAdapter {
821            data: "suspend".to_string(),
822            lock_key: LockKey::single_exclusive("catalog.schema.table"),
823            poison_keys: PoisonKeys::default(),
824            exec_fn,
825            rollback_fn: None,
826        };
827
828        let dir = create_temp_dir("suspend");
829        let meta = suspend.new_meta(ROOT_ID);
830        let ctx = context_without_provider(meta.id);
831        let object_store = test_util::new_object_store(&dir);
832        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
833        let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
834        runner.manager_ctx.start();
835
836        runner.execute_once(&ctx).await;
837        let state = runner.meta.state();
838        assert!(state.is_running(), "{state:?}");
839    }
840
841    fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
842        let mut times = 0;
843        let exec_fn = move |_| {
844            times += 1;
845            async move {
846                if times == 1 {
847                    time::sleep(Duration::from_millis(200)).await;
848                    Ok(Status::executing(true))
849                } else {
850                    Ok(Status::done())
851                }
852            }
853            .boxed()
854        };
855        let child = ProcedureAdapter {
856            data: "child".to_string(),
857            lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
858            poison_keys: PoisonKeys::default(),
859            exec_fn,
860            rollback_fn: None,
861        };
862
863        ProcedureWithId {
864            id: procedure_id,
865            procedure: Box::new(child),
866        }
867    }
868
    /// A parent procedure that suspends with two child procedures should run to
    /// completion together with its children, release all key locks, remove its
    /// persisted files, and have its (and its children's) metas cleaned up once
    /// they are outdated.
    #[tokio::test]
    async fn test_on_suspend_by_subprocedures() {
        // Counts how many times the parent's `exec_fn` has been invoked.
        let mut times = 0;
        let children_ids = [ProcedureId::random(), ProcedureId::random()];
        // Lock keys for each child; the two children lock disjoint regions.
        let keys = [
            &[
                "catalog.schema.table.region-0",
                "catalog.schema.table.region-1",
            ],
            &[
                "catalog.schema.table.region-2",
                "catalog.schema.table.region-3",
            ],
        ];

        let exec_fn = move |ctx: Context| {
            times += 1;
            async move {
                if times == 1 {
                    // Submit subprocedures.
                    Ok(Status::Suspended {
                        subprocedures: children_ids
                            .into_iter()
                            .zip(keys)
                            .map(|(id, key_slice)| new_child_procedure(id, key_slice))
                            .collect(),
                        persist: true,
                    })
                } else {
                    // Wait for subprocedures: a child whose state is unknown
                    // (`None`) is treated as not done yet.
                    let mut all_child_done = true;
                    for id in children_ids {
                        let is_not_done = ctx
                            .provider
                            .procedure_state(id)
                            .await
                            .unwrap()
                            .map(|s| !s.is_done())
                            .unwrap_or(true);
                        if is_not_done {
                            all_child_done = false;
                        }
                    }
                    if all_child_done {
                        Ok(Status::done())
                    } else {
                        // Return suspended to wait for notify.
                        Ok(Status::Suspended {
                            subprocedures: Vec::new(),
                            persist: false,
                        })
                    }
                }
            }
            .boxed()
        };
        let parent = ProcedureAdapter {
            data: "parent".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("parent");
        let meta = parent.new_meta(ROOT_ID);
        let procedure_id = meta.id;

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
        manager_ctx.start();
        // Manually add this procedure to the manager ctx.
        assert!(manager_ctx.try_insert_procedure(meta));
        // Replace the manager ctx so the runner uses the context that has the
        // parent registered (and that the assertions below inspect).
        runner.manager_ctx = manager_ctx.clone();

        // Drive the parent (and, through it, the children) to completion; no
        // key locks may remain held afterwards.
        runner.run().await;
        assert!(manager_ctx.key_lock.is_empty());

        // Check child procedures.
        for child_id in children_ids {
            let state = manager_ctx.state(child_id).unwrap();
            assert!(state.is_done(), "{state:?}");
        }
        let state = manager_ctx.state(procedure_id).unwrap();
        assert!(state.is_done(), "{state:?}");
        // The parent's persisted files are removed once it is done.
        check_files(&object_store, &procedure_store, procedure_id, &[]).await;

        // Wait so the finished metas are older than the 1ms expiry used below.
        tokio::time::sleep(Duration::from_millis(5)).await;
        // Clean outdated meta: both parent and children must be gone.
        manager_ctx.remove_outdated_meta(Duration::from_millis(1));
        assert!(manager_ctx.state(procedure_id).is_none());
        assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
        for child_id in children_ids {
            assert!(manager_ctx.state(child_id).is_none());
        }
    }
970
    /// A running procedure is marked failed on its next step once the manager
    /// context has been stopped, and that step writes no new files.
    #[tokio::test]
    async fn test_running_is_stopped() {
        // Every execution returns `Status::executing(true)`; the first step is
        // expected to leave a `0000000000.step` file (checked below).
        let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
        let normal = ProcedureAdapter {
            data: "normal".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("test_running_is_stopped");
        let meta = normal.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
        runner.manager_ctx.start();

        // First step with the manager running: still running, one step file written.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.step"],
        )
        .await;

        // Stop the manager; the next step must fail the procedure.
        runner.manager_ctx.stop();
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        // Shouldn't write any new files: only the first step file remains.
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.step"],
        )
        .await;
    }
1014
1015    #[tokio::test]
1016    async fn test_running_is_stopped_on_error() {
1017        let exec_fn =
1018            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1019        let normal = ProcedureAdapter {
1020            data: "fail".to_string(),
1021            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1022            poison_keys: PoisonKeys::default(),
1023            exec_fn,
1024            rollback_fn: None,
1025        };
1026
1027        let dir = create_temp_dir("test_running_is_stopped_on_error");
1028        let meta = normal.new_meta(ROOT_ID);
1029        let ctx = context_without_provider(meta.id);
1030        let object_store = test_util::new_object_store(&dir);
1031        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1032        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1033        runner.manager_ctx.stop();
1034
1035        runner.execute_once(&ctx).await;
1036        let state = runner.meta.state();
1037        assert!(state.is_failed(), "{state:?}");
1038        // Shouldn't write any files
1039        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1040    }
1041
    /// A procedure without a rollback function that fails with an external error
    /// moves to "prepare rollback" on the first step and to "failed" on the next,
    /// leaving only a rollback file behind.
    #[tokio::test]
    async fn test_execute_on_error() {
        // Every execution fails with an external error.
        let exec_fn =
            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
        let fail = ProcedureAdapter {
            data: "fail".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("fail");
        let meta = fail.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
        runner.manager_ctx.start();

        // Step 1: the error puts the procedure into the prepare-rollback state.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_prepare_rollback(), "{state:?}");

        // Step 2: with no rollback function, the procedure ends up failed.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.rollback"],
        )
        .await;
    }
1077
    /// A procedure with a succeeding rollback function that fails with an
    /// external error goes prepare-rollback -> rolling-back -> failed over three
    /// steps, leaving only a rollback file behind.
    #[tokio::test]
    async fn test_execute_with_rollback_on_error() {
        // Every execution fails; the rollback itself always succeeds.
        let exec_fn =
            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
        let rollback_fn = move |_| async move { Ok(()) }.boxed();
        let fail = ProcedureAdapter {
            data: "fail".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: Some(Box::new(rollback_fn)),
        };

        let dir = create_temp_dir("fail");
        let meta = fail.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
        runner.manager_ctx.start();

        // Step 1: execution error -> prepare rollback.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_prepare_rollback(), "{state:?}");

        // Step 2: prepare rollback -> rolling back.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_rolling_back(), "{state:?}");

        // Step 3: the rollback runs and the procedure ends up failed.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_failed(), "{state:?}");
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.rollback"],
        )
        .await;
    }
1118
    /// A retry-later error on the first execution puts the procedure into the
    /// retrying state. A successful second execution completes it and leaves a
    /// commit file.
    #[tokio::test]
    async fn test_execute_on_retry_later_error() {
        // Counts how many times `exec_fn` has been invoked.
        let mut times = 0;

        // First call: retry-later error; every later call: done.
        let exec_fn = move |_| {
            times += 1;
            async move {
                if times == 1 {
                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
                } else {
                    Ok(Status::done())
                }
            }
            .boxed()
        };

        let retry_later = ProcedureAdapter {
            data: "retry_later".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("retry_later");
        let meta = retry_later.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
        runner.manager_ctx.start();
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_retrying(), "{state:?}");

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_done(), "{state:?}");
        // `meta` is the same meta handed to the runner, so it observes the update too.
        assert!(meta.state().is_done());
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.commit"],
        )
        .await;
    }
1166
1167    #[tokio::test]
1168    async fn test_execute_exceed_max_retry_later() {
1169        let exec_fn =
1170            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1171
1172        let exceed_max_retry_later = ProcedureAdapter {
1173            data: "exceed_max_retry_later".to_string(),
1174            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1175            poison_keys: PoisonKeys::default(),
1176            exec_fn,
1177            rollback_fn: None,
1178        };
1179
1180        let dir = create_temp_dir("exceed_max_retry_later");
1181        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1182        let object_store = test_util::new_object_store(&dir);
1183        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1184        let mut runner = new_runner(
1185            meta.clone(),
1186            Box::new(exceed_max_retry_later),
1187            procedure_store,
1188        );
1189        runner.manager_ctx.start();
1190
1191        runner.exponential_builder = ExponentialBuilder::default()
1192            .with_min_delay(Duration::from_millis(1))
1193            .with_max_times(3);
1194
1195        // Run the runner and execute the procedure.
1196        runner.execute_procedure_in_loop().await;
1197        let err = meta.state().error().unwrap().to_string();
1198        assert!(err.contains("Procedure retry exceeded max times"));
1199    }
1200
1201    #[tokio::test]
1202    async fn test_rollback_exceed_max_retry_later() {
1203        let exec_fn =
1204            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1205        let rollback_fn = move |_| {
1206            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1207        };
1208        let exceed_max_retry_later = ProcedureAdapter {
1209            data: "exceed_max_rollback".to_string(),
1210            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1211            poison_keys: PoisonKeys::default(),
1212            exec_fn,
1213            rollback_fn: Some(Box::new(rollback_fn)),
1214        };
1215
1216        let dir = create_temp_dir("exceed_max_rollback");
1217        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1218        let object_store = test_util::new_object_store(&dir);
1219        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1220        let mut runner = new_runner(
1221            meta.clone(),
1222            Box::new(exceed_max_retry_later),
1223            procedure_store,
1224        );
1225        runner.manager_ctx.start();
1226        runner.exponential_builder = ExponentialBuilder::default()
1227            .with_min_delay(Duration::from_millis(1))
1228            .with_max_times(3);
1229
1230        // Run the runner and execute the procedure.
1231        runner.execute_procedure_in_loop().await;
1232        let err = meta.state().error().unwrap().to_string();
1233        assert!(err.contains("Procedure rollback exceeded max times"));
1234    }
1235
    /// Once retry-later errors exhaust the retry budget, the procedure's rollback
    /// function is invoked exactly once and only a rollback file remains.
    #[tokio::test]
    async fn test_rollback_after_retry_fail() {
        // Every execution asks to be retried later.
        let exec_fn = move |_| {
            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
        };

        // Each rollback invocation sends one `()` so the test can count calls.
        // NOTE(review): with capacity 1, a second rollback call would block in
        // `send` until the first message is received (which only happens after
        // the loop below returns), so a double rollback may hang this test
        // rather than fail the `try_recv` assertion.
        let (tx, mut rx) = mpsc::channel(1);
        let rollback_fn = move |_| {
            let tx = tx.clone();
            async move {
                tx.send(()).await.unwrap();
                Ok(())
            }
            .boxed()
        };
        let retry_later = ProcedureAdapter {
            data: "rollback_after_retry_fail".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: Some(Box::new(rollback_fn)),
        };

        let dir = create_temp_dir("retry_later");
        let meta = retry_later.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
        runner.manager_ctx.start();
        // Tiny delay and a small retry budget so retries are exhausted quickly.
        runner.exponential_builder = ExponentialBuilder::default()
            .with_min_delay(Duration::from_millis(1))
            .with_max_times(3);
        // Run the runner and execute the procedure.
        runner.execute_procedure_in_loop().await;
        // Exactly one rollback signal: one received, none left pending.
        rx.recv().await.unwrap();
        assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.rollback"],
        )
        .await;
    }
1281
    /// A parent procedure whose child fails aborts itself with its own error.
    /// Afterwards all key locks are released and the parent's error carries the
    /// "subprocedure failed" message.
    #[tokio::test]
    async fn test_child_error() {
        // Counts how many times the parent's `exec_fn` has been invoked.
        let mut times = 0;
        let child_id = ProcedureId::random();

        let exec_fn = move |ctx: Context| {
            times += 1;
            async move {
                if times == 1 {
                    // Submit subprocedures: a single child that always fails.
                    let exec_fn = |_| {
                        async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
                            .boxed()
                    };
                    let fail = ProcedureAdapter {
                        data: "fail".to_string(),
                        lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
                        poison_keys: PoisonKeys::default(),
                        exec_fn,
                        rollback_fn: None,
                    };

                    Ok(Status::Suspended {
                        subprocedures: vec![ProcedureWithId {
                            id: child_id,
                            procedure: Box::new(fail),
                        }],
                        persist: true,
                    })
                } else {
                    // Wait for subprocedures; an unknown child state counts as not failed.
                    let state = ctx.provider.procedure_state(child_id).await.unwrap();
                    let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
                    if is_failed {
                        // The parent procedure aborts itself if the child procedure failed.
                        Err(Error::from_error_ext(PlainError::new(
                            "subprocedure failed".to_string(),
                            StatusCode::Unexpected,
                        )))
                    } else {
                        // Return suspended to wait for notify.
                        Ok(Status::Suspended {
                            subprocedures: Vec::new(),
                            persist: false,
                        })
                    }
                }
            }
            .boxed()
        };
        let parent = ProcedureAdapter {
            data: "parent".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("child_err");
        let meta = parent.new_meta(ROOT_ID);

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
        manager_ctx.start();
        // Manually add this procedure to the manager ctx.
        assert!(manager_ctx.try_insert_procedure(meta.clone()));
        // Replace the manager ctx so the runner uses the context that has the
        // parent registered (and that the assertions below inspect).
        runner.manager_ctx = manager_ctx.clone();

        // Run the runner and execute the procedure.
        runner.run().await;
        assert!(manager_ctx.key_lock.is_empty());
        let err = meta.state().error().unwrap().output_msg();
        assert!(err.contains("subprocedure failed"), "{err}");
    }
1360
    /// A procedure puts a poison on its first step. On its second step it returns
    /// `Status::executing_with_clean_poisons`, which must remove that poison while
    /// the procedure keeps running.
    #[tokio::test]
    async fn test_execute_with_clean_poisons() {
        common_telemetry::init_default_ut_logging();
        // Counts how many times `exec_fn` has been invoked.
        let mut times = 0;
        let poison_key = PoisonKey::new("table/1024");
        let moved_poison_key = poison_key.clone();
        let exec_fn = move |ctx: Context| {
            times += 1;
            let poison_key = moved_poison_key.clone();
            async move {
                if times == 1 {
                    // Put the poison to the context.
                    ctx.provider
                        .try_put_poison(&poison_key, ctx.procedure_id)
                        .await
                        .unwrap();

                    Ok(Status::executing(true))
                } else {
                    // Keep executing, but ask the runner to clean the poisons.
                    Ok(Status::executing_with_clean_poisons(true))
                }
            }
            .boxed()
        };
        let poison = ProcedureAdapter {
            data: "poison".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("clean_poisons");
        let meta = poison.new_meta(ROOT_ID);

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());

        // Use the manager ctx as the context provider.
        let ctx = context_with_provider(
            meta.id,
            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
        );
        // Manually add this procedure to the manager ctx.
        runner
            .manager_ctx
            .procedures
            .write()
            .unwrap()
            .insert(meta.id, runner.meta.clone());

        runner.manager_ctx.start();
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        let procedure_id = runner
            .manager_ctx
            .poison_manager
            .get_poison(&poison_key.to_string())
            .await
            .unwrap();
        // The poison key should exist after the first step.
        assert!(procedure_id.is_some());

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        let procedure_id = runner
            .manager_ctx
            .poison_manager
            .get_poison(&poison_key.to_string())
            .await
            .unwrap();
        // The poison key should be deleted after the clean-poisons step.
        assert!(procedure_id.is_none());
    }
1440
    /// A procedure puts a poison on its first step and then fails with
    /// `Error::external_and_clean_poisons`. The procedure must move to "prepare
    /// rollback" and the poison must be removed.
    #[tokio::test]
    async fn test_execute_error_with_clean_poisons() {
        common_telemetry::init_default_ut_logging();
        // Counts how many times `exec_fn` has been invoked.
        let mut times = 0;
        let poison_key = PoisonKey::new("table/1024");
        let moved_poison_key = poison_key.clone();
        let exec_fn = move |ctx: Context| {
            times += 1;
            let poison_key = moved_poison_key.clone();
            async move {
                if times == 1 {
                    // Put the poison to the context.
                    ctx.provider
                        .try_put_poison(&poison_key, ctx.procedure_id)
                        .await
                        .unwrap();

                    Ok(Status::executing(true))
                } else {
                    // Fail and ask the runner to clean the poisons.
                    Err(Error::external_and_clean_poisons(MockError::new(
                        StatusCode::Unexpected,
                    )))
                }
            }
            .boxed()
        };
        let poison = ProcedureAdapter {
            data: "poison".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("error_with_clean_poisons");
        let meta = poison.new_meta(ROOT_ID);

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());

        // Use the manager ctx as the context provider.
        let ctx = context_with_provider(
            meta.id,
            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
        );
        // Manually add this procedure to the manager ctx.
        runner
            .manager_ctx
            .procedures
            .write()
            .unwrap()
            .insert(meta.id, runner.meta.clone());

        runner.manager_ctx.start();
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        let procedure_id = runner
            .manager_ctx
            .poison_manager
            .get_poison(&poison_key.to_string())
            .await
            .unwrap();
        // The poison key should exist after the first step.
        assert!(procedure_id.is_some());

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_prepare_rollback(), "{state:?}");

        let procedure_id = runner
            .manager_ctx
            .poison_manager
            .get_poison(&poison_key.to_string())
            .await
            .unwrap();
        // The poison key should be deleted by the clean-poisons error.
        assert!(procedure_id.is_none());
    }
1522
1523    #[tokio::test]
1524    async fn test_execute_failed_after_set_poison() {
1525        let mut times = 0;
1526        let poison_key = PoisonKey::new("table/1024");
1527        let moved_poison_key = poison_key.clone();
1528        let exec_fn = move |ctx: Context| {
1529            times += 1;
1530            let poison_key = moved_poison_key.clone();
1531            async move {
1532                if times == 1 {
1533                    Ok(Status::executing(true))
1534                } else {
1535                    // Put the poison to the context.
1536                    ctx.provider
1537                        .try_put_poison(&poison_key, ctx.procedure_id)
1538                        .await
1539                        .unwrap();
1540                    Err(Error::external(MockError::new(StatusCode::Unexpected)))
1541                }
1542            }
1543            .boxed()
1544        };
1545        let poison = ProcedureAdapter {
1546            data: "poison".to_string(),
1547            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1548            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1549            exec_fn,
1550            rollback_fn: None,
1551        };
1552
1553        let dir = create_temp_dir("poison");
1554        let meta = poison.new_meta(ROOT_ID);
1555
1556        let object_store = test_util::new_object_store(&dir);
1557        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1558        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1559
1560        // Use the manager ctx as the context provider.
1561        let ctx = context_with_provider(
1562            meta.id,
1563            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1564        );
1565        // Manually add this procedure to the manager ctx.
1566        runner
1567            .manager_ctx
1568            .procedures
1569            .write()
1570            .unwrap()
1571            .insert(meta.id, runner.meta.clone());
1572
1573        runner.manager_ctx.start();
1574        runner.execute_once(&ctx).await;
1575        let state = runner.meta.state();
1576        assert!(state.is_running(), "{state:?}");
1577
1578        runner.execute_once(&ctx).await;
1579        let state = runner.meta.state();
1580        assert!(state.is_prepare_rollback(), "{state:?}");
1581        assert!(meta.state().is_prepare_rollback());
1582
1583        runner.execute_once(&ctx).await;
1584        let state = runner.meta.state();
1585        assert!(state.is_failed(), "{state:?}");
1586        assert!(meta.state().is_failed());
1587
1588        // Check the poison is set.
1589        let procedure_id = runner
1590            .manager_ctx
1591            .poison_manager
1592            .get_poison(&poison_key.to_string())
1593            .await
1594            .unwrap()
1595            .unwrap();
1596
1597        // If the procedure is poisoned, the poison key shouldn't be deleted.
1598        assert_eq!(&procedure_id.to_string(), ROOT_ID);
1599    }
1600
    /// A procedure puts a poison on its second step and returns `Status::Poisoned`.
    /// It ends in the poisoned state with only its first step file persisted, and
    /// the poison stays owned by the procedure.
    #[tokio::test]
    async fn test_execute_poisoned() {
        // Counts how many times `exec_fn` has been invoked.
        let mut times = 0;
        let poison_key = PoisonKey::new("table/1024");
        let moved_poison_key = poison_key.clone();
        let exec_fn = move |ctx: Context| {
            times += 1;
            let poison_key = moved_poison_key.clone();
            async move {
                if times == 1 {
                    Ok(Status::executing(true))
                } else {
                    // Put the poison to the context.
                    ctx.provider
                        .try_put_poison(&poison_key, ctx.procedure_id)
                        .await
                        .unwrap();
                    // Report the procedure as poisoned on that key.
                    Ok(Status::Poisoned {
                        keys: PoisonKeys::new(vec![poison_key.clone()]),
                        error: Error::external(MockError::new(StatusCode::Unexpected)),
                    })
                }
            }
            .boxed()
        };
        let poison = ProcedureAdapter {
            data: "poison".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("poison");
        let meta = poison.new_meta(ROOT_ID);

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());

        // Use the manager ctx as the context provider.
        let ctx = context_with_provider(
            meta.id,
            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
        );
        // Manually add this procedure to the manager ctx.
        runner
            .manager_ctx
            .procedures
            .write()
            .unwrap()
            .insert(meta.id, runner.meta.clone());

        runner.manager_ctx.start();
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        // Step 2: the procedure reports itself as poisoned.
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_poisoned(), "{state:?}");
        assert!(meta.state().is_poisoned());
        // Only the first step file is persisted; no rollback/commit file.
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.step"],
        )
        .await;

        // Check the poison is set.
        let procedure_id = runner
            .manager_ctx
            .poison_manager
            .get_poison(&poison_key.to_string())
            .await
            .unwrap()
            .unwrap();

        // If the procedure is poisoned, the poison key shouldn't be deleted.
        assert_eq!(procedure_id, ROOT_ID);
    }
1683
1684    fn test_procedure_with_dynamic_lock(
1685        shared_atomic_value: Arc<AtomicU64>,
1686        id: u64,
1687    ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
1688        let exec_fn = move |ctx: Context| {
1689            let moved_shared_atomic_value = shared_atomic_value.clone();
1690            let moved_ctx = ctx.clone();
1691            async move {
1692                debug!("Acquiring write lock, id: {}", id);
1693                let key = StringKey::Exclusive("test_lock".to_string());
1694                let guard = moved_ctx.provider.acquire_lock(&key).await;
1695                debug!("Acquired write lock, id: {}", id);
1696                let millis = rand::rng().random_range(10..=50);
1697                tokio::time::sleep(Duration::from_millis(millis)).await;
1698                let value = moved_shared_atomic_value.load(Ordering::Relaxed);
1699                moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
1700                debug!("Dropping write lock, id: {}", id);
1701                drop(guard);
1702
1703                Ok(Status::done())
1704            }
1705            .boxed()
1706        };
1707
1708        let adapter = ProcedureAdapter {
1709            data: "dynamic_lock".to_string(),
1710            lock_key: LockKey::new_exclusive([]),
1711            poison_keys: PoisonKeys::new([]),
1712            exec_fn,
1713            rollback_fn: None,
1714        };
1715        let meta = adapter.new_meta(ROOT_ID);
1716
1717        (Box::new(adapter), meta)
1718    }
1719
1720    #[tokio::test(flavor = "multi_thread")]
1721    async fn test_execute_with_dynamic_lock() {
1722        common_telemetry::init_default_ut_logging();
1723        let shared_atomic_value = Arc::new(AtomicU64::new(0));
1724        let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
1725        let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
1726
1727        let dir = create_temp_dir("dynamic_lock");
1728        let object_store = test_util::new_object_store(&dir);
1729        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1730        let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
1731        let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
1732        let ctx1 = context_with_provider(
1733            meta1.id,
1734            runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1735        );
1736        let ctx2 = context_with_provider(
1737            meta2.id,
1738            // use same manager ctx as runner1
1739            runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1740        );
1741        let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
1742        join_all(tasks).await;
1743        assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
1744    }
1745}