common_procedure/local/
runner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_telemetry::{debug, error, info};
21use rand::Rng;
22use snafu::ResultExt;
23use tokio::time;
24
25use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
26use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
27use crate::procedure::{Output, StringKey};
28use crate::rwlock::OwnedKeyRwLockGuard;
29use crate::store::{ProcedureMessage, ProcedureStore};
30use crate::{
31    BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
32};
33
/// A guard to cleanup procedure state.
///
/// When dropped, it marks the procedure as failed unless [`ProcedureGuard::finish`] was
/// called, notifies the parent procedure (if any), releases the held key guards and asks
/// the key lock to clean the procedure's lock keys.
struct ProcedureGuard {
    /// Metadata of the guarded procedure.
    meta: ProcedureMetaRef,
    /// Manager context used to notify the parent procedure and to clean the key locks.
    manager_ctx: Arc<ManagerContext>,
    /// Guards of the key locks held by the procedure, in acquisition order.
    key_guards: Vec<OwnedKeyRwLockGuard>,
    /// `true` once `finish()` has been called; `false` means the runner exited unexpectedly.
    finish: bool,
}
41
42impl ProcedureGuard {
43    /// Returns a new [ProcedureGuard].
44    fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
45        ProcedureGuard {
46            meta,
47            manager_ctx,
48            key_guards: vec![],
49            finish: false,
50        }
51    }
52
53    /// The procedure is finished successfully.
54    fn finish(mut self) {
55        self.finish = true;
56    }
57}
58
59impl Drop for ProcedureGuard {
60    fn drop(&mut self) {
61        if !self.finish {
62            error!("Procedure {} exits unexpectedly", self.meta.id);
63
64            // Set state to failed. This is useful in test as runtime may not abort when the runner task panics.
65            // See https://github.com/tokio-rs/tokio/issues/2002 .
66            // We set set_panic_hook() in the application's main function. But our tests don't have this panic hook.
67            let err = ProcedurePanicSnafu {
68                procedure_id: self.meta.id,
69            }
70            .build();
71            self.meta.set_state(ProcedureState::failed(Arc::new(err)));
72        }
73
74        // Notify parent procedure.
75        if let Some(parent_id) = self.meta.parent_id {
76            self.manager_ctx.notify_by_subprocedure(parent_id);
77        }
78
79        // Drops the key guards in the reverse order.
80        while !self.key_guards.is_empty() {
81            self.key_guards.pop();
82        }
83
84        // Clean the staled locks.
85        self.manager_ctx
86            .key_lock
87            .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
88    }
89}
90
/// Drives a single procedure: acquires its key locks, executes it with retries and
/// rollback, persists its progress and cleans up once it is finished.
pub(crate) struct Runner {
    /// Metadata (id, state, lock keys, poison keys, parent id) of the procedure being run.
    pub(crate) meta: ProcedureMetaRef,
    /// The procedure to execute.
    pub(crate) procedure: BoxedProcedure,
    /// Shared manager context (key locks, poison manager, registered procedures).
    pub(crate) manager_ctx: Arc<ManagerContext>,
    /// Step number handed to the procedure store on each write; incremented after every
    /// successful persist, commit or rollback write.
    pub(crate) step: u32,
    /// Builds the backoff iterators used between retries and between rollback attempts.
    pub(crate) exponential_builder: ExponentialBuilder,
    /// Store used to persist, commit, roll back and delete procedure records.
    pub(crate) store: Arc<ProcedureStore>,
    /// Reset to `false` before execution starts.
    /// NOTE(review): not read anywhere in the visible part of this file — confirm it is still needed.
    pub(crate) rolling_back: bool,
}
100
101impl Runner {
    /// Returns whether the `ProcedureManager` is running, as reported by the manager context.
    pub(crate) fn running(&self) -> bool {
        self.manager_ctx.running()
    }
106
107    /// Run the procedure.
108    pub(crate) async fn run(mut self) {
109        // Ensure we can update the procedure state.
110        let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
111
112        info!(
113            "Runner {}-{} starts",
114            self.procedure.type_name(),
115            self.meta.id
116        );
117
118        // TODO(yingwen): Detect recursive locking (and deadlock) if possible. Maybe we could detect
119        // recursive locking by adding a root procedure id to the meta.
120        for key in self.meta.lock_key.keys_to_lock() {
121            // Acquire lock for each key.
122            let key_guard = match key {
123                StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
124                StringKey::Exclusive(key) => {
125                    self.manager_ctx.key_lock.write(key.clone()).await.into()
126                }
127            };
128
129            guard.key_guards.push(key_guard);
130        }
131
132        // Execute the procedure. We need to release the lock whenever the execution
133        // is successful or fail.
134        self.meta.set_start_time_ms();
135        self.execute_procedure_in_loop().await;
136        self.meta.set_end_time_ms();
137
138        // We can't remove the metadata of the procedure now as users and its parent might
139        // need to query its state.
140        // TODO(yingwen): 1. Add TTL to the metadata; 2. Only keep state in the procedure store
141        // so we don't need to always store the metadata in memory after the procedure is done.
142
143        // Release locks and notify parent procedure.
144        guard.finish();
145
146        // If this is the root procedure, clean up message cache.
147        if self.meta.parent_id.is_none() {
148            let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
149            // Clean resources.
150            self.manager_ctx.on_procedures_finish(&procedure_ids);
151
152            // If `ProcedureManager` is stopped, it stops the current task immediately without deleting the procedure.
153            if !self.running() {
154                return;
155            }
156
157            for id in procedure_ids {
158                if let Err(e) = self.store.delete_procedure(id).await {
159                    error!(
160                        e;
161                        "Runner {}-{} failed to delete procedure {}",
162                        self.procedure.type_name(),
163                        self.meta.id,
164                        id,
165                    );
166                }
167            }
168        }
169
170        info!(
171            "Runner {}-{} exits",
172            self.procedure.type_name(),
173            self.meta.id
174        );
175    }
176
177    async fn execute_procedure_in_loop(&mut self) {
178        let ctx = Context {
179            procedure_id: self.meta.id,
180            provider: self.manager_ctx.clone(),
181        };
182
183        self.rolling_back = false;
184        self.execute_once_with_retry(&ctx).await;
185    }
186
187    async fn execute_once_with_retry(&mut self, ctx: &Context) {
188        let mut retry = self.exponential_builder.build();
189        let mut retry_times = 0;
190
191        let mut rollback = self.exponential_builder.build();
192        let mut rollback_times = 0;
193
194        loop {
195            // Don't store state if `ProcedureManager` is stopped.
196            if !self.running() {
197                self.meta.set_state(ProcedureState::failed(Arc::new(
198                    error::ManagerNotStartSnafu {}.build(),
199                )));
200                return;
201            }
202            let state = self.meta.state();
203            match state {
204                ProcedureState::Running => {}
205                ProcedureState::Retrying { error } => {
206                    retry_times += 1;
207                    if let Some(d) = retry.next() {
208                        let millis = d.as_millis() as u64;
209                        // Add random noise to the retry delay to avoid retry storms.
210                        let noise = rand::rng().random_range(0..(millis / 4) + 1);
211                        let d = d.add(Duration::from_millis(noise));
212
213                        self.wait_on_err(d, retry_times).await;
214                    } else {
215                        self.meta
216                            .set_state(ProcedureState::prepare_rollback(Arc::new(
217                                Error::RetryTimesExceeded {
218                                    source: error.clone(),
219                                    procedure_id: self.meta.id,
220                                },
221                            )));
222                    }
223                }
224                ProcedureState::PrepareRollback { error }
225                | ProcedureState::RollingBack { error } => {
226                    rollback_times += 1;
227                    if let Some(d) = rollback.next() {
228                        self.wait_on_err(d, rollback_times).await;
229                    } else {
230                        let err = Err::<(), Arc<Error>>(error)
231                            .context(RollbackTimesExceededSnafu {
232                                procedure_id: self.meta.id,
233                            })
234                            .unwrap_err();
235                        self.meta.set_state(ProcedureState::failed(Arc::new(err)));
236                        return;
237                    }
238                }
239                ProcedureState::Done { .. } => return,
240                ProcedureState::Failed { .. } => return,
241                ProcedureState::Poisoned { .. } => return,
242            }
243            self.execute_once(ctx).await;
244        }
245    }
246
247    async fn clean_poisons(&mut self) -> Result<()> {
248        let mut error = None;
249        for key in self.meta.poison_keys.iter() {
250            let key = key.to_string();
251            if let Err(e) = self
252                .manager_ctx
253                .poison_manager
254                .delete_poison(key, self.meta.id.to_string())
255                .await
256            {
257                error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
258                error = Some(e);
259            }
260        }
261
262        // returns the last error if any.
263        if let Some(e) = error {
264            return Err(e);
265        }
266        Ok(())
267    }
268
269    async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
270        if self.procedure.rollback_supported() {
271            if let Err(e) = self.procedure.rollback(ctx).await {
272                self.meta
273                    .set_state(ProcedureState::rolling_back(Arc::new(e)));
274                return;
275            }
276        }
277        self.meta.set_state(ProcedureState::failed(err));
278    }
279
280    async fn prepare_rollback(&mut self, err: Arc<Error>) {
281        if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
282            self.meta
283                .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
284            return;
285        }
286        if self.procedure.rollback_supported() {
287            self.meta.set_state(ProcedureState::rolling_back(err));
288        } else {
289            self.meta.set_state(ProcedureState::failed(err));
290        }
291    }
292
293    async fn execute_once(&mut self, ctx: &Context) {
294        match self.meta.state() {
295            ProcedureState::Running | ProcedureState::Retrying { .. } => {
296                match self.procedure.execute(ctx).await {
297                    Ok(status) => {
298                        debug!(
299                            "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
300                            self.procedure.type_name(),
301                            self.meta.id,
302                            status,
303                            status.need_persist(),
304                        );
305
306                        // Don't store state if `ProcedureManager` is stopped.
307                        if !self.running() {
308                            self.meta.set_state(ProcedureState::failed(Arc::new(
309                                error::ManagerNotStartSnafu {}.build(),
310                            )));
311                            return;
312                        }
313
314                        // Cleans poisons before persist.
315                        if status.need_clean_poisons() {
316                            if let Err(e) = self.clean_poisons().await {
317                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
318                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
319                                return;
320                            }
321                        }
322
323                        if status.need_persist() {
324                            if let Err(e) = self.persist_procedure().await {
325                                error!(e; "Failed to persist procedure: {}", self.meta.id);
326                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
327                                return;
328                            }
329                        }
330
331                        match status {
332                            Status::Executing { .. } => {}
333                            Status::Suspended { subprocedures, .. } => {
334                                self.on_suspended(subprocedures).await;
335                            }
336                            Status::Done { output } => {
337                                if let Err(e) = self.commit_procedure().await {
338                                    error!(e; "Failed to commit procedure: {}", self.meta.id);
339                                    self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
340                                    return;
341                                }
342
343                                self.done(output);
344                            }
345                            Status::Poisoned { error, keys } => {
346                                error!(
347                                    error;
348                                    "Procedure {}-{} is poisoned, keys: {:?}",
349                                    self.procedure.type_name(),
350                                    self.meta.id,
351                                    keys,
352                                );
353                                self.meta
354                                    .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
355                            }
356                        }
357                    }
358                    Err(e) => {
359                        error!(
360                            e;
361                            "Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
362                            self.procedure.type_name(),
363                            self.meta.id,
364                            e.is_retry_later(),
365                            e.need_clean_poisons(),
366                        );
367
368                        // Don't store state if `ProcedureManager` is stopped.
369                        if !self.running() {
370                            self.meta.set_state(ProcedureState::failed(Arc::new(
371                                error::ManagerNotStartSnafu {}.build(),
372                            )));
373                            return;
374                        }
375
376                        if e.need_clean_poisons() {
377                            if let Err(e) = self.clean_poisons().await {
378                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
379                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
380                                return;
381                            }
382                            debug!(
383                                "Procedure {}-{} cleaned poisons",
384                                self.procedure.type_name(),
385                                self.meta.id,
386                            );
387                        }
388
389                        if e.is_retry_later() {
390                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
391                            return;
392                        }
393
394                        self.meta
395                            .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
396                    }
397                }
398            }
399            ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
400            ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
401            ProcedureState::Failed { .. }
402            | ProcedureState::Done { .. }
403            | ProcedureState::Poisoned { .. } => (),
404        }
405    }
406
407    /// Submit a subprocedure with specific `procedure_id`.
408    fn submit_subprocedure(
409        &self,
410        procedure_id: ProcedureId,
411        procedure_state: ProcedureState,
412        procedure: BoxedProcedure,
413    ) {
414        if self.manager_ctx.contains_procedure(procedure_id) {
415            // If the parent has already submitted this procedure, don't submit it again.
416            return;
417        }
418
419        let step = 0;
420
421        let meta = Arc::new(ProcedureMeta::new(
422            procedure_id,
423            procedure_state,
424            Some(self.meta.id),
425            procedure.lock_key(),
426            procedure.poison_keys(),
427            procedure.type_name(),
428        ));
429        let runner = Runner {
430            meta: meta.clone(),
431            procedure,
432            manager_ctx: self.manager_ctx.clone(),
433            step,
434            exponential_builder: self.exponential_builder,
435            store: self.store.clone(),
436            rolling_back: false,
437        };
438
439        // Insert the procedure. We already check the procedure existence before inserting
440        // so we add an assertion to ensure the procedure id is unique and no other procedures
441        // using the same procedure id.
442        assert!(
443            self.manager_ctx.try_insert_procedure(meta),
444            "Procedure {}-{} submit an existing procedure {}-{}",
445            self.procedure.type_name(),
446            self.meta.id,
447            runner.procedure.type_name(),
448            procedure_id,
449        );
450
451        // Add the id of the subprocedure to the metadata.
452        self.meta.push_child(procedure_id);
453
454        let _handle = common_runtime::spawn_global(async move {
455            // Run the root procedure.
456            runner.run().await
457        });
458    }
459
460    /// Extend the retry time to wait for the next retry.
461    async fn wait_on_err(&mut self, d: Duration, i: u64) {
462        info!(
463            "Procedure {}-{} retry for the {} times after {} millis",
464            self.procedure.type_name(),
465            self.meta.id,
466            i,
467            d.as_millis(),
468        );
469        time::sleep(d).await;
470    }
471
472    async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
473        let has_child = !subprocedures.is_empty();
474        for subprocedure in subprocedures {
475            info!(
476                "Procedure {}-{} submit subprocedure {}-{}",
477                self.procedure.type_name(),
478                self.meta.id,
479                subprocedure.procedure.type_name(),
480                subprocedure.id,
481            );
482
483            self.submit_subprocedure(
484                subprocedure.id,
485                ProcedureState::Running,
486                subprocedure.procedure,
487            );
488        }
489
490        info!(
491            "Procedure {}-{} is waiting for subprocedures",
492            self.procedure.type_name(),
493            self.meta.id,
494        );
495
496        // Wait for subprocedures.
497        if has_child {
498            self.meta.child_notify.notified().await;
499
500            info!(
501                "Procedure {}-{} is waked up",
502                self.procedure.type_name(),
503                self.meta.id,
504            );
505        }
506    }
507
508    async fn persist_procedure(&mut self) -> Result<()> {
509        let type_name = self.procedure.type_name().to_string();
510        let data = self.procedure.dump()?;
511
512        self.store
513            .store_procedure(
514                self.meta.id,
515                self.step,
516                type_name,
517                data,
518                self.meta.parent_id,
519            )
520            .await
521            .map_err(|e| {
522                error!(
523                    e; "Failed to persist procedure {}-{}",
524                    self.procedure.type_name(),
525                    self.meta.id
526                );
527                e
528            })?;
529        self.step += 1;
530        Ok(())
531    }
532
533    async fn commit_procedure(&mut self) -> Result<()> {
534        self.store
535            .commit_procedure(self.meta.id, self.step)
536            .await
537            .map_err(|e| {
538                error!(
539                    e; "Failed to commit procedure {}-{}",
540                    self.procedure.type_name(),
541                    self.meta.id
542                );
543                e
544            })?;
545        self.step += 1;
546        Ok(())
547    }
548
549    async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
550        // Persists procedure state
551        let type_name = self.procedure.type_name().to_string();
552        let data = self.procedure.dump()?;
553        let message = ProcedureMessage {
554            type_name,
555            data,
556            parent_id: self.meta.parent_id,
557            step: self.step,
558            error: Some(error),
559        };
560        self.store
561            .rollback_procedure(self.meta.id, message)
562            .await
563            .map_err(|e| {
564                error!(
565                    e; "Failed to write rollback key for procedure {}-{}",
566                    self.procedure.type_name(),
567                    self.meta.id
568                );
569                e
570            })?;
571        self.step += 1;
572        Ok(())
573    }
574
575    fn done(&self, output: Option<Output>) {
576        // TODO(yingwen): Add files to remove list.
577        info!(
578            "Procedure {}-{} done",
579            self.procedure.type_name(),
580            self.meta.id,
581        );
582
583        // Mark the state of this procedure to done.
584        self.meta.set_state(ProcedureState::Done { output });
585    }
586}
587
588#[cfg(test)]
589mod tests {
590    use std::assert_matches::assert_matches;
591    use std::sync::atomic::{AtomicU64, Ordering};
592    use std::sync::Arc;
593
594    use async_trait::async_trait;
595    use common_error::ext::{ErrorExt, PlainError};
596    use common_error::mock::MockError;
597    use common_error::status_code::StatusCode;
598    use common_test_util::temp_dir::create_temp_dir;
599    use futures::future::join_all;
600    use futures_util::future::BoxFuture;
601    use futures_util::FutureExt;
602    use object_store::{EntryMode, ObjectStore};
603    use tokio::sync::mpsc;
604    use tokio::sync::watch::Receiver;
605
606    use super::*;
607    use crate::local::{test_util, DynamicKeyLockGuard};
608    use crate::procedure::PoisonKeys;
609    use crate::store::proc_path;
610    use crate::test_util::InMemoryPoisonStore;
611    use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
612
    /// Fixed procedure id (UUID string) given to the procedure under test.
    const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
614
615    fn new_runner(
616        meta: ProcedureMetaRef,
617        procedure: BoxedProcedure,
618        store: Arc<ProcedureStore>,
619    ) -> Runner {
620        Runner {
621            meta,
622            procedure,
623            manager_ctx: Arc::new(ManagerContext::new(
624                Arc::new(InMemoryPoisonStore::default()),
625            )),
626            step: 0,
627            exponential_builder: ExponentialBuilder::default(),
628            store,
629            rolling_back: false,
630        }
631    }
632
633    async fn check_files(
634        object_store: &ObjectStore,
635        procedure_store: &ProcedureStore,
636        procedure_id: ProcedureId,
637        files: &[&str],
638    ) {
639        let dir = proc_path!(procedure_store, "{procedure_id}/");
640        let lister = object_store.list(&dir).await.unwrap();
641        let mut files_in_dir: Vec<_> = lister
642            .into_iter()
643            .filter(|x| x.metadata().mode() == EntryMode::FILE)
644            .map(|de| de.name().to_string())
645            .collect();
646        files_in_dir.sort_unstable();
647        assert_eq!(files, files_in_dir);
648    }
649
    /// Builds a [Context] for `procedure_id` that uses the given `provider`.
    fn context_with_provider(
        procedure_id: ProcedureId,
        provider: Arc<dyn ContextProvider>,
    ) -> Context {
        Context {
            procedure_id,
            provider,
        }
    }
659
660    fn context_without_provider(procedure_id: ProcedureId) -> Context {
661        struct MockProvider;
662
663        #[async_trait]
664        impl ContextProvider for MockProvider {
665            async fn procedure_state(
666                &self,
667                _procedure_id: ProcedureId,
668            ) -> Result<Option<ProcedureState>> {
669                unimplemented!()
670            }
671
672            async fn procedure_state_receiver(
673                &self,
674                _procedure_id: ProcedureId,
675            ) -> Result<Option<Receiver<ProcedureState>>> {
676                unimplemented!()
677            }
678
679            async fn try_put_poison(
680                &self,
681                _key: &PoisonKey,
682                _procedure_id: ProcedureId,
683            ) -> Result<()> {
684                unimplemented!()
685            }
686
687            async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
688                unimplemented!()
689            }
690        }
691
692        Context {
693            procedure_id,
694            provider: Arc::new(MockProvider),
695        }
696    }
697
    /// Boxed rollback callback of a [ProcedureAdapter]: takes the procedure [Context] and
    /// returns a future resolving to the rollback result.
    type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
