common_procedure/local/
runner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_event_recorder::EventRecorderRef;
21use common_telemetry::{debug, error, info};
22use rand::Rng;
23use snafu::ResultExt;
24use tokio::time;
25
26use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
27use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
28use crate::procedure::{Output, StringKey};
29use crate::rwlock::OwnedKeyRwLockGuard;
30use crate::store::{ProcedureMessage, ProcedureStore};
31use crate::{
32    BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
33};
34
/// A guard to cleanup procedure state.
///
/// Created at the start of `Runner::run`. Its `Drop` implementation marks the procedure as
/// failed unless [ProcedureGuard::finish] was called, notifies the parent procedure (if any),
/// releases the acquired key locks and asks the key lock to clean the procedure's keys.
struct ProcedureGuard {
    /// Metadata of the guarded procedure.
    meta: ProcedureMetaRef,
    /// Manager context used to notify the parent procedure and to clean key locks.
    manager_ctx: Arc<ManagerContext>,
    /// Key lock guards acquired by the runner, in acquisition order.
    key_guards: Vec<OwnedKeyRwLockGuard>,
    /// Set by [ProcedureGuard::finish]; `false` on drop means the runner exited unexpectedly.
    finish: bool,
}
42
43impl ProcedureGuard {
44    /// Returns a new [ProcedureGuard].
45    fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
46        ProcedureGuard {
47            meta,
48            manager_ctx,
49            key_guards: vec![],
50            finish: false,
51        }
52    }
53
54    /// The procedure is finished successfully.
55    fn finish(mut self) {
56        self.finish = true;
57    }
58}
59
60impl Drop for ProcedureGuard {
61    fn drop(&mut self) {
62        if !self.finish {
63            error!("Procedure {} exits unexpectedly", self.meta.id);
64
65            // Set state to failed. This is useful in test as runtime may not abort when the runner task panics.
66            // See https://github.com/tokio-rs/tokio/issues/2002 .
67            // We set set_panic_hook() in the application's main function. But our tests don't have this panic hook.
68            let err = ProcedurePanicSnafu {
69                procedure_id: self.meta.id,
70            }
71            .build();
72            self.meta.set_state(ProcedureState::failed(Arc::new(err)));
73        }
74
75        // Notify parent procedure.
76        if let Some(parent_id) = self.meta.parent_id {
77            self.manager_ctx.notify_by_subprocedure(parent_id);
78        }
79
80        // Drops the key guards in the reverse order.
81        while !self.key_guards.is_empty() {
82            self.key_guards.pop();
83        }
84
85        // Clean the staled locks.
86        self.manager_ctx
87            .key_lock
88            .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
89    }
90}
91
/// Drives a single procedure.
///
/// The runner acquires the procedure's key locks, executes it step by step with retry and
/// rollback handling, and records its progress in the procedure store.
pub(crate) struct Runner {
    /// Shared metadata (id, state, lock keys, parent) of the procedure.
    pub(crate) meta: ProcedureMetaRef,
    /// The procedure being executed.
    pub(crate) procedure: BoxedProcedure,
    /// Context of the procedure manager that spawned this runner.
    pub(crate) manager_ctx: Arc<ManagerContext>,
    /// Step number of the next record written to the procedure store. It is incremented after
    /// every successful persist, commit or rollback write.
    pub(crate) step: u32,
    /// Backoff configuration used for both execution retries and rollback attempts.
    pub(crate) exponential_builder: ExponentialBuilder,
    /// Store where procedure data is persisted.
    pub(crate) store: Arc<ProcedureStore>,
    /// Rolling-back flag. NOTE(review): in the visible code it is only ever assigned
    /// (reset to `false` before execution), never read — confirm whether it is still needed.
    pub(crate) rolling_back: bool,
    /// Optional event recorder, also handed to the metadata of submitted subprocedures.
    pub(crate) event_recorder: Option<EventRecorderRef>,
}
102
103impl Runner {
104    /// Return `ProcedureManager` is running.
105    pub(crate) fn running(&self) -> bool {
106        self.manager_ctx.running()
107    }
108
109    /// Run the procedure.
110    pub(crate) async fn run(mut self) {
111        // Ensure we can update the procedure state.
112        let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
113
114        info!(
115            "Runner {}-{} starts",
116            self.procedure.type_name(),
117            self.meta.id
118        );
119
120        // TODO(yingwen): Detect recursive locking (and deadlock) if possible. Maybe we could detect
121        // recursive locking by adding a root procedure id to the meta.
122        for key in self.meta.lock_key.keys_to_lock() {
123            // Acquire lock for each key.
124            let key_guard = match key {
125                StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
126                StringKey::Exclusive(key) => {
127                    self.manager_ctx.key_lock.write(key.clone()).await.into()
128                }
129            };
130
131            guard.key_guards.push(key_guard);
132        }
133
134        // Execute the procedure. We need to release the lock whenever the execution
135        // is successful or fail.
136        self.meta.set_start_time_ms();
137        self.execute_procedure_in_loop().await;
138        self.meta.set_end_time_ms();
139
140        // We can't remove the metadata of the procedure now as users and its parent might
141        // need to query its state.
142        // TODO(yingwen): 1. Add TTL to the metadata; 2. Only keep state in the procedure store
143        // so we don't need to always store the metadata in memory after the procedure is done.
144
145        // Release locks and notify parent procedure.
146        guard.finish();
147
148        // If this is the root procedure, clean up message cache.
149        if self.meta.parent_id.is_none() {
150            let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
151            // Clean resources.
152            self.manager_ctx.on_procedures_finish(&procedure_ids);
153
154            // If `ProcedureManager` is stopped, it stops the current task immediately without deleting the procedure.
155            if !self.running() {
156                return;
157            }
158
159            for id in procedure_ids {
160                if let Err(e) = self.store.delete_procedure(id).await {
161                    error!(
162                        e;
163                        "Runner {}-{} failed to delete procedure {}",
164                        self.procedure.type_name(),
165                        self.meta.id,
166                        id,
167                    );
168                }
169            }
170        }
171
172        info!(
173            "Runner {}-{} exits",
174            self.procedure.type_name(),
175            self.meta.id
176        );
177    }
178
179    async fn execute_procedure_in_loop(&mut self) {
180        let ctx = Context {
181            procedure_id: self.meta.id,
182            provider: self.manager_ctx.clone(),
183        };
184
185        self.rolling_back = false;
186        self.execute_once_with_retry(&ctx).await;
187    }
188
189    async fn execute_once_with_retry(&mut self, ctx: &Context) {
190        let mut retry = self.exponential_builder.build();
191        let mut retry_times = 0;
192
193        let mut rollback = self.exponential_builder.build();
194        let mut rollback_times = 0;
195
196        loop {
197            // Don't store state if `ProcedureManager` is stopped.
198            if !self.running() {
199                self.meta.set_state(ProcedureState::failed(Arc::new(
200                    error::ManagerNotStartSnafu {}.build(),
201                )));
202                return;
203            }
204            let state = self.meta.state();
205            match state {
206                ProcedureState::Running => {}
207                ProcedureState::Retrying { error } => {
208                    retry_times += 1;
209                    if let Some(d) = retry.next() {
210                        let millis = d.as_millis() as u64;
211                        // Add random noise to the retry delay to avoid retry storms.
212                        let noise = rand::rng().random_range(0..(millis / 4) + 1);
213                        let d = d.add(Duration::from_millis(noise));
214
215                        self.wait_on_err(d, retry_times).await;
216                    } else {
217                        self.meta
218                            .set_state(ProcedureState::prepare_rollback(Arc::new(
219                                Error::RetryTimesExceeded {
220                                    source: error.clone(),
221                                    procedure_id: self.meta.id,
222                                },
223                            )));
224                    }
225                }
226                ProcedureState::PrepareRollback { error }
227                | ProcedureState::RollingBack { error } => {
228                    rollback_times += 1;
229                    if let Some(d) = rollback.next() {
230                        self.wait_on_err(d, rollback_times).await;
231                    } else {
232                        let err = Err::<(), Arc<Error>>(error)
233                            .context(RollbackTimesExceededSnafu {
234                                procedure_id: self.meta.id,
235                            })
236                            .unwrap_err();
237                        self.meta.set_state(ProcedureState::failed(Arc::new(err)));
238                        return;
239                    }
240                }
241                ProcedureState::Done { .. } => return,
242                ProcedureState::Failed { .. } => return,
243                ProcedureState::Poisoned { .. } => return,
244            }
245            self.execute_once(ctx).await;
246        }
247    }
248
249    async fn clean_poisons(&mut self) -> Result<()> {
250        let mut error = None;
251        for key in self.meta.poison_keys.iter() {
252            let key = key.to_string();
253            if let Err(e) = self
254                .manager_ctx
255                .poison_manager
256                .delete_poison(key, self.meta.id.to_string())
257                .await
258            {
259                error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
260                error = Some(e);
261            }
262        }
263
264        // returns the last error if any.
265        if let Some(e) = error {
266            return Err(e);
267        }
268        Ok(())
269    }
270
271    async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
272        if self.procedure.rollback_supported() {
273            if let Err(e) = self.procedure.rollback(ctx).await {
274                self.meta
275                    .set_state(ProcedureState::rolling_back(Arc::new(e)));
276                return;
277            }
278        }
279        self.meta.set_state(ProcedureState::failed(err));
280    }
281
282    async fn prepare_rollback(&mut self, err: Arc<Error>) {
283        if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
284            self.meta
285                .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
286            return;
287        }
288        if self.procedure.rollback_supported() {
289            self.meta.set_state(ProcedureState::rolling_back(err));
290        } else {
291            self.meta.set_state(ProcedureState::failed(err));
292        }
293    }
294
295    async fn execute_once(&mut self, ctx: &Context) {
296        match self.meta.state() {
297            ProcedureState::Running | ProcedureState::Retrying { .. } => {
298                match self.procedure.execute(ctx).await {
299                    Ok(status) => {
300                        debug!(
301                            "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
302                            self.procedure.type_name(),
303                            self.meta.id,
304                            status,
305                            status.need_persist(),
306                        );
307
308                        // Don't store state if `ProcedureManager` is stopped.
309                        if !self.running() {
310                            self.meta.set_state(ProcedureState::failed(Arc::new(
311                                error::ManagerNotStartSnafu {}.build(),
312                            )));
313                            return;
314                        }
315
316                        // Cleans poisons before persist.
317                        if status.need_clean_poisons() {
318                            if let Err(e) = self.clean_poisons().await {
319                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
320                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
321                                return;
322                            }
323                        }
324
325                        if status.need_persist() {
326                            if let Err(e) = self.persist_procedure().await {
327                                error!(e; "Failed to persist procedure: {}", self.meta.id);
328                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
329                                return;
330                            }
331                        }
332
333                        match status {
334                            Status::Executing { .. } => {}
335                            Status::Suspended { subprocedures, .. } => {
336                                self.on_suspended(subprocedures).await;
337                            }
338                            Status::Done { output } => {
339                                if let Err(e) = self.commit_procedure().await {
340                                    error!(e; "Failed to commit procedure: {}", self.meta.id);
341                                    self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
342                                    return;
343                                }
344
345                                self.done(output);
346                            }
347                            Status::Poisoned { error, keys } => {
348                                error!(
349                                    error;
350                                    "Procedure {}-{} is poisoned, keys: {:?}",
351                                    self.procedure.type_name(),
352                                    self.meta.id,
353                                    keys,
354                                );
355                                self.meta
356                                    .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
357                            }
358                        }
359                    }
360                    Err(e) => {
361                        error!(
362                            e;
363                            "Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
364                            self.procedure.type_name(),
365                            self.meta.id,
366                            e.is_retry_later(),
367                            e.need_clean_poisons(),
368                        );
369
370                        // Don't store state if `ProcedureManager` is stopped.
371                        if !self.running() {
372                            self.meta.set_state(ProcedureState::failed(Arc::new(
373                                error::ManagerNotStartSnafu {}.build(),
374                            )));
375                            return;
376                        }
377
378                        if e.need_clean_poisons() {
379                            if let Err(e) = self.clean_poisons().await {
380                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
381                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
382                                return;
383                            }
384                            debug!(
385                                "Procedure {}-{} cleaned poisons",
386                                self.procedure.type_name(),
387                                self.meta.id,
388                            );
389                        }
390
391                        if e.is_retry_later() {
392                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
393                            return;
394                        }
395
396                        self.meta
397                            .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
398                    }
399                }
400            }
401            ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
402            ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
403            ProcedureState::Failed { .. }
404            | ProcedureState::Done { .. }
405            | ProcedureState::Poisoned { .. } => (),
406        }
407    }
408
409    /// Submit a subprocedure with specific `procedure_id`.
410    fn submit_subprocedure(
411        &self,
412        procedure_id: ProcedureId,
413        procedure_state: ProcedureState,
414        procedure: BoxedProcedure,
415    ) {
416        if self.manager_ctx.contains_procedure(procedure_id) {
417            // If the parent has already submitted this procedure, don't submit it again.
418            return;
419        }
420
421        let step = 0;
422
423        let meta = Arc::new(ProcedureMeta::new(
424            procedure_id,
425            procedure_state,
426            Some(self.meta.id),
427            procedure.lock_key(),
428            procedure.poison_keys(),
429            procedure.type_name(),
430            self.event_recorder.clone(),
431            procedure.user_metadata(),
432        ));
433        let runner = Runner {
434            meta: meta.clone(),
435            procedure,
436            manager_ctx: self.manager_ctx.clone(),
437            step,
438            exponential_builder: self.exponential_builder,
439            store: self.store.clone(),
440            rolling_back: false,
441            event_recorder: self.event_recorder.clone(),
442        };
443
444        // Insert the procedure. We already check the procedure existence before inserting
445        // so we add an assertion to ensure the procedure id is unique and no other procedures
446        // using the same procedure id.
447        assert!(
448            self.manager_ctx.try_insert_procedure(meta),
449            "Procedure {}-{} submit an existing procedure {}-{}",
450            self.procedure.type_name(),
451            self.meta.id,
452            runner.procedure.type_name(),
453            procedure_id,
454        );
455
456        // Add the id of the subprocedure to the metadata.
457        self.meta.push_child(procedure_id);
458
459        let _handle = common_runtime::spawn_global(async move {
460            // Run the root procedure.
461            runner.run().await
462        });
463    }
464
465    /// Extend the retry time to wait for the next retry.
466    async fn wait_on_err(&mut self, d: Duration, i: u64) {
467        info!(
468            "Procedure {}-{} retry for the {} times after {} millis",
469            self.procedure.type_name(),
470            self.meta.id,
471            i,
472            d.as_millis(),
473        );
474        time::sleep(d).await;
475    }
476
477    async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
478        let has_child = !subprocedures.is_empty();
479        for subprocedure in subprocedures {
480            info!(
481                "Procedure {}-{} submit subprocedure {}-{}",
482                self.procedure.type_name(),
483                self.meta.id,
484                subprocedure.procedure.type_name(),
485                subprocedure.id,
486            );
487
488            self.submit_subprocedure(
489                subprocedure.id,
490                ProcedureState::Running,
491                subprocedure.procedure,
492            );
493        }
494
495        info!(
496            "Procedure {}-{} is waiting for subprocedures",
497            self.procedure.type_name(),
498            self.meta.id,
499        );
500
501        // Wait for subprocedures.
502        if has_child {
503            self.meta.child_notify.notified().await;
504
505            info!(
506                "Procedure {}-{} is waked up",
507                self.procedure.type_name(),
508                self.meta.id,
509            );
510        }
511    }
512
513    async fn persist_procedure(&mut self) -> Result<()> {
514        let type_name = self.procedure.type_name().to_string();
515        let data = self.procedure.dump()?;
516
517        self.store
518            .store_procedure(
519                self.meta.id,
520                self.step,
521                type_name,
522                data,
523                self.meta.parent_id,
524            )
525            .await
526            .map_err(|e| {
527                error!(
528                    e; "Failed to persist procedure {}-{}",
529                    self.procedure.type_name(),
530                    self.meta.id
531                );
532                e
533            })?;
534        self.step += 1;
535        Ok(())
536    }
537
538    async fn commit_procedure(&mut self) -> Result<()> {
539        self.store
540            .commit_procedure(self.meta.id, self.step)
541            .await
542            .map_err(|e| {
543                error!(
544                    e; "Failed to commit procedure {}-{}",
545                    self.procedure.type_name(),
546                    self.meta.id
547                );
548                e
549            })?;
550        self.step += 1;
551        Ok(())
552    }
553
554    async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
555        // Persists procedure state
556        let type_name = self.procedure.type_name().to_string();
557        let data = self.procedure.dump()?;
558        let message = ProcedureMessage {
559            type_name,
560            data,
561            parent_id: self.meta.parent_id,
562            step: self.step,
563            error: Some(error),
564        };
565        self.store
566            .rollback_procedure(self.meta.id, message)
567            .await
568            .map_err(|e| {
569                error!(
570                    e; "Failed to write rollback key for procedure {}-{}",
571                    self.procedure.type_name(),
572                    self.meta.id
573                );
574                e
575            })?;
576        self.step += 1;
577        Ok(())
578    }
579
580    fn done(&self, output: Option<Output>) {
581        // TODO(yingwen): Add files to remove list.
582        info!(
583            "Procedure {}-{} done",
584            self.procedure.type_name(),
585            self.meta.id,
586        );
587
588        // Mark the state of this procedure to done.
589        self.meta.set_state(ProcedureState::Done { output });
590    }
591}
592
593#[cfg(test)]
594mod tests {
595    use std::assert_matches::assert_matches;
596    use std::sync::atomic::{AtomicU64, Ordering};
597    use std::sync::Arc;
598
599    use async_trait::async_trait;
600    use common_error::ext::{ErrorExt, PlainError};
601    use common_error::mock::MockError;
602    use common_error::status_code::StatusCode;
603    use common_test_util::temp_dir::create_temp_dir;
604    use futures::future::join_all;
605    use futures_util::future::BoxFuture;
606    use futures_util::FutureExt;
607    use object_store::{EntryMode, ObjectStore};
608    use tokio::sync::mpsc;
609    use tokio::sync::watch::Receiver;
610
611    use super::*;
612    use crate::local::{test_util, DynamicKeyLockGuard};
613    use crate::procedure::PoisonKeys;
614    use crate::store::proc_path;
615    use crate::test_util::InMemoryPoisonStore;
616    use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
617
618    const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
619
620    fn new_runner(
621        meta: ProcedureMetaRef,
622        procedure: BoxedProcedure,
623        store: Arc<ProcedureStore>,
624    ) -> Runner {
625        Runner {
626            meta,
627            procedure,
628            manager_ctx: Arc::new(ManagerContext::new(
629                Arc::new(InMemoryPoisonStore::default()),
630            )),
631            step: 0,
632            exponential_builder: ExponentialBuilder::default(),
633            store,
634            rolling_back: false,
635            event_recorder: None,
636        }
637    }
638
639    async fn check_files(
640        object_store: &ObjectStore,
641        procedure_store: &ProcedureStore,
642        procedure_id: ProcedureId,
643        files: &[&str],
644    ) {
645        let dir = proc_path!(procedure_store, "{procedure_id}/");
646        let lister = object_store.list(&dir).await.unwrap();
647        let mut files_in_dir: Vec<_> = lister
648            .into_iter()
649            .filter(|x| x.metadata().mode() == EntryMode::FILE)
650            .map(|de| de.name().to_string())
651            .collect();
652        files_in_dir.sort_unstable();
653        assert_eq!(files, files_in_dir);
654    }
655
656    fn context_with_provider(
657        procedure_id: ProcedureId,
658        provider: Arc<dyn ContextProvider>,
659    ) -> Context {
660        Context {
661            procedure_id,
662            provider,
663        }
664    }
665
666    fn context_without_provider(procedure_id: ProcedureId) -> Context {
667        struct MockProvider;
668
669        #[async_trait]
670        impl ContextProvider for MockProvider {
671            async fn procedure_state(
672                &self,
673                _procedure_id: ProcedureId,
674            ) -> Result<Option<ProcedureState>> {
675                unimplemented!()
676            }
677
678            async fn procedure_state_receiver(
679                &self,
680                _procedure_id: ProcedureId,
681            ) -> Result<Option<Receiver<ProcedureState>>> {
682                unimplemented!()
683            }
684
685            async fn try_put_poison(
686                &self,
687                _key: &PoisonKey,
688                _procedure_id: ProcedureId,
689            ) -> Result<()> {
690                unimplemented!()
691            }
692
693            async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
694                unimplemented!()
695            }
696        }
697
698        Context {
699            procedure_id,
700            provider: Arc::new(MockProvider),
701        }
702    }
703
704    type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
705
706    struct ProcedureAdapter<F> {
707        data: String,
708        lock_key: LockKey,
709        poison_keys: PoisonKeys,
710        exec_fn: F,
711        rollback_fn: Option<RollbackFn>,
712    }
713
714    impl<F> ProcedureAdapter<F> {
715        fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
716            let mut meta = test_util::procedure_meta_for_test();
717            meta.id = ProcedureId::parse_str(uuid).unwrap();
718            meta.lock_key = self.lock_key.clone();
719            meta.poison_keys = self.poison_keys.clone();
720
721            Arc::new(meta)
722        }
723    }
724
725    #[async_trait]
726    impl<F> Procedure for ProcedureAdapter<F>
727    where
728        F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
729    {
730        fn type_name(&self) -> &str {
731            "ProcedureAdapter"
732        }
733
734        async fn execute(&mut self, ctx: &Context) -> Result<Status> {
735            let f = (self.exec_fn)(ctx.clone());
736            f.await
737        }
738
739        async fn rollback(&mut self, ctx: &Context) -> Result<()> {
740            if let Some(f) = &mut self.rollback_fn {
741                return (f)(ctx.clone()).await;
742            }
743            Ok(())
744        }
745
746        fn rollback_supported(&self) -> bool {
747            self.rollback_fn.is_some()
748        }
749
750        fn dump(&self) -> Result<String> {
751            Ok(self.data.clone())
752        }
753
754        fn lock_key(&self) -> LockKey {
755            self.lock_key.clone()
756        }
757
758        fn poison_keys(&self) -> PoisonKeys {
759            self.poison_keys.clone()
760        }
761    }
762
763    async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
764        let mut times = 0;
765        let exec_fn = move |_| {
766            times += 1;
767            async move {
768                if times == 1 {
769                    Ok(Status::executing(persist))
770                } else {
771                    Ok(Status::done())
772                }
773            }
774            .boxed()
775        };
776        let normal = ProcedureAdapter {
777            data: "normal".to_string(),
778            lock_key: LockKey::single_exclusive("catalog.schema.table"),
779            poison_keys: PoisonKeys::default(),
780            exec_fn,
781            rollback_fn: None,
782        };
783
784        let dir = create_temp_dir("normal");
785        let meta = normal.new_meta(ROOT_ID);
786        let ctx = context_without_provider(meta.id);
787        let object_store = test_util::new_object_store(&dir);
788        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
789        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
790        runner.manager_ctx.start();
791
792        runner.execute_once(&ctx).await;
793        let state = runner.meta.state();
794        assert!(state.is_running(), "{state:?}");
795        check_files(
796            &object_store,
797            &procedure_store,
798            ctx.procedure_id,
799            first_files,
800        )
801        .await;
802
803        runner.execute_once(&ctx).await;
804        let state = runner.meta.state();
805        assert!(state.is_done(), "{state:?}");
806        check_files(
807            &object_store,
808            &procedure_store,
809            ctx.procedure_id,
810            second_files,
811        )
812        .await;
813    }
814
815    #[tokio::test]
816    async fn test_execute_once_normal() {
817        execute_once_normal(
818            true,
819            &["0000000000.step"],
820            &["0000000000.step", "0000000001.commit"],
821        )
822        .await;
823    }
824
825    #[tokio::test]
826    async fn test_execute_once_normal_skip_persist() {
827        execute_once_normal(false, &[], &["0000000000.commit"]).await;
828    }
829
830    #[tokio::test]
831    async fn test_on_suspend_empty() {
832        let exec_fn = move |_| {
833            async move {
834                Ok(Status::Suspended {
835                    subprocedures: Vec::new(),
836                    persist: false,
837                })
838            }
839            .boxed()
840        };
841        let suspend = ProcedureAdapter {
842            data: "suspend".to_string(),
843            lock_key: LockKey::single_exclusive("catalog.schema.table"),
844            poison_keys: PoisonKeys::default(),
845            exec_fn,
846            rollback_fn: None,
847        };
848
849        let dir = create_temp_dir("suspend");
850        let meta = suspend.new_meta(ROOT_ID);
851        let ctx = context_without_provider(meta.id);
852        let object_store = test_util::new_object_store(&dir);
853        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
854        let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
855        runner.manager_ctx.start();
856
857        runner.execute_once(&ctx).await;
858        let state = runner.meta.state();
859        assert!(state.is_running(), "{state:?}");
860    }
861
862    fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
863        let mut times = 0;
864        let exec_fn = move |_| {
865            times += 1;
866            async move {
867                if times == 1 {
868                    time::sleep(Duration::from_millis(200)).await;
869                    Ok(Status::executing(true))
870                } else {
871                    Ok(Status::done())
872                }
873            }
874            .boxed()
875        };
876        let child = ProcedureAdapter {
877            data: "child".to_string(),
878            lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
879            poison_keys: PoisonKeys::default(),
880            exec_fn,
881            rollback_fn: None,
882        };
883
884        ProcedureWithId {
885            id: procedure_id,
886            procedure: Box::new(child),
887        }
888    }
889
890    #[tokio::test]
891    async fn test_on_suspend_by_subprocedures() {
892        let mut times = 0;
893        let children_ids = [ProcedureId::random(), ProcedureId::random()];
894        let keys = [
895            &[
896                "catalog.schema.table.region-0",
897                "catalog.schema.table.region-1",
898            ],
899            &[
900                "catalog.schema.table.region-2",
901                "catalog.schema.table.region-3",
902            ],
903        ];
904
905        let exec_fn = move |ctx: Context| {
906            times += 1;
907            async move {
908                if times == 1 {
909                    // Submit subprocedures.
910                    Ok(Status::Suspended {
911                        subprocedures: children_ids
912                            .into_iter()
913                            .zip(keys)
914                            .map(|(id, key_slice)| new_child_procedure(id, key_slice))
915                            .collect(),
916                        persist: true,
917                    })
918                } else {
919                    // Wait for subprocedures.
920                    let mut all_child_done = true;
921                    for id in children_ids {
922                        let is_not_done = ctx
923                            .provider
924                            .procedure_state(id)
925                            .await
926                            .unwrap()
927                            .map(|s| !s.is_done())
928                            .unwrap_or(true);
929                        if is_not_done {
930                            all_child_done = false;
931                        }
932                    }
933                    if all_child_done {
934                        Ok(Status::done())
935                    } else {
936                        // Return suspended to wait for notify.
937                        Ok(Status::Suspended {
938                            subprocedures: Vec::new(),
939                            persist: false,
940                        })
941                    }
942                }
943            }
944            .boxed()
945        };
946        let parent = ProcedureAdapter {
947            data: "parent".to_string(),
948            lock_key: LockKey::single_exclusive("catalog.schema.table"),
949            poison_keys: PoisonKeys::default(),
950            exec_fn,
951            rollback_fn: None,
952        };
953
954        let dir = create_temp_dir("parent");
955        let meta = parent.new_meta(ROOT_ID);
956        let procedure_id = meta.id;
957
958        let object_store = test_util::new_object_store(&dir);
959        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
960        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
961        let poison_manager = Arc::new(InMemoryPoisonStore::default());
962        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
963        manager_ctx.start();
964        // Manually add this procedure to the manager ctx.
965        assert!(manager_ctx.try_insert_procedure(meta));
966        // Replace the manager ctx.
967        runner.manager_ctx = manager_ctx.clone();
968
969        runner.run().await;
970        assert!(manager_ctx.key_lock.is_empty());
971
972        // Check child procedures.
973        for child_id in children_ids {
974            let state = manager_ctx.state(child_id).unwrap();
975            assert!(state.is_done(), "{state:?}");
976        }
977        let state = manager_ctx.state(procedure_id).unwrap();
978        assert!(state.is_done(), "{state:?}");
979        // Files are removed.
980        check_files(&object_store, &procedure_store, procedure_id, &[]).await;
981
982        tokio::time::sleep(Duration::from_millis(5)).await;
983        // Clean outdated meta.
984        manager_ctx.remove_outdated_meta(Duration::from_millis(1));
985        assert!(manager_ctx.state(procedure_id).is_none());
986        assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
987        for child_id in children_ids {
988            assert!(manager_ctx.state(child_id).is_none());
989        }
990    }
991
992    #[tokio::test]
993    async fn test_running_is_stopped() {
994        let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
995        let normal = ProcedureAdapter {
996            data: "normal".to_string(),
997            lock_key: LockKey::single_exclusive("catalog.schema.table"),
998            poison_keys: PoisonKeys::default(),
999            exec_fn,
1000            rollback_fn: None,
1001        };
1002
1003        let dir = create_temp_dir("test_running_is_stopped");
1004        let meta = normal.new_meta(ROOT_ID);
1005        let ctx = context_without_provider(meta.id);
1006        let object_store = test_util::new_object_store(&dir);
1007        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1008        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1009        runner.manager_ctx.start();
1010
1011        runner.execute_once(&ctx).await;
1012        let state = runner.meta.state();
1013        assert!(state.is_running(), "{state:?}");
1014        check_files(
1015            &object_store,
1016            &procedure_store,
1017            ctx.procedure_id,
1018            &["0000000000.step"],
1019        )
1020        .await;
1021
1022        runner.manager_ctx.stop();
1023        runner.execute_once(&ctx).await;
1024        let state = runner.meta.state();
1025        assert!(state.is_failed(), "{state:?}");
1026        // Shouldn't write any files
1027        check_files(
1028            &object_store,
1029            &procedure_store,
1030            ctx.procedure_id,
1031            &["0000000000.step"],
1032        )
1033        .await;
1034    }
1035
1036    #[tokio::test]
1037    async fn test_running_is_stopped_on_error() {
1038        let exec_fn =
1039            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1040        let normal = ProcedureAdapter {
1041            data: "fail".to_string(),
1042            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1043            poison_keys: PoisonKeys::default(),
1044            exec_fn,
1045            rollback_fn: None,
1046        };
1047
1048        let dir = create_temp_dir("test_running_is_stopped_on_error");
1049        let meta = normal.new_meta(ROOT_ID);
1050        let ctx = context_without_provider(meta.id);
1051        let object_store = test_util::new_object_store(&dir);
1052        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1053        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1054        runner.manager_ctx.stop();
1055
1056        runner.execute_once(&ctx).await;
1057        let state = runner.meta.state();
1058        assert!(state.is_failed(), "{state:?}");
1059        // Shouldn't write any files
1060        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1061    }
1062
1063    #[tokio::test]
1064    async fn test_execute_on_error() {
1065        let exec_fn =
1066            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1067        let fail = ProcedureAdapter {
1068            data: "fail".to_string(),
1069            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1070            poison_keys: PoisonKeys::default(),
1071            exec_fn,
1072            rollback_fn: None,
1073        };
1074
1075        let dir = create_temp_dir("fail");
1076        let meta = fail.new_meta(ROOT_ID);
1077        let ctx = context_without_provider(meta.id);
1078        let object_store = test_util::new_object_store(&dir);
1079        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1080        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1081        runner.manager_ctx.start();
1082
1083        runner.execute_once(&ctx).await;
1084        let state = runner.meta.state();
1085        assert!(state.is_prepare_rollback(), "{state:?}");
1086
1087        runner.execute_once(&ctx).await;
1088        let state = runner.meta.state();
1089        assert!(state.is_failed(), "{state:?}");
1090        check_files(
1091            &object_store,
1092            &procedure_store,
1093            ctx.procedure_id,
1094            &["0000000000.rollback"],
1095        )
1096        .await;
1097    }
1098
1099    #[tokio::test]
1100    async fn test_execute_with_rollback_on_error() {
1101        let exec_fn =
1102            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1103        let rollback_fn = move |_| async move { Ok(()) }.boxed();
1104        let fail = ProcedureAdapter {
1105            data: "fail".to_string(),
1106            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1107            poison_keys: PoisonKeys::default(),
1108            exec_fn,
1109            rollback_fn: Some(Box::new(rollback_fn)),
1110        };
1111
1112        let dir = create_temp_dir("fail");
1113        let meta = fail.new_meta(ROOT_ID);
1114        let ctx = context_without_provider(meta.id);
1115        let object_store = test_util::new_object_store(&dir);
1116        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1117        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1118        runner.manager_ctx.start();
1119
1120        runner.execute_once(&ctx).await;
1121        let state = runner.meta.state();
1122        assert!(state.is_prepare_rollback(), "{state:?}");
1123
1124        runner.execute_once(&ctx).await;
1125        let state = runner.meta.state();
1126        assert!(state.is_rolling_back(), "{state:?}");
1127
1128        runner.execute_once(&ctx).await;
1129        let state = runner.meta.state();
1130        assert!(state.is_failed(), "{state:?}");
1131        check_files(
1132            &object_store,
1133            &procedure_store,
1134            ctx.procedure_id,
1135            &["0000000000.rollback"],
1136        )
1137        .await;
1138    }
1139
1140    #[tokio::test]
1141    async fn test_execute_on_retry_later_error() {
1142        let mut times = 0;
1143
1144        let exec_fn = move |_| {
1145            times += 1;
1146            async move {
1147                if times == 1 {
1148                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1149                } else {
1150                    Ok(Status::done())
1151                }
1152            }
1153            .boxed()
1154        };
1155
1156        let retry_later = ProcedureAdapter {
1157            data: "retry_later".to_string(),
1158            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1159            poison_keys: PoisonKeys::default(),
1160            exec_fn,
1161            rollback_fn: None,
1162        };
1163
1164        let dir = create_temp_dir("retry_later");
1165        let meta = retry_later.new_meta(ROOT_ID);
1166        let ctx = context_without_provider(meta.id);
1167        let object_store = test_util::new_object_store(&dir);
1168        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1169        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1170        runner.manager_ctx.start();
1171        runner.execute_once(&ctx).await;
1172        let state = runner.meta.state();
1173        assert!(state.is_retrying(), "{state:?}");
1174
1175        runner.execute_once(&ctx).await;
1176        let state = runner.meta.state();
1177        assert!(state.is_done(), "{state:?}");
1178        assert!(meta.state().is_done());
1179        check_files(
1180            &object_store,
1181            &procedure_store,
1182            ctx.procedure_id,
1183            &["0000000000.commit"],
1184        )
1185        .await;
1186    }
1187
1188    #[tokio::test]
1189    async fn test_execute_exceed_max_retry_later() {
1190        let exec_fn =
1191            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1192
1193        let exceed_max_retry_later = ProcedureAdapter {
1194            data: "exceed_max_retry_later".to_string(),
1195            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1196            poison_keys: PoisonKeys::default(),
1197            exec_fn,
1198            rollback_fn: None,
1199        };
1200
1201        let dir = create_temp_dir("exceed_max_retry_later");
1202        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1203        let object_store = test_util::new_object_store(&dir);
1204        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1205        let mut runner = new_runner(
1206            meta.clone(),
1207            Box::new(exceed_max_retry_later),
1208            procedure_store,
1209        );
1210        runner.manager_ctx.start();
1211
1212        runner.exponential_builder = ExponentialBuilder::default()
1213            .with_min_delay(Duration::from_millis(1))
1214            .with_max_times(3);
1215
1216        // Run the runner and execute the procedure.
1217        runner.execute_procedure_in_loop().await;
1218        let err = meta.state().error().unwrap().to_string();
1219        assert!(err.contains("Procedure retry exceeded max times"));
1220    }
1221
1222    #[tokio::test]
1223    async fn test_rollback_exceed_max_retry_later() {
1224        let exec_fn =
1225            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1226        let rollback_fn = move |_| {
1227            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1228        };
1229        let exceed_max_retry_later = ProcedureAdapter {
1230            data: "exceed_max_rollback".to_string(),
1231            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1232            poison_keys: PoisonKeys::default(),
1233            exec_fn,
1234            rollback_fn: Some(Box::new(rollback_fn)),
1235        };
1236
1237        let dir = create_temp_dir("exceed_max_rollback");
1238        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1239        let object_store = test_util::new_object_store(&dir);
1240        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1241        let mut runner = new_runner(
1242            meta.clone(),
1243            Box::new(exceed_max_retry_later),
1244            procedure_store,
1245        );
1246        runner.manager_ctx.start();
1247        runner.exponential_builder = ExponentialBuilder::default()
1248            .with_min_delay(Duration::from_millis(1))
1249            .with_max_times(3);
1250
1251        // Run the runner and execute the procedure.
1252        runner.execute_procedure_in_loop().await;
1253        let err = meta.state().error().unwrap().to_string();
1254        assert!(err.contains("Procedure rollback exceeded max times"));
1255    }
1256
1257    #[tokio::test]
1258    async fn test_rollback_after_retry_fail() {
1259        let exec_fn = move |_| {
1260            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1261        };
1262
1263        let (tx, mut rx) = mpsc::channel(1);
1264        let rollback_fn = move |_| {
1265            let tx = tx.clone();
1266            async move {
1267                tx.send(()).await.unwrap();
1268                Ok(())
1269            }
1270            .boxed()
1271        };
1272        let retry_later = ProcedureAdapter {
1273            data: "rollback_after_retry_fail".to_string(),
1274            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1275            poison_keys: PoisonKeys::default(),
1276            exec_fn,
1277            rollback_fn: Some(Box::new(rollback_fn)),
1278        };
1279
1280        let dir = create_temp_dir("retry_later");
1281        let meta = retry_later.new_meta(ROOT_ID);
1282        let ctx = context_without_provider(meta.id);
1283        let object_store = test_util::new_object_store(&dir);
1284        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1285        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1286        runner.manager_ctx.start();
1287        runner.exponential_builder = ExponentialBuilder::default()
1288            .with_min_delay(Duration::from_millis(1))
1289            .with_max_times(3);
1290        // Run the runner and execute the procedure.
1291        runner.execute_procedure_in_loop().await;
1292        rx.recv().await.unwrap();
1293        assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
1294        check_files(
1295            &object_store,
1296            &procedure_store,
1297            ctx.procedure_id,
1298            &["0000000000.rollback"],
1299        )
1300        .await;
1301    }
1302
1303    #[tokio::test]
1304    async fn test_child_error() {
1305        let mut times = 0;
1306        let child_id = ProcedureId::random();
1307
1308        let exec_fn = move |ctx: Context| {
1309            times += 1;
1310            async move {
1311                if times == 1 {
1312                    // Submit subprocedures.
1313                    let exec_fn = |_| {
1314                        async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1315                            .boxed()
1316                    };
1317                    let fail = ProcedureAdapter {
1318                        data: "fail".to_string(),
1319                        lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1320                        poison_keys: PoisonKeys::default(),
1321                        exec_fn,
1322                        rollback_fn: None,
1323                    };
1324
1325                    Ok(Status::Suspended {
1326                        subprocedures: vec![ProcedureWithId {
1327                            id: child_id,
1328                            procedure: Box::new(fail),
1329                        }],
1330                        persist: true,
1331                    })
1332                } else {
1333                    // Wait for subprocedures.
1334                    let state = ctx.provider.procedure_state(child_id).await.unwrap();
1335                    let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
1336                    if is_failed {
1337                        // The parent procedure to abort itself if child procedure is failed.
1338                        Err(Error::from_error_ext(PlainError::new(
1339                            "subprocedure failed".to_string(),
1340                            StatusCode::Unexpected,
1341                        )))
1342                    } else {
1343                        // Return suspended to wait for notify.
1344                        Ok(Status::Suspended {
1345                            subprocedures: Vec::new(),
1346                            persist: false,
1347                        })
1348                    }
1349                }
1350            }
1351            .boxed()
1352        };
1353        let parent = ProcedureAdapter {
1354            data: "parent".to_string(),
1355            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1356            poison_keys: PoisonKeys::default(),
1357            exec_fn,
1358            rollback_fn: None,
1359        };
1360
1361        let dir = create_temp_dir("child_err");
1362        let meta = parent.new_meta(ROOT_ID);
1363
1364        let object_store = test_util::new_object_store(&dir);
1365        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1366        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
1367        let poison_manager = Arc::new(InMemoryPoisonStore::default());
1368        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
1369        manager_ctx.start();
1370        // Manually add this procedure to the manager ctx.
1371        assert!(manager_ctx.try_insert_procedure(meta.clone()));
1372        // Replace the manager ctx.
1373        runner.manager_ctx = manager_ctx.clone();
1374
1375        // Run the runner and execute the procedure.
1376        runner.run().await;
1377        assert!(manager_ctx.key_lock.is_empty());
1378        let err = meta.state().error().unwrap().output_msg();
1379        assert!(err.contains("subprocedure failed"), "{err}");
1380    }
1381
1382    #[tokio::test]
1383    async fn test_execute_with_clean_poisons() {
1384        common_telemetry::init_default_ut_logging();
1385        let mut times = 0;
1386        let poison_key = PoisonKey::new("table/1024");
1387        let moved_poison_key = poison_key.clone();
1388        let exec_fn = move |ctx: Context| {
1389            times += 1;
1390            let poison_key = moved_poison_key.clone();
1391            async move {
1392                if times == 1 {
1393                    // Put the poison to the context.
1394                    ctx.provider
1395                        .try_put_poison(&poison_key, ctx.procedure_id)
1396                        .await
1397                        .unwrap();
1398
1399                    Ok(Status::executing(true))
1400                } else {
1401                    Ok(Status::executing_with_clean_poisons(true))
1402                }
1403            }
1404            .boxed()
1405        };
1406        let poison = ProcedureAdapter {
1407            data: "poison".to_string(),
1408            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1409            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1410            exec_fn,
1411            rollback_fn: None,
1412        };
1413
1414        let dir = create_temp_dir("clean_poisons");
1415        let meta = poison.new_meta(ROOT_ID);
1416
1417        let object_store = test_util::new_object_store(&dir);
1418        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1419        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1420
1421        // Use the manager ctx as the context provider.
1422        let ctx = context_with_provider(
1423            meta.id,
1424            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1425        );
1426        // Manually add this procedure to the manager ctx.
1427        runner
1428            .manager_ctx
1429            .procedures
1430            .write()
1431            .unwrap()
1432            .insert(meta.id, runner.meta.clone());
1433
1434        runner.manager_ctx.start();
1435        runner.execute_once(&ctx).await;
1436        let state = runner.meta.state();
1437        assert!(state.is_running(), "{state:?}");
1438
1439        let procedure_id = runner
1440            .manager_ctx
1441            .poison_manager
1442            .get_poison(&poison_key.to_string())
1443            .await
1444            .unwrap();
1445        // poison key should be exist.
1446        assert!(procedure_id.is_some());
1447
1448        runner.execute_once(&ctx).await;
1449        let state = runner.meta.state();
1450        assert!(state.is_running(), "{state:?}");
1451
1452        let procedure_id = runner
1453            .manager_ctx
1454            .poison_manager
1455            .get_poison(&poison_key.to_string())
1456            .await
1457            .unwrap();
1458        // poison key should be deleted.
1459        assert!(procedure_id.is_none());
1460    }
1461
1462    #[tokio::test]
1463    async fn test_execute_error_with_clean_poisons() {
1464        common_telemetry::init_default_ut_logging();
1465        let mut times = 0;
1466        let poison_key = PoisonKey::new("table/1024");
1467        let moved_poison_key = poison_key.clone();
1468        let exec_fn = move |ctx: Context| {
1469            times += 1;
1470            let poison_key = moved_poison_key.clone();
1471            async move {
1472                if times == 1 {
1473                    // Put the poison to the context.
1474                    ctx.provider
1475                        .try_put_poison(&poison_key, ctx.procedure_id)
1476                        .await
1477                        .unwrap();
1478
1479                    Ok(Status::executing(true))
1480                } else {
1481                    Err(Error::external_and_clean_poisons(MockError::new(
1482                        StatusCode::Unexpected,
1483                    )))
1484                }
1485            }
1486            .boxed()
1487        };
1488        let poison = ProcedureAdapter {
1489            data: "poison".to_string(),
1490            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1491            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1492            exec_fn,
1493            rollback_fn: None,
1494        };
1495
1496        let dir = create_temp_dir("error_with_clean_poisons");
1497        let meta = poison.new_meta(ROOT_ID);
1498
1499        let object_store = test_util::new_object_store(&dir);
1500        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1501        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1502
1503        // Use the manager ctx as the context provider.
1504        let ctx = context_with_provider(
1505            meta.id,
1506            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1507        );
1508        // Manually add this procedure to the manager ctx.
1509        runner
1510            .manager_ctx
1511            .procedures
1512            .write()
1513            .unwrap()
1514            .insert(meta.id, runner.meta.clone());
1515
1516        runner.manager_ctx.start();
1517        runner.execute_once(&ctx).await;
1518        let state = runner.meta.state();
1519        assert!(state.is_running(), "{state:?}");
1520
1521        let procedure_id = runner
1522            .manager_ctx
1523            .poison_manager
1524            .get_poison(&poison_key.to_string())
1525            .await
1526            .unwrap();
1527        // poison key should be exist.
1528        assert!(procedure_id.is_some());
1529
1530        runner.execute_once(&ctx).await;
1531        let state = runner.meta.state();
1532        assert!(state.is_prepare_rollback(), "{state:?}");
1533
1534        let procedure_id = runner
1535            .manager_ctx
1536            .poison_manager
1537            .get_poison(&poison_key.to_string())
1538            .await
1539            .unwrap();
1540        // poison key should be deleted.
1541        assert!(procedure_id.is_none());
1542    }
1543
1544    #[tokio::test]
1545    async fn test_execute_failed_after_set_poison() {
1546        let mut times = 0;
1547        let poison_key = PoisonKey::new("table/1024");
1548        let moved_poison_key = poison_key.clone();
1549        let exec_fn = move |ctx: Context| {
1550            times += 1;
1551            let poison_key = moved_poison_key.clone();
1552            async move {
1553                if times == 1 {
1554                    Ok(Status::executing(true))
1555                } else {
1556                    // Put the poison to the context.
1557                    ctx.provider
1558                        .try_put_poison(&poison_key, ctx.procedure_id)
1559                        .await
1560                        .unwrap();
1561                    Err(Error::external(MockError::new(StatusCode::Unexpected)))
1562                }
1563            }
1564            .boxed()
1565        };
1566        let poison = ProcedureAdapter {
1567            data: "poison".to_string(),
1568            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1569            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1570            exec_fn,
1571            rollback_fn: None,
1572        };
1573
1574        let dir = create_temp_dir("poison");
1575        let meta = poison.new_meta(ROOT_ID);
1576
1577        let object_store = test_util::new_object_store(&dir);
1578        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1579        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1580
1581        // Use the manager ctx as the context provider.
1582        let ctx = context_with_provider(
1583            meta.id,
1584            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1585        );
1586        // Manually add this procedure to the manager ctx.
1587        runner
1588            .manager_ctx
1589            .procedures
1590            .write()
1591            .unwrap()
1592            .insert(meta.id, runner.meta.clone());
1593
1594        runner.manager_ctx.start();
1595        runner.execute_once(&ctx).await;
1596        let state = runner.meta.state();
1597        assert!(state.is_running(), "{state:?}");
1598
1599        runner.execute_once(&ctx).await;
1600        let state = runner.meta.state();
1601        assert!(state.is_prepare_rollback(), "{state:?}");
1602        assert!(meta.state().is_prepare_rollback());
1603
1604        runner.execute_once(&ctx).await;
1605        let state = runner.meta.state();
1606        assert!(state.is_failed(), "{state:?}");
1607        assert!(meta.state().is_failed());
1608
1609        // Check the poison is set.
1610        let procedure_id = runner
1611            .manager_ctx
1612            .poison_manager
1613            .get_poison(&poison_key.to_string())
1614            .await
1615            .unwrap()
1616            .unwrap();
1617
1618        // If the procedure is poisoned, the poison key shouldn't be deleted.
1619        assert_eq!(&procedure_id.to_string(), ROOT_ID);
1620    }
1621
1622    #[tokio::test]
1623    async fn test_execute_exceed_max_retry_after_set_poison() {
1624        common_telemetry::init_default_ut_logging();
1625        let mut times = 0;
1626        let poison_key = PoisonKey::new("table/1024");
1627        let moved_poison_key = poison_key.clone();
1628        let exec_fn = move |ctx: Context| {
1629            times += 1;
1630            let poison_key = moved_poison_key.clone();
1631            async move {
1632                if times == 1 {
1633                    Ok(Status::executing(true))
1634                } else {
1635                    // Put the poison to the context.
1636                    ctx.provider
1637                        .try_put_poison(&poison_key, ctx.procedure_id)
1638                        .await
1639                        .unwrap();
1640                    Err(Error::retry_later_and_clean_poisons(MockError::new(
1641                        StatusCode::Unexpected,
1642                    )))
1643                }
1644            }
1645            .boxed()
1646        };
1647        let poison = ProcedureAdapter {
1648            data: "poison".to_string(),
1649            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1650            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1651            exec_fn,
1652            rollback_fn: None,
1653        };
1654
1655        let dir = create_temp_dir("exceed_max_after_set_poison");
1656        let meta = poison.new_meta(ROOT_ID);
1657        let object_store = test_util::new_object_store(&dir);
1658        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1659        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
1660        runner.manager_ctx.start();
1661        runner.exponential_builder = ExponentialBuilder::default()
1662            .with_min_delay(Duration::from_millis(1))
1663            .with_max_times(3);
1664        // Use the manager ctx as the context provider.
1665        let ctx = context_with_provider(
1666            meta.id,
1667            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1668        );
1669        // Manually add this procedure to the manager ctx.
1670        runner
1671            .manager_ctx
1672            .procedures
1673            .write()
1674            .unwrap()
1675            .insert(meta.id, runner.meta.clone());
1676        // Run the runner and execute the procedure.
1677        runner.execute_once_with_retry(&ctx).await;
1678        let err = meta.state().error().unwrap().clone();
1679        assert_matches!(&*err, Error::RetryTimesExceeded { .. });
1680
1681        // Check the poison is deleted.
1682        let procedure_id = runner
1683            .manager_ctx
1684            .poison_manager
1685            .get_poison(&poison_key.to_string())
1686            .await
1687            .unwrap();
1688        assert_eq!(procedure_id, None);
1689    }
1690
1691    #[tokio::test]
1692    async fn test_execute_poisoned() {
1693        let mut times = 0;
1694        let poison_key = PoisonKey::new("table/1024");
1695        let moved_poison_key = poison_key.clone();
1696        let exec_fn = move |ctx: Context| {
1697            times += 1;
1698            let poison_key = moved_poison_key.clone();
1699            async move {
1700                if times == 1 {
1701                    Ok(Status::executing(true))
1702                } else {
1703                    // Put the poison to the context.
1704                    ctx.provider
1705                        .try_put_poison(&poison_key, ctx.procedure_id)
1706                        .await
1707                        .unwrap();
1708                    Ok(Status::Poisoned {
1709                        keys: PoisonKeys::new(vec![poison_key.clone()]),
1710                        error: Error::external(MockError::new(StatusCode::Unexpected)),
1711                    })
1712                }
1713            }
1714            .boxed()
1715        };
1716        let poison = ProcedureAdapter {
1717            data: "poison".to_string(),
1718            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1719            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1720            exec_fn,
1721            rollback_fn: None,
1722        };
1723
1724        let dir = create_temp_dir("poison");
1725        let meta = poison.new_meta(ROOT_ID);
1726
1727        let object_store = test_util::new_object_store(&dir);
1728        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1729        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1730
1731        // Use the manager ctx as the context provider.
1732        let ctx = context_with_provider(
1733            meta.id,
1734            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1735        );
1736        // Manually add this procedure to the manager ctx.
1737        runner
1738            .manager_ctx
1739            .procedures
1740            .write()
1741            .unwrap()
1742            .insert(meta.id, runner.meta.clone());
1743
1744        runner.manager_ctx.start();
1745        runner.execute_once(&ctx).await;
1746        let state = runner.meta.state();
1747        assert!(state.is_running(), "{state:?}");
1748
1749        runner.execute_once(&ctx).await;
1750        let state = runner.meta.state();
1751        assert!(state.is_poisoned(), "{state:?}");
1752        assert!(meta.state().is_poisoned());
1753        check_files(
1754            &object_store,
1755            &procedure_store,
1756            ctx.procedure_id,
1757            &["0000000000.step"],
1758        )
1759        .await;
1760
1761        // Check the poison is set.
1762        let procedure_id = runner
1763            .manager_ctx
1764            .poison_manager
1765            .get_poison(&poison_key.to_string())
1766            .await
1767            .unwrap()
1768            .unwrap();
1769
1770        // If the procedure is poisoned, the poison key shouldn't be deleted.
1771        assert_eq!(procedure_id, ROOT_ID);
1772    }
1773
1774    fn test_procedure_with_dynamic_lock(
1775        shared_atomic_value: Arc<AtomicU64>,
1776        id: u64,
1777    ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
1778        let exec_fn = move |ctx: Context| {
1779            let moved_shared_atomic_value = shared_atomic_value.clone();
1780            let moved_ctx = ctx.clone();
1781            async move {
1782                debug!("Acquiring write lock, id: {}", id);
1783                let key = StringKey::Exclusive("test_lock".to_string());
1784                let guard = moved_ctx.provider.acquire_lock(&key).await;
1785                debug!("Acquired write lock, id: {}", id);
1786                let millis = rand::rng().random_range(10..=50);
1787                tokio::time::sleep(Duration::from_millis(millis)).await;
1788                let value = moved_shared_atomic_value.load(Ordering::Relaxed);
1789                moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
1790                debug!("Dropping write lock, id: {}", id);
1791                drop(guard);
1792
1793                Ok(Status::done())
1794            }
1795            .boxed()
1796        };
1797
1798        let adapter = ProcedureAdapter {
1799            data: "dynamic_lock".to_string(),
1800            lock_key: LockKey::new_exclusive([]),
1801            poison_keys: PoisonKeys::new([]),
1802            exec_fn,
1803            rollback_fn: None,
1804        };
1805        let meta = adapter.new_meta(ROOT_ID);
1806
1807        (Box::new(adapter), meta)
1808    }
1809
1810    #[tokio::test(flavor = "multi_thread")]
1811    async fn test_execute_with_dynamic_lock() {
1812        common_telemetry::init_default_ut_logging();
1813        let shared_atomic_value = Arc::new(AtomicU64::new(0));
1814        let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
1815        let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
1816
1817        let dir = create_temp_dir("dynamic_lock");
1818        let object_store = test_util::new_object_store(&dir);
1819        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1820        let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
1821        let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
1822        let ctx1 = context_with_provider(
1823            meta1.id,
1824            runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1825        );
1826        let ctx2 = context_with_provider(
1827            meta2.id,
1828            // use same manager ctx as runner1
1829            runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1830        );
1831        let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
1832        join_all(tasks).await;
1833        assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
1834    }
1835}