699
    /// Test procedure whose `execute` and `rollback` behavior is supplied by closures.
    struct ProcedureAdapter<F> {
        /// Value returned by `dump()`.
        data: String,
        /// Value returned by `lock_key()`.
        lock_key: LockKey,
        /// Value returned by `poison_keys()`.
        poison_keys: PoisonKeys,
        /// Closure invoked on every `execute()` call.
        exec_fn: F,
        /// Closure invoked on `rollback()`; `None` means rollback is not supported.
        rollback_fn: Option<RollbackFn>,
    }
707
708    impl<F> ProcedureAdapter<F> {
709        fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
710            let mut meta = test_util::procedure_meta_for_test();
711            meta.id = ProcedureId::parse_str(uuid).unwrap();
712            meta.lock_key = self.lock_key.clone();
713            meta.poison_keys = self.poison_keys.clone();
714
715            Arc::new(meta)
716        }
717    }
718
719    #[async_trait]
720    impl<F> Procedure for ProcedureAdapter<F>
721    where
722        F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
723    {
724        fn type_name(&self) -> &str {
725            "ProcedureAdapter"
726        }
727
728        async fn execute(&mut self, ctx: &Context) -> Result<Status> {
729            let f = (self.exec_fn)(ctx.clone());
730            f.await
731        }
732
733        async fn rollback(&mut self, ctx: &Context) -> Result<()> {
734            if let Some(f) = &mut self.rollback_fn {
735                return (f)(ctx.clone()).await;
736            }
737            Ok(())
738        }
739
740        fn rollback_supported(&self) -> bool {
741            self.rollback_fn.is_some()
742        }
743
744        fn dump(&self) -> Result<String> {
745            Ok(self.data.clone())
746        }
747
748        fn lock_key(&self) -> LockKey {
749            self.lock_key.clone()
750        }
751
752        fn poison_keys(&self) -> PoisonKeys {
753            self.poison_keys.clone()
754        }
755    }
756
757    async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
758        let mut times = 0;
759        let exec_fn = move |_| {
760            times += 1;
761            async move {
762                if times == 1 {
763                    Ok(Status::executing(persist))
764                } else {
765                    Ok(Status::done())
766                }
767            }
768            .boxed()
769        };
770        let normal = ProcedureAdapter {
771            data: "normal".to_string(),
772            lock_key: LockKey::single_exclusive("catalog.schema.table"),
773            poison_keys: PoisonKeys::default(),
774            exec_fn,
775            rollback_fn: None,
776        };
777
778        let dir = create_temp_dir("normal");
779        let meta = normal.new_meta(ROOT_ID);
780        let ctx = context_without_provider(meta.id);
781        let object_store = test_util::new_object_store(&dir);
782        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
783        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
784        runner.manager_ctx.start();
785
786        runner.execute_once(&ctx).await;
787        let state = runner.meta.state();
788        assert!(state.is_running(), "{state:?}");
789        check_files(
790            &object_store,
791            &procedure_store,
792            ctx.procedure_id,
793            first_files,
794        )
795        .await;
796
797        runner.execute_once(&ctx).await;
798        let state = runner.meta.state();
799        assert!(state.is_done(), "{state:?}");
800        check_files(
801            &object_store,
802            &procedure_store,
803            ctx.procedure_id,
804            second_files,
805        )
806        .await;
807    }
808
809    #[tokio::test]
810    async fn test_execute_once_normal() {
811        execute_once_normal(
812            true,
813            &["0000000000.step"],
814            &["0000000000.step", "0000000001.commit"],
815        )
816        .await;
817    }
818
819    #[tokio::test]
820    async fn test_execute_once_normal_skip_persist() {
821        execute_once_normal(false, &[], &["0000000000.commit"]).await;
822    }
823
824    #[tokio::test]
825    async fn test_on_suspend_empty() {
826        let exec_fn = move |_| {
827            async move {
828                Ok(Status::Suspended {
829                    subprocedures: Vec::new(),
830                    persist: false,
831                })
832            }
833            .boxed()
834        };
835        let suspend = ProcedureAdapter {
836            data: "suspend".to_string(),
837            lock_key: LockKey::single_exclusive("catalog.schema.table"),
838            poison_keys: PoisonKeys::default(),
839            exec_fn,
840            rollback_fn: None,
841        };
842
843        let dir = create_temp_dir("suspend");
844        let meta = suspend.new_meta(ROOT_ID);
845        let ctx = context_without_provider(meta.id);
846        let object_store = test_util::new_object_store(&dir);
847        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
848        let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
849        runner.manager_ctx.start();
850
851        runner.execute_once(&ctx).await;
852        let state = runner.meta.state();
853        assert!(state.is_running(), "{state:?}");
854    }
855
856    fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
857        let mut times = 0;
858        let exec_fn = move |_| {
859            times += 1;
860            async move {
861                if times == 1 {
862                    time::sleep(Duration::from_millis(200)).await;
863                    Ok(Status::executing(true))
864                } else {
865                    Ok(Status::done())
866                }
867            }
868            .boxed()
869        };
870        let child = ProcedureAdapter {
871            data: "child".to_string(),
872            lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
873            poison_keys: PoisonKeys::default(),
874            exec_fn,
875            rollback_fn: None,
876        };
877
878        ProcedureWithId {
879            id: procedure_id,
880            procedure: Box::new(child),
881        }
882    }
883
884    #[tokio::test]
885    async fn test_on_suspend_by_subprocedures() {
886        let mut times = 0;
887        let children_ids = [ProcedureId::random(), ProcedureId::random()];
888        let keys = [
889            &[
890                "catalog.schema.table.region-0",
891                "catalog.schema.table.region-1",
892            ],
893            &[
894                "catalog.schema.table.region-2",
895                "catalog.schema.table.region-3",
896            ],
897        ];
898
899        let exec_fn = move |ctx: Context| {
900            times += 1;
901            async move {
902                if times == 1 {
903                    // Submit subprocedures.
904                    Ok(Status::Suspended {
905                        subprocedures: children_ids
906                            .into_iter()
907                            .zip(keys)
908                            .map(|(id, key_slice)| new_child_procedure(id, key_slice))
909                            .collect(),
910                        persist: true,
911                    })
912                } else {
913                    // Wait for subprocedures.
914                    let mut all_child_done = true;
915                    for id in children_ids {
916                        let is_not_done = ctx
917                            .provider
918                            .procedure_state(id)
919                            .await
920                            .unwrap()
921                            .map(|s| !s.is_done())
922                            .unwrap_or(true);
923                        if is_not_done {
924                            all_child_done = false;
925                        }
926                    }
927                    if all_child_done {
928                        Ok(Status::done())
929                    } else {
930                        // Return suspended to wait for notify.
931                        Ok(Status::Suspended {
932                            subprocedures: Vec::new(),
933                            persist: false,
934                        })
935                    }
936                }
937            }
938            .boxed()
939        };
940        let parent = ProcedureAdapter {
941            data: "parent".to_string(),
942            lock_key: LockKey::single_exclusive("catalog.schema.table"),
943            poison_keys: PoisonKeys::default(),
944            exec_fn,
945            rollback_fn: None,
946        };
947
948        let dir = create_temp_dir("parent");
949        let meta = parent.new_meta(ROOT_ID);
950        let procedure_id = meta.id;
951
952        let object_store = test_util::new_object_store(&dir);
953        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
954        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
955        let poison_manager = Arc::new(InMemoryPoisonStore::default());
956        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
957        manager_ctx.start();
958        // Manually add this procedure to the manager ctx.
959        assert!(manager_ctx.try_insert_procedure(meta));
960        // Replace the manager ctx.
961        runner.manager_ctx = manager_ctx.clone();
962
963        runner.run().await;
964        assert!(manager_ctx.key_lock.is_empty());
965
966        // Check child procedures.
967        for child_id in children_ids {
968            let state = manager_ctx.state(child_id).unwrap();
969            assert!(state.is_done(), "{state:?}");
970        }
971        let state = manager_ctx.state(procedure_id).unwrap();
972        assert!(state.is_done(), "{state:?}");
973        // Files are removed.
974        check_files(&object_store, &procedure_store, procedure_id, &[]).await;
975
976        tokio::time::sleep(Duration::from_millis(5)).await;
977        // Clean outdated meta.
978        manager_ctx.remove_outdated_meta(Duration::from_millis(1));
979        assert!(manager_ctx.state(procedure_id).is_none());
980        assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
981        for child_id in children_ids {
982            assert!(manager_ctx.state(child_id).is_none());
983        }
984    }
985
986    #[tokio::test]
987    async fn test_running_is_stopped() {
988        let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
989        let normal = ProcedureAdapter {
990            data: "normal".to_string(),
991            lock_key: LockKey::single_exclusive("catalog.schema.table"),
992            poison_keys: PoisonKeys::default(),
993            exec_fn,
994            rollback_fn: None,
995        };
996
997        let dir = create_temp_dir("test_running_is_stopped");
998        let meta = normal.new_meta(ROOT_ID);
999        let ctx = context_without_provider(meta.id);
1000        let object_store = test_util::new_object_store(&dir);
1001        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1002        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1003        runner.manager_ctx.start();
1004
1005        runner.execute_once(&ctx).await;
1006        let state = runner.meta.state();
1007        assert!(state.is_running(), "{state:?}");
1008        check_files(
1009            &object_store,
1010            &procedure_store,
1011            ctx.procedure_id,
1012            &["0000000000.step"],
1013        )
1014        .await;
1015
1016        runner.manager_ctx.stop();
1017        runner.execute_once(&ctx).await;
1018        let state = runner.meta.state();
1019        assert!(state.is_failed(), "{state:?}");
1020        // Shouldn't write any files
1021        check_files(
1022            &object_store,
1023            &procedure_store,
1024            ctx.procedure_id,
1025            &["0000000000.step"],
1026        )
1027        .await;
1028    }
1029
1030    #[tokio::test]
1031    async fn test_running_is_stopped_on_error() {
1032        let exec_fn =
1033            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1034        let normal = ProcedureAdapter {
1035            data: "fail".to_string(),
1036            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1037            poison_keys: PoisonKeys::default(),
1038            exec_fn,
1039            rollback_fn: None,
1040        };
1041
1042        let dir = create_temp_dir("test_running_is_stopped_on_error");
1043        let meta = normal.new_meta(ROOT_ID);
1044        let ctx = context_without_provider(meta.id);
1045        let object_store = test_util::new_object_store(&dir);
1046        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1047        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1048        runner.manager_ctx.stop();
1049
1050        runner.execute_once(&ctx).await;
1051        let state = runner.meta.state();
1052        assert!(state.is_failed(), "{state:?}");
1053        // Shouldn't write any files
1054        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1055    }
1056
1057    #[tokio::test]
1058    async fn test_execute_on_error() {
1059        let exec_fn =
1060            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1061        let fail = ProcedureAdapter {
1062            data: "fail".to_string(),
1063            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1064            poison_keys: PoisonKeys::default(),
1065            exec_fn,
1066            rollback_fn: None,
1067        };
1068
1069        let dir = create_temp_dir("fail");
1070        let meta = fail.new_meta(ROOT_ID);
1071        let ctx = context_without_provider(meta.id);
1072        let object_store = test_util::new_object_store(&dir);
1073        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1074        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1075        runner.manager_ctx.start();
1076
1077        runner.execute_once(&ctx).await;
1078        let state = runner.meta.state();
1079        assert!(state.is_prepare_rollback(), "{state:?}");
1080
1081        runner.execute_once(&ctx).await;
1082        let state = runner.meta.state();
1083        assert!(state.is_failed(), "{state:?}");
1084        check_files(
1085            &object_store,
1086            &procedure_store,
1087            ctx.procedure_id,
1088            &["0000000000.rollback"],
1089        )
1090        .await;
1091    }
1092
1093    #[tokio::test]
1094    async fn test_execute_with_rollback_on_error() {
1095        let exec_fn =
1096            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1097        let rollback_fn = move |_| async move { Ok(()) }.boxed();
1098        let fail = ProcedureAdapter {
1099            data: "fail".to_string(),
1100            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1101            poison_keys: PoisonKeys::default(),
1102            exec_fn,
1103            rollback_fn: Some(Box::new(rollback_fn)),
1104        };
1105
1106        let dir = create_temp_dir("fail");
1107        let meta = fail.new_meta(ROOT_ID);
1108        let ctx = context_without_provider(meta.id);
1109        let object_store = test_util::new_object_store(&dir);
1110        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1111        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1112        runner.manager_ctx.start();
1113
1114        runner.execute_once(&ctx).await;
1115        let state = runner.meta.state();
1116        assert!(state.is_prepare_rollback(), "{state:?}");
1117
1118        runner.execute_once(&ctx).await;
1119        let state = runner.meta.state();
1120        assert!(state.is_rolling_back(), "{state:?}");
1121
1122        runner.execute_once(&ctx).await;
1123        let state = runner.meta.state();
1124        assert!(state.is_failed(), "{state:?}");
1125        check_files(
1126            &object_store,
1127            &procedure_store,
1128            ctx.procedure_id,
1129            &["0000000000.rollback"],
1130        )
1131        .await;
1132    }
1133
1134    #[tokio::test]
1135    async fn test_execute_on_retry_later_error() {
1136        let mut times = 0;
1137
1138        let exec_fn = move |_| {
1139            times += 1;
1140            async move {
1141                if times == 1 {
1142                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1143                } else {
1144                    Ok(Status::done())
1145                }
1146            }
1147            .boxed()
1148        };
1149
1150        let retry_later = ProcedureAdapter {
1151            data: "retry_later".to_string(),
1152            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1153            poison_keys: PoisonKeys::default(),
1154            exec_fn,
1155            rollback_fn: None,
1156        };
1157
1158        let dir = create_temp_dir("retry_later");
1159        let meta = retry_later.new_meta(ROOT_ID);
1160        let ctx = context_without_provider(meta.id);
1161        let object_store = test_util::new_object_store(&dir);
1162        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1163        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1164        runner.manager_ctx.start();
1165        runner.execute_once(&ctx).await;
1166        let state = runner.meta.state();
1167        assert!(state.is_retrying(), "{state:?}");
1168
1169        runner.execute_once(&ctx).await;
1170        let state = runner.meta.state();
1171        assert!(state.is_done(), "{state:?}");
1172        assert!(meta.state().is_done());
1173        check_files(
1174            &object_store,
1175            &procedure_store,
1176            ctx.procedure_id,
1177            &["0000000000.commit"],
1178        )
1179        .await;
1180    }
1181
1182    #[tokio::test]
1183    async fn test_execute_exceed_max_retry_later() {
1184        let exec_fn =
1185            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1186
1187        let exceed_max_retry_later = ProcedureAdapter {
1188            data: "exceed_max_retry_later".to_string(),
1189            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1190            poison_keys: PoisonKeys::default(),
1191            exec_fn,
1192            rollback_fn: None,
1193        };
1194
1195        let dir = create_temp_dir("exceed_max_retry_later");
1196        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1197        let object_store = test_util::new_object_store(&dir);
1198        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1199        let mut runner = new_runner(
1200            meta.clone(),
1201            Box::new(exceed_max_retry_later),
1202            procedure_store,
1203        );
1204        runner.manager_ctx.start();
1205
1206        runner.exponential_builder = ExponentialBuilder::default()
1207            .with_min_delay(Duration::from_millis(1))
1208            .with_max_times(3);
1209
1210        // Run the runner and execute the procedure.
1211        runner.execute_procedure_in_loop().await;
1212        let err = meta.state().error().unwrap().to_string();
1213        assert!(err.contains("Procedure retry exceeded max times"));
1214    }
1215
1216    #[tokio::test]
1217    async fn test_rollback_exceed_max_retry_later() {
1218        let exec_fn =
1219            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1220        let rollback_fn = move |_| {
1221            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1222        };
1223        let exceed_max_retry_later = ProcedureAdapter {
1224            data: "exceed_max_rollback".to_string(),
1225            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1226            poison_keys: PoisonKeys::default(),
1227            exec_fn,
1228            rollback_fn: Some(Box::new(rollback_fn)),
1229        };
1230
1231        let dir = create_temp_dir("exceed_max_rollback");
1232        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1233        let object_store = test_util::new_object_store(&dir);
1234        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1235        let mut runner = new_runner(
1236            meta.clone(),
1237            Box::new(exceed_max_retry_later),
1238            procedure_store,
1239        );
1240        runner.manager_ctx.start();
1241        runner.exponential_builder = ExponentialBuilder::default()
1242            .with_min_delay(Duration::from_millis(1))
1243            .with_max_times(3);
1244
1245        // Run the runner and execute the procedure.
1246        runner.execute_procedure_in_loop().await;
1247        let err = meta.state().error().unwrap().to_string();
1248        assert!(err.contains("Procedure rollback exceeded max times"));
1249    }
1250
1251    #[tokio::test]
1252    async fn test_rollback_after_retry_fail() {
1253        let exec_fn = move |_| {
1254            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1255        };
1256
1257        let (tx, mut rx) = mpsc::channel(1);
1258        let rollback_fn = move |_| {
1259            let tx = tx.clone();
1260            async move {
1261                tx.send(()).await.unwrap();
1262                Ok(())
1263            }
1264            .boxed()
1265        };
1266        let retry_later = ProcedureAdapter {
1267            data: "rollback_after_retry_fail".to_string(),
1268            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1269            poison_keys: PoisonKeys::default(),
1270            exec_fn,
1271            rollback_fn: Some(Box::new(rollback_fn)),
1272        };
1273
1274        let dir = create_temp_dir("retry_later");
1275        let meta = retry_later.new_meta(ROOT_ID);
1276        let ctx = context_without_provider(meta.id);
1277        let object_store = test_util::new_object_store(&dir);
1278        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1279        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1280        runner.manager_ctx.start();
1281        runner.exponential_builder = ExponentialBuilder::default()
1282            .with_min_delay(Duration::from_millis(1))
1283            .with_max_times(3);
1284        // Run the runner and execute the procedure.
1285        runner.execute_procedure_in_loop().await;
1286        rx.recv().await.unwrap();
1287        assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
1288        check_files(
1289            &object_store,
1290            &procedure_store,
1291            ctx.procedure_id,
1292            &["0000000000.rollback"],
1293        )
1294        .await;
1295    }
1296
1297    #[tokio::test]
1298    async fn test_child_error() {
1299        let mut times = 0;
1300        let child_id = ProcedureId::random();
1301
1302        let exec_fn = move |ctx: Context| {
1303            times += 1;
1304            async move {
1305                if times == 1 {
1306                    // Submit subprocedures.
1307                    let exec_fn = |_| {
1308                        async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1309                            .boxed()
1310                    };
1311                    let fail = ProcedureAdapter {
1312                        data: "fail".to_string(),
1313                        lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1314                        poison_keys: PoisonKeys::default(),
1315                        exec_fn,
1316                        rollback_fn: None,
1317                    };
1318
1319                    Ok(Status::Suspended {
1320                        subprocedures: vec![ProcedureWithId {
1321                            id: child_id,
1322                            procedure: Box::new(fail),
1323                        }],
1324                        persist: true,
1325                    })
1326                } else {
1327                    // Wait for subprocedures.
1328                    let state = ctx.provider.procedure_state(child_id).await.unwrap();
1329                    let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
1330                    if is_failed {
1331                        // The parent procedure to abort itself if child procedure is failed.
1332                        Err(Error::from_error_ext(PlainError::new(
1333                            "subprocedure failed".to_string(),
1334                            StatusCode::Unexpected,
1335                        )))
1336                    } else {
1337                        // Return suspended to wait for notify.
1338                        Ok(Status::Suspended {
1339                            subprocedures: Vec::new(),
1340                            persist: false,
1341                        })
1342                    }
1343                }
1344            }
1345            .boxed()
1346        };
1347        let parent = ProcedureAdapter {
1348            data: "parent".to_string(),
1349            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1350            poison_keys: PoisonKeys::default(),
1351            exec_fn,
1352            rollback_fn: None,
1353        };
1354
1355        let dir = create_temp_dir("child_err");
1356        let meta = parent.new_meta(ROOT_ID);
1357
1358        let object_store = test_util::new_object_store(&dir);
1359        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1360        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
1361        let poison_manager = Arc::new(InMemoryPoisonStore::default());
1362        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
1363        manager_ctx.start();
1364        // Manually add this procedure to the manager ctx.
1365        assert!(manager_ctx.try_insert_procedure(meta.clone()));
1366        // Replace the manager ctx.
1367        runner.manager_ctx = manager_ctx.clone();
1368
1369        // Run the runner and execute the procedure.
1370        runner.run().await;
1371        assert!(manager_ctx.key_lock.is_empty());
1372        let err = meta.state().error().unwrap().output_msg();
1373        assert!(err.contains("subprocedure failed"), "{err}");
1374    }
1375
1376    #[tokio::test]
1377    async fn test_execute_with_clean_poisons() {
1378        common_telemetry::init_default_ut_logging();
1379        let mut times = 0;
1380        let poison_key = PoisonKey::new("table/1024");
1381        let moved_poison_key = poison_key.clone();
1382        let exec_fn = move |ctx: Context| {
1383            times += 1;
1384            let poison_key = moved_poison_key.clone();
1385            async move {
1386                if times == 1 {
1387                    // Put the poison to the context.
1388                    ctx.provider
1389                        .try_put_poison(&poison_key, ctx.procedure_id)
1390                        .await
1391                        .unwrap();
1392
1393                    Ok(Status::executing(true))
1394                } else {
1395                    Ok(Status::executing_with_clean_poisons(true))
1396                }
1397            }
1398            .boxed()
1399        };
1400        let poison = ProcedureAdapter {
1401            data: "poison".to_string(),
1402            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1403            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1404            exec_fn,
1405            rollback_fn: None,
1406        };
1407
1408        let dir = create_temp_dir("clean_poisons");
1409        let meta = poison.new_meta(ROOT_ID);
1410
1411        let object_store = test_util::new_object_store(&dir);
1412        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1413        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1414
1415        // Use the manager ctx as the context provider.
1416        let ctx = context_with_provider(
1417            meta.id,
1418            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1419        );
1420        // Manually add this procedure to the manager ctx.
1421        runner
1422            .manager_ctx
1423            .procedures
1424            .write()
1425            .unwrap()
1426            .insert(meta.id, runner.meta.clone());
1427
1428        runner.manager_ctx.start();
1429        runner.execute_once(&ctx).await;
1430        let state = runner.meta.state();
1431        assert!(state.is_running(), "{state:?}");
1432
1433        let procedure_id = runner
1434            .manager_ctx
1435            .poison_manager
1436            .get_poison(&poison_key.to_string())
1437            .await
1438            .unwrap();
1439        // poison key should be exist.
1440        assert!(procedure_id.is_some());
1441
1442        runner.execute_once(&ctx).await;
1443        let state = runner.meta.state();
1444        assert!(state.is_running(), "{state:?}");
1445
1446        let procedure_id = runner
1447            .manager_ctx
1448            .poison_manager
1449            .get_poison(&poison_key.to_string())
1450            .await
1451            .unwrap();
1452        // poison key should be deleted.
1453        assert!(procedure_id.is_none());
1454    }
1455
1456    #[tokio::test]
1457    async fn test_execute_error_with_clean_poisons() {
1458        common_telemetry::init_default_ut_logging();
1459        let mut times = 0;
1460        let poison_key = PoisonKey::new("table/1024");
1461        let moved_poison_key = poison_key.clone();
1462        let exec_fn = move |ctx: Context| {
1463            times += 1;
1464            let poison_key = moved_poison_key.clone();
1465            async move {
1466                if times == 1 {
1467                    // Put the poison to the context.
1468                    ctx.provider
1469                        .try_put_poison(&poison_key, ctx.procedure_id)
1470                        .await
1471                        .unwrap();
1472
1473                    Ok(Status::executing(true))
1474                } else {
1475                    Err(Error::external_and_clean_poisons(MockError::new(
1476                        StatusCode::Unexpected,
1477                    )))
1478                }
1479            }
1480            .boxed()
1481        };
1482        let poison = ProcedureAdapter {
1483            data: "poison".to_string(),
1484            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1485            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1486            exec_fn,
1487            rollback_fn: None,
1488        };
1489
1490        let dir = create_temp_dir("error_with_clean_poisons");
1491        let meta = poison.new_meta(ROOT_ID);
1492
1493        let object_store = test_util::new_object_store(&dir);
1494        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1495        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1496
1497        // Use the manager ctx as the context provider.
1498        let ctx = context_with_provider(
1499            meta.id,
1500            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1501        );
1502        // Manually add this procedure to the manager ctx.
1503        runner
1504            .manager_ctx
1505            .procedures
1506            .write()
1507            .unwrap()
1508            .insert(meta.id, runner.meta.clone());
1509
1510        runner.manager_ctx.start();
1511        runner.execute_once(&ctx).await;
1512        let state = runner.meta.state();
1513        assert!(state.is_running(), "{state:?}");
1514
1515        let procedure_id = runner
1516            .manager_ctx
1517            .poison_manager
1518            .get_poison(&poison_key.to_string())
1519            .await
1520            .unwrap();
1521        // poison key should be exist.
1522        assert!(procedure_id.is_some());
1523
1524        runner.execute_once(&ctx).await;
1525        let state = runner.meta.state();
1526        assert!(state.is_prepare_rollback(), "{state:?}");
1527
1528        let procedure_id = runner
1529            .manager_ctx
1530            .poison_manager
1531            .get_poison(&poison_key.to_string())
1532            .await
1533            .unwrap();
1534        // poison key should be deleted.
1535        assert!(procedure_id.is_none());
1536    }
1537
1538    #[tokio::test]
1539    async fn test_execute_failed_after_set_poison() {
1540        let mut times = 0;
1541        let poison_key = PoisonKey::new("table/1024");
1542        let moved_poison_key = poison_key.clone();
1543        let exec_fn = move |ctx: Context| {
1544            times += 1;
1545            let poison_key = moved_poison_key.clone();
1546            async move {
1547                if times == 1 {
1548                    Ok(Status::executing(true))
1549                } else {
1550                    // Put the poison to the context.
1551                    ctx.provider
1552                        .try_put_poison(&poison_key, ctx.procedure_id)
1553                        .await
1554                        .unwrap();
1555                    Err(Error::external(MockError::new(StatusCode::Unexpected)))
1556                }
1557            }
1558            .boxed()
1559        };
1560        let poison = ProcedureAdapter {
1561            data: "poison".to_string(),
1562            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1563            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1564            exec_fn,
1565            rollback_fn: None,
1566        };
1567
1568        let dir = create_temp_dir("poison");
1569        let meta = poison.new_meta(ROOT_ID);
1570
1571        let object_store = test_util::new_object_store(&dir);
1572        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1573        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1574
1575        // Use the manager ctx as the context provider.
1576        let ctx = context_with_provider(
1577            meta.id,
1578            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1579        );
1580        // Manually add this procedure to the manager ctx.
1581        runner
1582            .manager_ctx
1583            .procedures
1584            .write()
1585            .unwrap()
1586            .insert(meta.id, runner.meta.clone());
1587
1588        runner.manager_ctx.start();
1589        runner.execute_once(&ctx).await;
1590        let state = runner.meta.state();
1591        assert!(state.is_running(), "{state:?}");
1592
1593        runner.execute_once(&ctx).await;
1594        let state = runner.meta.state();
1595        assert!(state.is_prepare_rollback(), "{state:?}");
1596        assert!(meta.state().is_prepare_rollback());
1597
1598        runner.execute_once(&ctx).await;
1599        let state = runner.meta.state();
1600        assert!(state.is_failed(), "{state:?}");
1601        assert!(meta.state().is_failed());
1602
1603        // Check the poison is set.
1604        let procedure_id = runner
1605            .manager_ctx
1606            .poison_manager
1607            .get_poison(&poison_key.to_string())
1608            .await
1609            .unwrap()
1610            .unwrap();
1611
1612        // If the procedure is poisoned, the poison key shouldn't be deleted.
1613        assert_eq!(&procedure_id.to_string(), ROOT_ID);
1614    }
1615
1616    #[tokio::test]
1617    async fn test_execute_exceed_max_retry_after_set_poison() {
1618        common_telemetry::init_default_ut_logging();
1619        let mut times = 0;
1620        let poison_key = PoisonKey::new("table/1024");
1621        let moved_poison_key = poison_key.clone();
1622        let exec_fn = move |ctx: Context| {
1623            times += 1;
1624            let poison_key = moved_poison_key.clone();
1625            async move {
1626                if times == 1 {
1627                    Ok(Status::executing(true))
1628                } else {
1629                    // Put the poison to the context.
1630                    ctx.provider
1631                        .try_put_poison(&poison_key, ctx.procedure_id)
1632                        .await
1633                        .unwrap();
1634                    Err(Error::retry_later_and_clean_poisons(MockError::new(
1635                        StatusCode::Unexpected,
1636                    )))
1637                }
1638            }
1639            .boxed()
1640        };
1641        let poison = ProcedureAdapter {
1642            data: "poison".to_string(),
1643            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1644            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1645            exec_fn,
1646            rollback_fn: None,
1647        };
1648
1649        let dir = create_temp_dir("exceed_max_after_set_poison");
1650        let meta = poison.new_meta(ROOT_ID);
1651        let object_store = test_util::new_object_store(&dir);
1652        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1653        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
1654        runner.manager_ctx.start();
1655        runner.exponential_builder = ExponentialBuilder::default()
1656            .with_min_delay(Duration::from_millis(1))
1657            .with_max_times(3);
1658        // Use the manager ctx as the context provider.
1659        let ctx = context_with_provider(
1660            meta.id,
1661            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1662        );
1663        // Manually add this procedure to the manager ctx.
1664        runner
1665            .manager_ctx
1666            .procedures
1667            .write()
1668            .unwrap()
1669            .insert(meta.id, runner.meta.clone());
1670        // Run the runner and execute the procedure.
1671        runner.execute_once_with_retry(&ctx).await;
1672        let err = meta.state().error().unwrap().clone();
1673        assert_matches!(&*err, Error::RetryTimesExceeded { .. });
1674
1675        // Check the poison is deleted.
1676        let procedure_id = runner
1677            .manager_ctx
1678            .poison_manager
1679            .get_poison(&poison_key.to_string())
1680            .await
1681            .unwrap();
1682        assert_eq!(procedure_id, None);
1683    }
1684
1685    #[tokio::test]
1686    async fn test_execute_poisoned() {
1687        let mut times = 0;
1688        let poison_key = PoisonKey::new("table/1024");
1689        let moved_poison_key = poison_key.clone();
1690        let exec_fn = move |ctx: Context| {
1691            times += 1;
1692            let poison_key = moved_poison_key.clone();
1693            async move {
1694                if times == 1 {
1695                    Ok(Status::executing(true))
1696                } else {
1697                    // Put the poison to the context.
1698                    ctx.provider
1699                        .try_put_poison(&poison_key, ctx.procedure_id)
1700                        .await
1701                        .unwrap();
1702                    Ok(Status::Poisoned {
1703                        keys: PoisonKeys::new(vec![poison_key.clone()]),
1704                        error: Error::external(MockError::new(StatusCode::Unexpected)),
1705                    })
1706                }
1707            }
1708            .boxed()
1709        };
1710        let poison = ProcedureAdapter {
1711            data: "poison".to_string(),
1712            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1713            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1714            exec_fn,
1715            rollback_fn: None,
1716        };
1717
1718        let dir = create_temp_dir("poison");
1719        let meta = poison.new_meta(ROOT_ID);
1720
1721        let object_store = test_util::new_object_store(&dir);
1722        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1723        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1724
1725        // Use the manager ctx as the context provider.
1726        let ctx = context_with_provider(
1727            meta.id,
1728            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1729        );
1730        // Manually add this procedure to the manager ctx.
1731        runner
1732            .manager_ctx
1733            .procedures
1734            .write()
1735            .unwrap()
1736            .insert(meta.id, runner.meta.clone());
1737
1738        runner.manager_ctx.start();
1739        runner.execute_once(&ctx).await;
1740        let state = runner.meta.state();
1741        assert!(state.is_running(), "{state:?}");
1742
1743        runner.execute_once(&ctx).await;
1744        let state = runner.meta.state();
1745        assert!(state.is_poisoned(), "{state:?}");
1746        assert!(meta.state().is_poisoned());
1747        check_files(
1748            &object_store,
1749            &procedure_store,
1750            ctx.procedure_id,
1751            &["0000000000.step"],
1752        )
1753        .await;
1754
1755        // Check the poison is set.
1756        let procedure_id = runner
1757            .manager_ctx
1758            .poison_manager
1759            .get_poison(&poison_key.to_string())
1760            .await
1761            .unwrap()
1762            .unwrap();
1763
1764        // If the procedure is poisoned, the poison key shouldn't be deleted.
1765        assert_eq!(procedure_id, ROOT_ID);
1766    }
1767
1768    fn test_procedure_with_dynamic_lock(
1769        shared_atomic_value: Arc<AtomicU64>,
1770        id: u64,
1771    ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
1772        let exec_fn = move |ctx: Context| {
1773            let moved_shared_atomic_value = shared_atomic_value.clone();
1774            let moved_ctx = ctx.clone();
1775            async move {
1776                debug!("Acquiring write lock, id: {}", id);
1777                let key = StringKey::Exclusive("test_lock".to_string());
1778                let guard = moved_ctx.provider.acquire_lock(&key).await;
1779                debug!("Acquired write lock, id: {}", id);
1780                let millis = rand::rng().random_range(10..=50);
1781                tokio::time::sleep(Duration::from_millis(millis)).await;
1782                let value = moved_shared_atomic_value.load(Ordering::Relaxed);
1783                moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
1784                debug!("Dropping write lock, id: {}", id);
1785                drop(guard);
1786
1787                Ok(Status::done())
1788            }
1789            .boxed()
1790        };
1791
1792        let adapter = ProcedureAdapter {
1793            data: "dynamic_lock".to_string(),
1794            lock_key: LockKey::new_exclusive([]),
1795            poison_keys: PoisonKeys::new([]),
1796            exec_fn,
1797            rollback_fn: None,
1798        };
1799        let meta = adapter.new_meta(ROOT_ID);
1800
1801        (Box::new(adapter), meta)
1802    }
1803
1804    #[tokio::test(flavor = "multi_thread")]
1805    async fn test_execute_with_dynamic_lock() {
1806        common_telemetry::init_default_ut_logging();
1807        let shared_atomic_value = Arc::new(AtomicU64::new(0));
1808        let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
1809        let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
1810
1811        let dir = create_temp_dir("dynamic_lock");
1812        let object_store = test_util::new_object_store(&dir);
1813        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1814        let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
1815        let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
1816        let ctx1 = context_with_provider(
1817            meta1.id,
1818            runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1819        );
1820        let ctx2 = context_with_provider(
1821            meta2.id,
1822            // use same manager ctx as runner1
1823            runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1824        );
1825        let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
1826        join_all(tasks).await;
1827        assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
1828    }
1829}