common_procedure/local/
runner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_event_recorder::EventRecorderRef;
21use common_telemetry::{debug, error, info};
22use rand::Rng;
23use snafu::ResultExt;
24use tokio::time;
25
26use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
27use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
28use crate::procedure::{Output, StringKey};
29use crate::rwlock::OwnedKeyRwLockGuard;
30use crate::store::{ProcedureMessage, ProcedureStore};
31use crate::{
32    BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
33};
34
/// A guard to cleanup procedure state.
///
/// When dropped, the guard notifies the parent procedure (if any), releases the key
/// lock guards it holds and cleans the procedure's keys from the key lock. If it is
/// dropped without `finish()` having been called, the procedure is also marked as failed.
struct ProcedureGuard {
    /// Metadata of the guarded procedure.
    meta: ProcedureMetaRef,
    /// Manager context used on drop to notify the parent and clean the key lock.
    manager_ctx: Arc<ManagerContext>,
    /// Key lock guards held for the procedure; popped from the back on drop.
    key_guards: Vec<OwnedKeyRwLockGuard>,
    /// Set by `finish()`; when still `false` on drop, the procedure state is set to failed.
    finish: bool,
}
42
43impl ProcedureGuard {
44    /// Returns a new [ProcedureGuard].
45    fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
46        ProcedureGuard {
47            meta,
48            manager_ctx,
49            key_guards: vec![],
50            finish: false,
51        }
52    }
53
54    /// The procedure is finished successfully.
55    fn finish(mut self) {
56        self.finish = true;
57    }
58}
59
60impl Drop for ProcedureGuard {
61    fn drop(&mut self) {
62        if !self.finish {
63            error!("Procedure {} exits unexpectedly", self.meta.id);
64
65            // Set state to failed. This is useful in test as runtime may not abort when the runner task panics.
66            // See https://github.com/tokio-rs/tokio/issues/2002 .
67            // We set set_panic_hook() in the application's main function. But our tests don't have this panic hook.
68            let err = ProcedurePanicSnafu {
69                procedure_id: self.meta.id,
70            }
71            .build();
72            self.meta.set_state(ProcedureState::failed(Arc::new(err)));
73        }
74
75        // Notify parent procedure.
76        if let Some(parent_id) = self.meta.parent_id {
77            self.manager_ctx.notify_by_subprocedure(parent_id);
78        }
79
80        // Drops the key guards in the reverse order.
81        while !self.key_guards.is_empty() {
82            self.key_guards.pop();
83        }
84
85        // Clean the staled locks.
86        self.manager_ctx
87            .key_lock
88            .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
89    }
90}
91
/// Drives a single procedure: acquires its locks, executes it with retry/rollback
/// handling, persists its progress to the store and submits its subprocedures.
pub(crate) struct Runner {
    /// Metadata (id, state, parent, lock keys, poison keys) of the procedure being run.
    pub(crate) meta: ProcedureMetaRef,
    /// The procedure to execute, roll back and dump.
    pub(crate) procedure: BoxedProcedure,
    /// Shared manager context: running flag, key locks, poison manager, procedure registry.
    pub(crate) manager_ctx: Arc<ManagerContext>,
    /// Step number passed to the store on each write; incremented after every
    /// successful persist, commit or rollback write.
    pub(crate) step: u32,
    /// Builder for the backoff iterators used for retries and rollback attempts.
    pub(crate) exponential_builder: ExponentialBuilder,
    /// Store used to persist, commit, roll back and delete procedure data.
    pub(crate) store: Arc<ProcedureStore>,
    /// Reset to `false` before execution starts.
    // NOTE(review): not read anywhere in the visible part of this file — confirm it is still needed.
    pub(crate) rolling_back: bool,
    /// Optional event recorder, forwarded to the metadata and runners of subprocedures.
    pub(crate) event_recorder: Option<EventRecorderRef>,
}
102
103impl Runner {
    /// Returns whether the `ProcedureManager` is running, as reported by the manager context.
    pub(crate) fn running(&self) -> bool {
        self.manager_ctx.running()
    }
108
    /// Run the procedure.
    ///
    /// Acquires every key lock declared by the procedure (shared keys as read locks,
    /// exclusive keys as write locks), drives the procedure until
    /// `execute_procedure_in_loop` returns, then consumes the guard to release the locks
    /// and notify the parent. A procedure without a parent additionally cleans up the
    /// resources of the procedures returned by `procedures_in_tree` and, if the manager
    /// is still running, deletes each of them from the store.
    pub(crate) async fn run(mut self) {
        // Ensure we can update the procedure state.
        let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());

        info!(
            "Runner {}-{} starts",
            self.procedure.type_name(),
            self.meta.id
        );

        // TODO(yingwen): Detect recursive locking (and deadlock) if possible. Maybe we could detect
        // recursive locking by adding a root procedure id to the meta.
        for key in self.meta.lock_key.keys_to_lock() {
            // Acquire lock for each key.
            let key_guard = match key {
                StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
                StringKey::Exclusive(key) => {
                    self.manager_ctx.key_lock.write(key.clone()).await.into()
                }
            };

            // The guard owns the lock so it is released even if this task exits unexpectedly.
            guard.key_guards.push(key_guard);
        }

        // Execute the procedure. We need to release the lock whenever the execution
        // is successful or fail.
        self.meta.set_start_time_ms();
        self.execute_procedure_in_loop().await;
        self.meta.set_end_time_ms();

        // We can't remove the metadata of the procedure now as users and its parent might
        // need to query its state.
        // TODO(yingwen): 1. Add TTL to the metadata; 2. Only keep state in the procedure store
        // so we don't need to always store the metadata in memory after the procedure is done.

        // Release locks and notify parent procedure.
        // `finish()` consumes the guard, so its `Drop` cleanup runs right here.
        guard.finish();

        // If this is the root procedure, clean up message cache.
        if self.meta.parent_id.is_none() {
            let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
            // Clean resources.
            self.manager_ctx.on_procedures_finish(&procedure_ids);

            // If `ProcedureManager` is stopped, it stops the current task immediately without deleting the procedure.
            if !self.running() {
                return;
            }

            // Deletion is best-effort: a failure is logged and the remaining ids are still processed.
            for id in procedure_ids {
                if let Err(e) = self.store.delete_procedure(id).await {
                    error!(
                        e;
                        "Runner {}-{} failed to delete procedure {}",
                        self.procedure.type_name(),
                        self.meta.id,
                        id,
                    );
                }
            }
        }

        info!(
            "Runner {}-{} exits",
            self.procedure.type_name(),
            self.meta.id
        );
    }
178
179    async fn execute_procedure_in_loop(&mut self) {
180        let ctx = Context {
181            procedure_id: self.meta.id,
182            provider: self.manager_ctx.clone(),
183        };
184
185        self.rolling_back = false;
186        self.execute_once_with_retry(&ctx).await;
187    }
188
189    async fn execute_once_with_retry(&mut self, ctx: &Context) {
190        let mut retry = self.exponential_builder.build();
191        let mut retry_times = 0;
192
193        let mut rollback = self.exponential_builder.build();
194        let mut rollback_times = 0;
195
196        loop {
197            // Don't store state if `ProcedureManager` is stopped.
198            if !self.running() {
199                self.meta.set_state(ProcedureState::failed(Arc::new(
200                    error::ManagerNotStartSnafu {}.build(),
201                )));
202                return;
203            }
204            let state = self.meta.state();
205            match state {
206                ProcedureState::Running => {}
207                ProcedureState::Retrying { error } => {
208                    retry_times += 1;
209                    if let Some(d) = retry.next() {
210                        let millis = d.as_millis() as u64;
211                        // Add random noise to the retry delay to avoid retry storms.
212                        let noise = rand::rng().random_range(0..(millis / 4) + 1);
213                        let d = d.add(Duration::from_millis(noise));
214
215                        self.wait_on_err(d, retry_times).await;
216                    } else {
217                        self.meta
218                            .set_state(ProcedureState::prepare_rollback(Arc::new(
219                                Error::RetryTimesExceeded {
220                                    source: error.clone(),
221                                    procedure_id: self.meta.id,
222                                },
223                            )));
224                    }
225                }
226                ProcedureState::PrepareRollback { error }
227                | ProcedureState::RollingBack { error } => {
228                    rollback_times += 1;
229                    if let Some(d) = rollback.next() {
230                        self.wait_on_err(d, rollback_times).await;
231                    } else {
232                        let err = Err::<(), Arc<Error>>(error)
233                            .context(RollbackTimesExceededSnafu {
234                                procedure_id: self.meta.id,
235                            })
236                            .unwrap_err();
237                        self.meta.set_state(ProcedureState::failed(Arc::new(err)));
238                        return;
239                    }
240                }
241                ProcedureState::Done { .. } => return,
242                ProcedureState::Failed { .. } => return,
243                ProcedureState::Poisoned { .. } => return,
244            }
245            self.execute_once(ctx).await;
246        }
247    }
248
249    async fn clean_poisons(&mut self) -> Result<()> {
250        let mut error = None;
251        for key in self.meta.poison_keys.iter() {
252            let key = key.to_string();
253            if let Err(e) = self
254                .manager_ctx
255                .poison_manager
256                .delete_poison(key, self.meta.id.to_string())
257                .await
258            {
259                error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
260                error = Some(e);
261            }
262        }
263
264        // returns the last error if any.
265        if let Some(e) = error {
266            return Err(e);
267        }
268        Ok(())
269    }
270
271    async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
272        if self.procedure.rollback_supported()
273            && let Err(e) = self.procedure.rollback(ctx).await
274        {
275            self.meta
276                .set_state(ProcedureState::rolling_back(Arc::new(e)));
277            return;
278        }
279        self.meta.set_state(ProcedureState::failed(err));
280    }
281
282    async fn prepare_rollback(&mut self, err: Arc<Error>) {
283        if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
284            self.meta
285                .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
286            return;
287        }
288        if self.procedure.rollback_supported() {
289            self.meta.set_state(ProcedureState::rolling_back(err));
290        } else {
291            self.meta.set_state(ProcedureState::failed(err));
292        }
293    }
294
295    async fn execute_once(&mut self, ctx: &Context) {
296        match self.meta.state() {
297            ProcedureState::Running | ProcedureState::Retrying { .. } => {
298                match self.procedure.execute(ctx).await {
299                    Ok(status) => {
300                        debug!(
301                            "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
302                            self.procedure.type_name(),
303                            self.meta.id,
304                            status,
305                            status.need_persist(),
306                        );
307
308                        // Don't store state if `ProcedureManager` is stopped.
309                        if !self.running() {
310                            self.meta.set_state(ProcedureState::failed(Arc::new(
311                                error::ManagerNotStartSnafu {}.build(),
312                            )));
313                            return;
314                        }
315
316                        // Cleans poisons before persist.
317                        if status.need_clean_poisons()
318                            && let Err(e) = self.clean_poisons().await
319                        {
320                            error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
321                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
322                            return;
323                        }
324
325                        if status.need_persist()
326                            && let Err(e) = self.persist_procedure().await
327                        {
328                            error!(e; "Failed to persist procedure: {}", self.meta.id);
329                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
330                            return;
331                        }
332
333                        match status {
334                            Status::Executing { .. } => {
335                                let prev_state = self.meta.state();
336                                if !matches!(prev_state, ProcedureState::Running) {
337                                    info!(
338                                        "Set Procedure {}-{} state to running, prev_state: {:?}",
339                                        self.procedure.type_name(),
340                                        self.meta.id,
341                                        prev_state
342                                    );
343                                    self.meta.set_state(ProcedureState::Running);
344                                }
345                            }
346                            Status::Suspended { subprocedures, .. } => {
347                                let prev_state = self.meta.state();
348                                if !matches!(prev_state, ProcedureState::Running) {
349                                    info!(
350                                        "Set Procedure {}-{} state to running, prev_state: {:?}",
351                                        self.procedure.type_name(),
352                                        self.meta.id,
353                                        prev_state
354                                    );
355                                    self.meta.set_state(ProcedureState::Running);
356                                }
357                                self.on_suspended(subprocedures).await;
358                            }
359                            Status::Done { output } => {
360                                if let Err(e) = self.commit_procedure().await {
361                                    error!(e; "Failed to commit procedure: {}", self.meta.id);
362                                    self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
363                                    return;
364                                }
365
366                                self.done(output);
367                            }
368                            Status::Poisoned { error, keys } => {
369                                error!(
370                                    error;
371                                    "Procedure {}-{} is poisoned, keys: {:?}",
372                                    self.procedure.type_name(),
373                                    self.meta.id,
374                                    keys,
375                                );
376                                self.meta
377                                    .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
378                            }
379                        }
380                    }
381                    Err(e) => {
382                        error!(
383                            e;
384                            "Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
385                            self.procedure.type_name(),
386                            self.meta.id,
387                            e.is_retry_later(),
388                            e.need_clean_poisons(),
389                        );
390
391                        // Don't store state if `ProcedureManager` is stopped.
392                        if !self.running() {
393                            self.meta.set_state(ProcedureState::failed(Arc::new(
394                                error::ManagerNotStartSnafu {}.build(),
395                            )));
396                            return;
397                        }
398
399                        if e.need_clean_poisons() {
400                            if let Err(e) = self.clean_poisons().await {
401                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
402                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
403                                return;
404                            }
405                            debug!(
406                                "Procedure {}-{} cleaned poisons",
407                                self.procedure.type_name(),
408                                self.meta.id,
409                            );
410                        }
411
412                        if e.is_retry_later() {
413                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
414                            return;
415                        }
416
417                        if self.procedure.rollback_supported() {
418                            self.meta
419                                .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
420                        } else {
421                            self.meta.set_state(ProcedureState::failed(Arc::new(e)));
422                        }
423                    }
424                }
425            }
426            ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
427            ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
428            ProcedureState::Failed { .. }
429            | ProcedureState::Done { .. }
430            | ProcedureState::Poisoned { .. } => (),
431        }
432    }
433
434    /// Submit a subprocedure with specific `procedure_id`.
435    fn submit_subprocedure(
436        &self,
437        procedure_id: ProcedureId,
438        procedure_state: ProcedureState,
439        procedure: BoxedProcedure,
440    ) {
441        if self.manager_ctx.contains_procedure(procedure_id) {
442            // If the parent has already submitted this procedure, don't submit it again.
443            return;
444        }
445
446        let step = 0;
447
448        let meta = Arc::new(ProcedureMeta::new(
449            procedure_id,
450            procedure_state,
451            Some(self.meta.id),
452            procedure.lock_key(),
453            procedure.poison_keys(),
454            procedure.type_name(),
455            self.event_recorder.clone(),
456            procedure.user_metadata(),
457        ));
458        let runner = Runner {
459            meta: meta.clone(),
460            procedure,
461            manager_ctx: self.manager_ctx.clone(),
462            step,
463            exponential_builder: self.exponential_builder,
464            store: self.store.clone(),
465            rolling_back: false,
466            event_recorder: self.event_recorder.clone(),
467        };
468
469        // Insert the procedure. We already check the procedure existence before inserting
470        // so we add an assertion to ensure the procedure id is unique and no other procedures
471        // using the same procedure id.
472        assert!(
473            self.manager_ctx.try_insert_procedure(meta),
474            "Procedure {}-{} submit an existing procedure {}-{}",
475            self.procedure.type_name(),
476            self.meta.id,
477            runner.procedure.type_name(),
478            procedure_id,
479        );
480
481        // Add the id of the subprocedure to the metadata.
482        self.meta.push_child(procedure_id);
483
484        let _handle = common_runtime::spawn_global(async move {
485            // Run the root procedure.
486            runner.run().await
487        });
488    }
489
490    /// Extend the retry time to wait for the next retry.
491    async fn wait_on_err(&mut self, d: Duration, i: u64) {
492        info!(
493            "Procedure {}-{} retry for the {} times after {} millis",
494            self.procedure.type_name(),
495            self.meta.id,
496            i,
497            d.as_millis(),
498        );
499        time::sleep(d).await;
500    }
501
502    async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
503        let has_child = !subprocedures.is_empty();
504        for subprocedure in subprocedures {
505            info!(
506                "Procedure {}-{} submit subprocedure {}-{}",
507                self.procedure.type_name(),
508                self.meta.id,
509                subprocedure.procedure.type_name(),
510                subprocedure.id,
511            );
512
513            self.submit_subprocedure(
514                subprocedure.id,
515                ProcedureState::Running,
516                subprocedure.procedure,
517            );
518        }
519
520        info!(
521            "Procedure {}-{} is waiting for subprocedures",
522            self.procedure.type_name(),
523            self.meta.id,
524        );
525
526        // Wait for subprocedures.
527        if has_child {
528            self.meta.child_notify.notified().await;
529
530            info!(
531                "Procedure {}-{} is waked up",
532                self.procedure.type_name(),
533                self.meta.id,
534            );
535        }
536    }
537
538    async fn persist_procedure(&mut self) -> Result<()> {
539        let type_name = self.procedure.type_name().to_string();
540        let data = self.procedure.dump()?;
541
542        self.store
543            .store_procedure(
544                self.meta.id,
545                self.step,
546                type_name,
547                data,
548                self.meta.parent_id,
549            )
550            .await
551            .map_err(|e| {
552                error!(
553                    e; "Failed to persist procedure {}-{}",
554                    self.procedure.type_name(),
555                    self.meta.id
556                );
557                e
558            })?;
559        self.step += 1;
560        Ok(())
561    }
562
563    async fn commit_procedure(&mut self) -> Result<()> {
564        self.store
565            .commit_procedure(self.meta.id, self.step)
566            .await
567            .map_err(|e| {
568                error!(
569                    e; "Failed to commit procedure {}-{}",
570                    self.procedure.type_name(),
571                    self.meta.id
572                );
573                e
574            })?;
575        self.step += 1;
576        Ok(())
577    }
578
579    async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
580        // Persists procedure state
581        let type_name = self.procedure.type_name().to_string();
582        let data = self.procedure.dump()?;
583        let message = ProcedureMessage {
584            type_name,
585            data,
586            parent_id: self.meta.parent_id,
587            step: self.step,
588            error: Some(error),
589        };
590        self.store
591            .rollback_procedure(self.meta.id, message)
592            .await
593            .map_err(|e| {
594                error!(
595                    e; "Failed to write rollback key for procedure {}-{}",
596                    self.procedure.type_name(),
597                    self.meta.id
598                );
599                e
600            })?;
601        self.step += 1;
602        Ok(())
603    }
604
605    fn done(&self, output: Option<Output>) {
606        // TODO(yingwen): Add files to remove list.
607        info!(
608            "Procedure {}-{} done",
609            self.procedure.type_name(),
610            self.meta.id,
611        );
612
613        // Mark the state of this procedure to done.
614        self.meta.set_state(ProcedureState::Done { output });
615    }
616}
617
618#[cfg(test)]
619mod tests {
620    use std::assert_matches::assert_matches;
621    use std::sync::Arc;
622    use std::sync::atomic::{AtomicU64, Ordering};
623
624    use async_trait::async_trait;
625    use common_error::ext::{ErrorExt, PlainError};
626    use common_error::mock::MockError;
627    use common_error::status_code::StatusCode;
628    use common_test_util::temp_dir::create_temp_dir;
629    use futures::future::join_all;
630    use futures_util::FutureExt;
631    use futures_util::future::BoxFuture;
632    use object_store::{EntryMode, ObjectStore};
633    use tokio::sync::mpsc;
634    use tokio::sync::watch::Receiver;
635
636    use super::*;
637    use crate::local::{DynamicKeyLockGuard, test_util};
638    use crate::procedure::PoisonKeys;
639    use crate::store::proc_path;
640    use crate::test_util::InMemoryPoisonStore;
641    use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
642
    /// UUID string the tests parse into the id of the procedure under test.
    const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
644
645    fn new_runner(
646        meta: ProcedureMetaRef,
647        procedure: BoxedProcedure,
648        store: Arc<ProcedureStore>,
649    ) -> Runner {
650        Runner {
651            meta,
652            procedure,
653            manager_ctx: Arc::new(ManagerContext::new(
654                Arc::new(InMemoryPoisonStore::default()),
655            )),
656            step: 0,
657            exponential_builder: ExponentialBuilder::default(),
658            store,
659            rolling_back: false,
660            event_recorder: None,
661        }
662    }
663
664    async fn check_files(
665        object_store: &ObjectStore,
666        procedure_store: &ProcedureStore,
667        procedure_id: ProcedureId,
668        files: &[&str],
669    ) {
670        let dir = proc_path!(procedure_store, "{procedure_id}/");
671        let lister = object_store.list(&dir).await.unwrap();
672        let mut files_in_dir: Vec<_> = lister
673            .into_iter()
674            .filter(|x| x.metadata().mode() == EntryMode::FILE)
675            .map(|de| de.name().to_string())
676            .collect();
677        files_in_dir.sort_unstable();
678        assert_eq!(files, files_in_dir);
679    }
680
    /// Builds a [Context] for `procedure_id` that uses the given `provider`.
    fn context_with_provider(
        procedure_id: ProcedureId,
        provider: Arc<dyn ContextProvider>,
    ) -> Context {
        Context {
            procedure_id,
            provider,
        }
    }
690
    /// Builds a [Context] for `procedure_id` whose provider is a stub.
    ///
    /// Every method of the stub provider panics with `unimplemented!()`, so this context
    /// is only suitable for procedures that never call into the provider.
    fn context_without_provider(procedure_id: ProcedureId) -> Context {
        // Stub provider: all methods are intentionally unimplemented.
        struct MockProvider;

        #[async_trait]
        impl ContextProvider for MockProvider {
            async fn procedure_state(
                &self,
                _procedure_id: ProcedureId,
            ) -> Result<Option<ProcedureState>> {
                unimplemented!()
            }

            async fn procedure_state_receiver(
                &self,
                _procedure_id: ProcedureId,
            ) -> Result<Option<Receiver<ProcedureState>>> {
                unimplemented!()
            }

            async fn try_put_poison(
                &self,
                _key: &PoisonKey,
                _procedure_id: ProcedureId,
            ) -> Result<()> {
                unimplemented!()
            }

            async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
                unimplemented!()
            }
        }

        Context {
            procedure_id,
            provider: Arc::new(MockProvider),
        }
    }
728
    /// Boxed rollback callback: takes the [Context] and returns a future resolving to `Result<()>`.
    type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
730
    /// Test procedure whose behavior is supplied by closures.
    struct ProcedureAdapter<F> {
        /// Value returned by `dump()`.
        data: String,
        /// Lock key reported by `lock_key()`.
        lock_key: LockKey,
        /// Poison keys reported by `poison_keys()`.
        poison_keys: PoisonKeys,
        /// Closure invoked on every `execute()` call.
        exec_fn: F,
        /// Optional closure invoked on `rollback()`; rollback is reported as supported
        /// only when this is `Some`.
        rollback_fn: Option<RollbackFn>,
    }
738
739    impl<F> ProcedureAdapter<F> {
740        fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
741            let mut meta = test_util::procedure_meta_for_test();
742            meta.id = ProcedureId::parse_str(uuid).unwrap();
743            meta.lock_key = self.lock_key.clone();
744            meta.poison_keys = self.poison_keys.clone();
745
746            Arc::new(meta)
747        }
748    }
749
750    #[async_trait]
751    impl<F> Procedure for ProcedureAdapter<F>
752    where
753        F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
754    {
755        fn type_name(&self) -> &str {
756            "ProcedureAdapter"
757        }
758
759        async fn execute(&mut self, ctx: &Context) -> Result<Status> {
760            let f = (self.exec_fn)(ctx.clone());
761            f.await
762        }
763
764        async fn rollback(&mut self, ctx: &Context) -> Result<()> {
765            if let Some(f) = &mut self.rollback_fn {
766                return (f)(ctx.clone()).await;
767            }
768            Ok(())
769        }
770
771        fn rollback_supported(&self) -> bool {
772            self.rollback_fn.is_some()
773        }
774
775        fn dump(&self) -> Result<String> {
776            Ok(self.data.clone())
777        }
778
779        fn lock_key(&self) -> LockKey {
780            self.lock_key.clone()
781        }
782
783        fn poison_keys(&self) -> PoisonKeys {
784            self.poison_keys.clone()
785        }
786    }
787
788    async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
789        let mut times = 0;
790        let exec_fn = move |_| {
791            times += 1;
792            async move {
793                if times == 1 {
794                    Ok(Status::executing(persist))
795                } else {
796                    Ok(Status::done())
797                }
798            }
799            .boxed()
800        };
801        let normal = ProcedureAdapter {
802            data: "normal".to_string(),
803            lock_key: LockKey::single_exclusive("catalog.schema.table"),
804            poison_keys: PoisonKeys::default(),
805            exec_fn,
806            rollback_fn: None,
807        };
808
809        let dir = create_temp_dir("normal");
810        let meta = normal.new_meta(ROOT_ID);
811        let ctx = context_without_provider(meta.id);
812        let object_store = test_util::new_object_store(&dir);
813        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
814        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
815        runner.manager_ctx.start();
816
817        runner.execute_once(&ctx).await;
818        let state = runner.meta.state();
819        assert!(state.is_running(), "{state:?}");
820        check_files(
821            &object_store,
822            &procedure_store,
823            ctx.procedure_id,
824            first_files,
825        )
826        .await;
827
828        runner.execute_once(&ctx).await;
829        let state = runner.meta.state();
830        assert!(state.is_done(), "{state:?}");
831        check_files(
832            &object_store,
833            &procedure_store,
834            ctx.procedure_id,
835            second_files,
836        )
837        .await;
838    }
839
840    #[tokio::test]
841    async fn test_execute_once_normal() {
842        execute_once_normal(
843            true,
844            &["0000000000.step"],
845            &["0000000000.step", "0000000001.commit"],
846        )
847        .await;
848    }
849
850    #[tokio::test]
851    async fn test_execute_once_normal_skip_persist() {
852        execute_once_normal(false, &[], &["0000000000.commit"]).await;
853    }
854
855    #[tokio::test]
856    async fn test_on_suspend_empty() {
857        let exec_fn = move |_| {
858            async move {
859                Ok(Status::Suspended {
860                    subprocedures: Vec::new(),
861                    persist: false,
862                })
863            }
864            .boxed()
865        };
866        let suspend = ProcedureAdapter {
867            data: "suspend".to_string(),
868            lock_key: LockKey::single_exclusive("catalog.schema.table"),
869            poison_keys: PoisonKeys::default(),
870            exec_fn,
871            rollback_fn: None,
872        };
873
874        let dir = create_temp_dir("suspend");
875        let meta = suspend.new_meta(ROOT_ID);
876        let ctx = context_without_provider(meta.id);
877        let object_store = test_util::new_object_store(&dir);
878        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
879        let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
880        runner.manager_ctx.start();
881
882        runner.execute_once(&ctx).await;
883        let state = runner.meta.state();
884        assert!(state.is_running(), "{state:?}");
885    }
886
887    fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
888        let mut times = 0;
889        let exec_fn = move |_| {
890            times += 1;
891            async move {
892                if times == 1 {
893                    time::sleep(Duration::from_millis(200)).await;
894                    Ok(Status::executing(true))
895                } else {
896                    Ok(Status::done())
897                }
898            }
899            .boxed()
900        };
901        let child = ProcedureAdapter {
902            data: "child".to_string(),
903            lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
904            poison_keys: PoisonKeys::default(),
905            exec_fn,
906            rollback_fn: None,
907        };
908
909        ProcedureWithId {
910            id: procedure_id,
911            procedure: Box::new(child),
912        }
913    }
914
915    #[tokio::test]
916    async fn test_on_suspend_by_subprocedures() {
917        let mut times = 0;
918        let children_ids = [ProcedureId::random(), ProcedureId::random()];
919        let keys = [
920            &[
921                "catalog.schema.table.region-0",
922                "catalog.schema.table.region-1",
923            ],
924            &[
925                "catalog.schema.table.region-2",
926                "catalog.schema.table.region-3",
927            ],
928        ];
929
930        let exec_fn = move |ctx: Context| {
931            times += 1;
932            async move {
933                if times == 1 {
934                    // Submit subprocedures.
935                    Ok(Status::Suspended {
936                        subprocedures: children_ids
937                            .into_iter()
938                            .zip(keys)
939                            .map(|(id, key_slice)| new_child_procedure(id, key_slice))
940                            .collect(),
941                        persist: true,
942                    })
943                } else {
944                    // Wait for subprocedures.
945                    let mut all_child_done = true;
946                    for id in children_ids {
947                        let is_not_done = ctx
948                            .provider
949                            .procedure_state(id)
950                            .await
951                            .unwrap()
952                            .map(|s| !s.is_done())
953                            .unwrap_or(true);
954                        if is_not_done {
955                            all_child_done = false;
956                        }
957                    }
958                    if all_child_done {
959                        Ok(Status::done())
960                    } else {
961                        // Return suspended to wait for notify.
962                        Ok(Status::Suspended {
963                            subprocedures: Vec::new(),
964                            persist: false,
965                        })
966                    }
967                }
968            }
969            .boxed()
970        };
971        let parent = ProcedureAdapter {
972            data: "parent".to_string(),
973            lock_key: LockKey::single_exclusive("catalog.schema.table"),
974            poison_keys: PoisonKeys::default(),
975            exec_fn,
976            rollback_fn: None,
977        };
978
979        let dir = create_temp_dir("parent");
980        let meta = parent.new_meta(ROOT_ID);
981        let procedure_id = meta.id;
982
983        let object_store = test_util::new_object_store(&dir);
984        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
985        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
986        let poison_manager = Arc::new(InMemoryPoisonStore::default());
987        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
988        manager_ctx.start();
989        // Manually add this procedure to the manager ctx.
990        assert!(manager_ctx.try_insert_procedure(meta));
991        // Replace the manager ctx.
992        runner.manager_ctx = manager_ctx.clone();
993
994        runner.run().await;
995        assert!(manager_ctx.key_lock.is_empty());
996
997        // Check child procedures.
998        for child_id in children_ids {
999            let state = manager_ctx.state(child_id).unwrap();
1000            assert!(state.is_done(), "{state:?}");
1001        }
1002        let state = manager_ctx.state(procedure_id).unwrap();
1003        assert!(state.is_done(), "{state:?}");
1004        // Files are removed.
1005        check_files(&object_store, &procedure_store, procedure_id, &[]).await;
1006
1007        tokio::time::sleep(Duration::from_millis(5)).await;
1008        // Clean outdated meta.
1009        manager_ctx.remove_outdated_meta(Duration::from_millis(1));
1010        assert!(manager_ctx.state(procedure_id).is_none());
1011        assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
1012        for child_id in children_ids {
1013            assert!(manager_ctx.state(child_id).is_none());
1014        }
1015    }
1016
1017    #[tokio::test]
1018    async fn test_running_is_stopped() {
1019        let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
1020        let normal = ProcedureAdapter {
1021            data: "normal".to_string(),
1022            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1023            poison_keys: PoisonKeys::default(),
1024            exec_fn,
1025            rollback_fn: None,
1026        };
1027
1028        let dir = create_temp_dir("test_running_is_stopped");
1029        let meta = normal.new_meta(ROOT_ID);
1030        let ctx = context_without_provider(meta.id);
1031        let object_store = test_util::new_object_store(&dir);
1032        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1033        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1034        runner.manager_ctx.start();
1035
1036        runner.execute_once(&ctx).await;
1037        let state = runner.meta.state();
1038        assert!(state.is_running(), "{state:?}");
1039        check_files(
1040            &object_store,
1041            &procedure_store,
1042            ctx.procedure_id,
1043            &["0000000000.step"],
1044        )
1045        .await;
1046
1047        runner.manager_ctx.stop();
1048        runner.execute_once(&ctx).await;
1049        let state = runner.meta.state();
1050        assert!(state.is_failed(), "{state:?}");
1051        // Shouldn't write any files
1052        check_files(
1053            &object_store,
1054            &procedure_store,
1055            ctx.procedure_id,
1056            &["0000000000.step"],
1057        )
1058        .await;
1059    }
1060
1061    #[tokio::test]
1062    async fn test_running_is_stopped_on_error() {
1063        let exec_fn =
1064            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1065        let normal = ProcedureAdapter {
1066            data: "fail".to_string(),
1067            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1068            poison_keys: PoisonKeys::default(),
1069            exec_fn,
1070            rollback_fn: None,
1071        };
1072
1073        let dir = create_temp_dir("test_running_is_stopped_on_error");
1074        let meta = normal.new_meta(ROOT_ID);
1075        let ctx = context_without_provider(meta.id);
1076        let object_store = test_util::new_object_store(&dir);
1077        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1078        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1079        runner.manager_ctx.stop();
1080
1081        runner.execute_once(&ctx).await;
1082        let state = runner.meta.state();
1083        assert!(state.is_failed(), "{state:?}");
1084        // Shouldn't write any files
1085        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1086    }
1087
1088    #[tokio::test]
1089    async fn test_execute_on_error() {
1090        let exec_fn =
1091            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1092        let fail = ProcedureAdapter {
1093            data: "fail".to_string(),
1094            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1095            poison_keys: PoisonKeys::default(),
1096            exec_fn,
1097            rollback_fn: None,
1098        };
1099
1100        let dir = create_temp_dir("fail");
1101        let meta = fail.new_meta(ROOT_ID);
1102        let ctx = context_without_provider(meta.id);
1103        let object_store = test_util::new_object_store(&dir);
1104        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1105        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1106        runner.manager_ctx.start();
1107
1108        runner.execute_once(&ctx).await;
1109        let state = runner.meta.state();
1110        assert!(state.is_failed(), "{state:?}");
1111        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1112    }
1113
1114    #[tokio::test]
1115    async fn test_execute_with_rollback_on_error() {
1116        let exec_fn =
1117            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1118        let rollback_fn = move |_| async move { Ok(()) }.boxed();
1119        let fail = ProcedureAdapter {
1120            data: "fail".to_string(),
1121            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1122            poison_keys: PoisonKeys::default(),
1123            exec_fn,
1124            rollback_fn: Some(Box::new(rollback_fn)),
1125        };
1126
1127        let dir = create_temp_dir("fail");
1128        let meta = fail.new_meta(ROOT_ID);
1129        let ctx = context_without_provider(meta.id);
1130        let object_store = test_util::new_object_store(&dir);
1131        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1132        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1133        runner.manager_ctx.start();
1134
1135        runner.execute_once(&ctx).await;
1136        let state = runner.meta.state();
1137        assert!(state.is_prepare_rollback(), "{state:?}");
1138
1139        runner.execute_once(&ctx).await;
1140        let state = runner.meta.state();
1141        assert!(state.is_rolling_back(), "{state:?}");
1142
1143        runner.execute_once(&ctx).await;
1144        let state = runner.meta.state();
1145        assert!(state.is_failed(), "{state:?}");
1146        check_files(
1147            &object_store,
1148            &procedure_store,
1149            ctx.procedure_id,
1150            &["0000000000.rollback"],
1151        )
1152        .await;
1153    }
1154
1155    #[tokio::test]
1156    async fn test_execute_on_retry_later_error() {
1157        let mut times = 0;
1158
1159        let exec_fn = move |_| {
1160            times += 1;
1161            async move {
1162                if times == 1 {
1163                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1164                } else if times == 2 {
1165                    Ok(Status::executing(false))
1166                } else {
1167                    Ok(Status::done())
1168                }
1169            }
1170            .boxed()
1171        };
1172
1173        let retry_later = ProcedureAdapter {
1174            data: "retry_later".to_string(),
1175            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1176            poison_keys: PoisonKeys::default(),
1177            exec_fn,
1178            rollback_fn: None,
1179        };
1180
1181        let dir = create_temp_dir("retry_later");
1182        let meta = retry_later.new_meta(ROOT_ID);
1183        let ctx = context_without_provider(meta.id);
1184        let object_store = test_util::new_object_store(&dir);
1185        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1186        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1187        runner.manager_ctx.start();
1188        runner.execute_once(&ctx).await;
1189        let state = runner.meta.state();
1190        assert!(state.is_retrying(), "{state:?}");
1191
1192        runner.execute_once(&ctx).await;
1193        let state = runner.meta.state();
1194        assert!(state.is_running(), "{state:?}");
1195
1196        runner.execute_once(&ctx).await;
1197        let state = runner.meta.state();
1198        assert!(state.is_done(), "{state:?}");
1199        assert!(meta.state().is_done());
1200        check_files(
1201            &object_store,
1202            &procedure_store,
1203            ctx.procedure_id,
1204            &["0000000000.commit"],
1205        )
1206        .await;
1207    }
1208
1209    #[tokio::test(flavor = "multi_thread")]
1210    async fn test_execute_on_retry_later_error_with_child() {
1211        common_telemetry::init_default_ut_logging();
1212        let mut times = 0;
1213        let child_id = ProcedureId::random();
1214
1215        let exec_fn = move |_| {
1216            times += 1;
1217            async move {
1218                debug!("times: {}", times);
1219                if times == 1 {
1220                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1221                } else if times == 2 {
1222                    let exec_fn = |_| {
1223                        async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1224                            .boxed()
1225                    };
1226                    let fail = ProcedureAdapter {
1227                        data: "fail".to_string(),
1228                        lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1229                        poison_keys: PoisonKeys::default(),
1230                        exec_fn,
1231                        rollback_fn: None,
1232                    };
1233
1234                    Ok(Status::Suspended {
1235                        subprocedures: vec![ProcedureWithId {
1236                            id: child_id,
1237                            procedure: Box::new(fail),
1238                        }],
1239                        persist: true,
1240                    })
1241                } else {
1242                    Ok(Status::done())
1243                }
1244            }
1245            .boxed()
1246        };
1247
1248        let retry_later = ProcedureAdapter {
1249            data: "retry_later".to_string(),
1250            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1251            poison_keys: PoisonKeys::default(),
1252            exec_fn,
1253            rollback_fn: None,
1254        };
1255
1256        let dir = create_temp_dir("retry_later");
1257        let meta = retry_later.new_meta(ROOT_ID);
1258        let ctx = context_without_provider(meta.id);
1259        let object_store = test_util::new_object_store(&dir);
1260        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1261        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1262        runner.manager_ctx.start();
1263        debug!("execute_once 1");
1264        runner.execute_once(&ctx).await;
1265        let state = runner.meta.state();
1266        assert!(state.is_retrying(), "{state:?}");
1267
1268        let moved_meta = meta.clone();
1269        tokio::spawn(async move {
1270            moved_meta.child_notify.notify_one();
1271        });
1272        runner.execute_once(&ctx).await;
1273        let state = runner.meta.state();
1274        assert!(state.is_running(), "{state:?}");
1275
1276        runner.execute_once(&ctx).await;
1277        let state = runner.meta.state();
1278        assert!(state.is_done(), "{state:?}");
1279        assert!(meta.state().is_done());
1280        check_files(
1281            &object_store,
1282            &procedure_store,
1283            ctx.procedure_id,
1284            &["0000000000.step", "0000000001.commit"],
1285        )
1286        .await;
1287    }
1288
1289    #[tokio::test]
1290    async fn test_execute_exceed_max_retry_later() {
1291        let exec_fn =
1292            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1293
1294        let exceed_max_retry_later = ProcedureAdapter {
1295            data: "exceed_max_retry_later".to_string(),
1296            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1297            poison_keys: PoisonKeys::default(),
1298            exec_fn,
1299            rollback_fn: None,
1300        };
1301
1302        let dir = create_temp_dir("exceed_max_retry_later");
1303        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1304        let object_store = test_util::new_object_store(&dir);
1305        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1306        let mut runner = new_runner(
1307            meta.clone(),
1308            Box::new(exceed_max_retry_later),
1309            procedure_store,
1310        );
1311        runner.manager_ctx.start();
1312
1313        runner.exponential_builder = ExponentialBuilder::default()
1314            .with_min_delay(Duration::from_millis(1))
1315            .with_max_times(3);
1316
1317        // Run the runner and execute the procedure.
1318        runner.execute_procedure_in_loop().await;
1319        let err = meta.state().error().unwrap().to_string();
1320        assert!(err.contains("Procedure retry exceeded max times"));
1321    }
1322
1323    #[tokio::test]
1324    async fn test_rollback_exceed_max_retry_later() {
1325        let exec_fn =
1326            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1327        let rollback_fn = move |_| {
1328            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1329        };
1330        let exceed_max_retry_later = ProcedureAdapter {
1331            data: "exceed_max_rollback".to_string(),
1332            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1333            poison_keys: PoisonKeys::default(),
1334            exec_fn,
1335            rollback_fn: Some(Box::new(rollback_fn)),
1336        };
1337
1338        let dir = create_temp_dir("exceed_max_rollback");
1339        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1340        let object_store = test_util::new_object_store(&dir);
1341        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1342        let mut runner = new_runner(
1343            meta.clone(),
1344            Box::new(exceed_max_retry_later),
1345            procedure_store,
1346        );
1347        runner.manager_ctx.start();
1348        runner.exponential_builder = ExponentialBuilder::default()
1349            .with_min_delay(Duration::from_millis(1))
1350            .with_max_times(3);
1351
1352        // Run the runner and execute the procedure.
1353        runner.execute_procedure_in_loop().await;
1354        let err = meta.state().error().unwrap().to_string();
1355        assert!(err.contains("Procedure rollback exceeded max times"));
1356    }
1357
1358    #[tokio::test]
1359    async fn test_rollback_after_retry_fail() {
1360        let exec_fn = move |_| {
1361            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1362        };
1363
1364        let (tx, mut rx) = mpsc::channel(1);
1365        let rollback_fn = move |_| {
1366            let tx = tx.clone();
1367            async move {
1368                tx.send(()).await.unwrap();
1369                Ok(())
1370            }
1371            .boxed()
1372        };
1373        let retry_later = ProcedureAdapter {
1374            data: "rollback_after_retry_fail".to_string(),
1375            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1376            poison_keys: PoisonKeys::default(),
1377            exec_fn,
1378            rollback_fn: Some(Box::new(rollback_fn)),
1379        };
1380
1381        let dir = create_temp_dir("retry_later");
1382        let meta = retry_later.new_meta(ROOT_ID);
1383        let ctx = context_without_provider(meta.id);
1384        let object_store = test_util::new_object_store(&dir);
1385        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1386        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1387        runner.manager_ctx.start();
1388        runner.exponential_builder = ExponentialBuilder::default()
1389            .with_min_delay(Duration::from_millis(1))
1390            .with_max_times(3);
1391        // Run the runner and execute the procedure.
1392        runner.execute_procedure_in_loop().await;
1393        rx.recv().await.unwrap();
1394        assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
1395        check_files(
1396            &object_store,
1397            &procedure_store,
1398            ctx.procedure_id,
1399            &["0000000000.rollback"],
1400        )
1401        .await;
1402    }
1403
1404    #[tokio::test]
1405    async fn test_child_error() {
1406        let mut times = 0;
1407        let child_id = ProcedureId::random();
1408        common_telemetry::init_default_ut_logging();
1409        let exec_fn = move |ctx: Context| {
1410            times += 1;
1411            async move {
1412                if times == 1 {
1413                    // Submit subprocedures.
1414                    let exec_fn = |_| {
1415                        async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1416                            .boxed()
1417                    };
1418                    let fail = ProcedureAdapter {
1419                        data: "fail".to_string(),
1420                        lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1421                        poison_keys: PoisonKeys::default(),
1422                        exec_fn,
1423                        rollback_fn: None,
1424                    };
1425
1426                    Ok(Status::Suspended {
1427                        subprocedures: vec![ProcedureWithId {
1428                            id: child_id,
1429                            procedure: Box::new(fail),
1430                        }],
1431                        persist: true,
1432                    })
1433                } else {
1434                    // Wait for subprocedures.
1435                    let state = ctx.provider.procedure_state(child_id).await.unwrap();
1436                    let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
1437                    if is_failed {
1438                        // The parent procedure to abort itself if child procedure is failed.
1439                        Err(Error::from_error_ext(PlainError::new(
1440                            "subprocedure failed".to_string(),
1441                            StatusCode::Unexpected,
1442                        )))
1443                    } else {
1444                        // Return suspended to wait for notify.
1445                        Ok(Status::Suspended {
1446                            subprocedures: Vec::new(),
1447                            persist: false,
1448                        })
1449                    }
1450                }
1451            }
1452            .boxed()
1453        };
1454        let parent = ProcedureAdapter {
1455            data: "parent".to_string(),
1456            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1457            poison_keys: PoisonKeys::default(),
1458            exec_fn,
1459            rollback_fn: None,
1460        };
1461
1462        let dir = create_temp_dir("child_err");
1463        let meta = parent.new_meta(ROOT_ID);
1464
1465        let object_store = test_util::new_object_store(&dir);
1466        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1467        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
1468        let poison_manager = Arc::new(InMemoryPoisonStore::default());
1469        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
1470        manager_ctx.start();
1471        // Manually add this procedure to the manager ctx.
1472        assert!(manager_ctx.try_insert_procedure(meta.clone()));
1473        // Replace the manager ctx.
1474        runner.manager_ctx = manager_ctx.clone();
1475
1476        // Run the runner and execute the procedure.
1477        runner.run().await;
1478        assert!(manager_ctx.key_lock.is_empty());
1479        let err = meta.state().error().unwrap().output_msg();
1480        assert!(err.contains("subprocedure failed"), "{err}");
1481    }
1482
1483    #[tokio::test]
1484    async fn test_execute_with_clean_poisons() {
1485        common_telemetry::init_default_ut_logging();
1486        let mut times = 0;
1487        let poison_key = PoisonKey::new("table/1024");
1488        let moved_poison_key = poison_key.clone();
1489        let exec_fn = move |ctx: Context| {
1490            times += 1;
1491            let poison_key = moved_poison_key.clone();
1492            async move {
1493                if times == 1 {
1494                    // Put the poison to the context.
1495                    ctx.provider
1496                        .try_put_poison(&poison_key, ctx.procedure_id)
1497                        .await
1498                        .unwrap();
1499
1500                    Ok(Status::executing(true))
1501                } else {
1502                    Ok(Status::executing_with_clean_poisons(true))
1503                }
1504            }
1505            .boxed()
1506        };
1507        let poison = ProcedureAdapter {
1508            data: "poison".to_string(),
1509            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1510            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1511            exec_fn,
1512            rollback_fn: None,
1513        };
1514
1515        let dir = create_temp_dir("clean_poisons");
1516        let meta = poison.new_meta(ROOT_ID);
1517
1518        let object_store = test_util::new_object_store(&dir);
1519        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1520        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1521
1522        // Use the manager ctx as the context provider.
1523        let ctx = context_with_provider(
1524            meta.id,
1525            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1526        );
1527        // Manually add this procedure to the manager ctx.
1528        runner
1529            .manager_ctx
1530            .procedures
1531            .write()
1532            .unwrap()
1533            .insert(meta.id, runner.meta.clone());
1534
1535        runner.manager_ctx.start();
1536        runner.execute_once(&ctx).await;
1537        let state = runner.meta.state();
1538        assert!(state.is_running(), "{state:?}");
1539
1540        let procedure_id = runner
1541            .manager_ctx
1542            .poison_manager
1543            .get_poison(&poison_key.to_string())
1544            .await
1545            .unwrap();
1546        // poison key should be exist.
1547        assert!(procedure_id.is_some());
1548
1549        runner.execute_once(&ctx).await;
1550        let state = runner.meta.state();
1551        assert!(state.is_running(), "{state:?}");
1552
1553        let procedure_id = runner
1554            .manager_ctx
1555            .poison_manager
1556            .get_poison(&poison_key.to_string())
1557            .await
1558            .unwrap();
1559        // poison key should be deleted.
1560        assert!(procedure_id.is_none());
1561    }
1562
1563    #[tokio::test]
1564    async fn test_execute_error_with_clean_poisons() {
1565        common_telemetry::init_default_ut_logging();
1566        let mut times = 0;
1567        let poison_key = PoisonKey::new("table/1024");
1568        let moved_poison_key = poison_key.clone();
1569        let exec_fn = move |ctx: Context| {
1570            times += 1;
1571            let poison_key = moved_poison_key.clone();
1572            async move {
1573                if times == 1 {
1574                    // Put the poison to the context.
1575                    ctx.provider
1576                        .try_put_poison(&poison_key, ctx.procedure_id)
1577                        .await
1578                        .unwrap();
1579
1580                    Ok(Status::executing(true))
1581                } else {
1582                    Err(Error::external_and_clean_poisons(MockError::new(
1583                        StatusCode::Unexpected,
1584                    )))
1585                }
1586            }
1587            .boxed()
1588        };
1589        let poison = ProcedureAdapter {
1590            data: "poison".to_string(),
1591            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1592            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1593            exec_fn,
1594            rollback_fn: None,
1595        };
1596
1597        let dir = create_temp_dir("error_with_clean_poisons");
1598        let meta = poison.new_meta(ROOT_ID);
1599
1600        let object_store = test_util::new_object_store(&dir);
1601        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1602        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1603
1604        // Use the manager ctx as the context provider.
1605        let ctx = context_with_provider(
1606            meta.id,
1607            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1608        );
1609        // Manually add this procedure to the manager ctx.
1610        runner
1611            .manager_ctx
1612            .procedures
1613            .write()
1614            .unwrap()
1615            .insert(meta.id, runner.meta.clone());
1616
1617        runner.manager_ctx.start();
1618        runner.execute_once(&ctx).await;
1619        let state = runner.meta.state();
1620        assert!(state.is_running(), "{state:?}");
1621
1622        let procedure_id = runner
1623            .manager_ctx
1624            .poison_manager
1625            .get_poison(&poison_key.to_string())
1626            .await
1627            .unwrap();
1628        // poison key should be exist.
1629        assert!(procedure_id.is_some());
1630
1631        runner.execute_once(&ctx).await;
1632        let state = runner.meta.state();
1633        assert!(state.is_failed(), "{state:?}");
1634
1635        let procedure_id = runner
1636            .manager_ctx
1637            .poison_manager
1638            .get_poison(&poison_key.to_string())
1639            .await
1640            .unwrap();
1641        // poison key should be deleted.
1642        assert!(procedure_id.is_none());
1643    }
1644
1645    #[tokio::test]
1646    async fn test_execute_failed_after_set_poison() {
1647        let mut times = 0;
1648        let poison_key = PoisonKey::new("table/1024");
1649        let moved_poison_key = poison_key.clone();
1650        let exec_fn = move |ctx: Context| {
1651            times += 1;
1652            let poison_key = moved_poison_key.clone();
1653            async move {
1654                if times == 1 {
1655                    Ok(Status::executing(true))
1656                } else {
1657                    // Put the poison to the context.
1658                    ctx.provider
1659                        .try_put_poison(&poison_key, ctx.procedure_id)
1660                        .await
1661                        .unwrap();
1662                    Err(Error::external(MockError::new(StatusCode::Unexpected)))
1663                }
1664            }
1665            .boxed()
1666        };
1667        let poison = ProcedureAdapter {
1668            data: "poison".to_string(),
1669            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1670            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1671            exec_fn,
1672            rollback_fn: None,
1673        };
1674
1675        let dir = create_temp_dir("poison");
1676        let meta = poison.new_meta(ROOT_ID);
1677
1678        let object_store = test_util::new_object_store(&dir);
1679        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1680        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1681
1682        // Use the manager ctx as the context provider.
1683        let ctx = context_with_provider(
1684            meta.id,
1685            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1686        );
1687        // Manually add this procedure to the manager ctx.
1688        runner
1689            .manager_ctx
1690            .procedures
1691            .write()
1692            .unwrap()
1693            .insert(meta.id, runner.meta.clone());
1694
1695        runner.manager_ctx.start();
1696        runner.execute_once(&ctx).await;
1697        let state = runner.meta.state();
1698        assert!(state.is_running(), "{state:?}");
1699
1700        runner.execute_once(&ctx).await;
1701        let state = runner.meta.state();
1702        assert!(state.is_failed(), "{state:?}");
1703        assert!(meta.state().is_failed());
1704
1705        // Check the poison is set.
1706        let procedure_id = runner
1707            .manager_ctx
1708            .poison_manager
1709            .get_poison(&poison_key.to_string())
1710            .await
1711            .unwrap()
1712            .unwrap();
1713
1714        // If the procedure is poisoned, the poison key shouldn't be deleted.
1715        assert_eq!(&procedure_id.clone(), ROOT_ID);
1716    }
1717
1718    #[tokio::test]
1719    async fn test_execute_exceed_max_retry_after_set_poison() {
1720        common_telemetry::init_default_ut_logging();
1721        let mut times = 0;
1722        let poison_key = PoisonKey::new("table/1024");
1723        let moved_poison_key = poison_key.clone();
1724        let exec_fn = move |ctx: Context| {
1725            times += 1;
1726            let poison_key = moved_poison_key.clone();
1727            async move {
1728                if times == 1 {
1729                    Ok(Status::executing(true))
1730                } else {
1731                    // Put the poison to the context.
1732                    ctx.provider
1733                        .try_put_poison(&poison_key, ctx.procedure_id)
1734                        .await
1735                        .unwrap();
1736                    Err(Error::retry_later_and_clean_poisons(MockError::new(
1737                        StatusCode::Unexpected,
1738                    )))
1739                }
1740            }
1741            .boxed()
1742        };
1743        let poison = ProcedureAdapter {
1744            data: "poison".to_string(),
1745            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1746            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1747            exec_fn,
1748            rollback_fn: None,
1749        };
1750
1751        let dir = create_temp_dir("exceed_max_after_set_poison");
1752        let meta = poison.new_meta(ROOT_ID);
1753        let object_store = test_util::new_object_store(&dir);
1754        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1755        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
1756        runner.manager_ctx.start();
1757        runner.exponential_builder = ExponentialBuilder::default()
1758            .with_min_delay(Duration::from_millis(1))
1759            .with_max_times(3);
1760        // Use the manager ctx as the context provider.
1761        let ctx = context_with_provider(
1762            meta.id,
1763            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1764        );
1765        // Manually add this procedure to the manager ctx.
1766        runner
1767            .manager_ctx
1768            .procedures
1769            .write()
1770            .unwrap()
1771            .insert(meta.id, runner.meta.clone());
1772        // Run the runner and execute the procedure.
1773        runner.execute_once_with_retry(&ctx).await;
1774        let err = meta.state().error().unwrap().clone();
1775        assert_matches!(&*err, Error::RetryTimesExceeded { .. });
1776
1777        // Check the poison is deleted.
1778        let procedure_id = runner
1779            .manager_ctx
1780            .poison_manager
1781            .get_poison(&poison_key.to_string())
1782            .await
1783            .unwrap();
1784        assert_eq!(procedure_id, None);
1785    }
1786
1787    #[tokio::test]
1788    async fn test_execute_poisoned() {
1789        let mut times = 0;
1790        let poison_key = PoisonKey::new("table/1024");
1791        let moved_poison_key = poison_key.clone();
1792        let exec_fn = move |ctx: Context| {
1793            times += 1;
1794            let poison_key = moved_poison_key.clone();
1795            async move {
1796                if times == 1 {
1797                    Ok(Status::executing(true))
1798                } else {
1799                    // Put the poison to the context.
1800                    ctx.provider
1801                        .try_put_poison(&poison_key, ctx.procedure_id)
1802                        .await
1803                        .unwrap();
1804                    Ok(Status::Poisoned {
1805                        keys: PoisonKeys::new(vec![poison_key.clone()]),
1806                        error: Error::external(MockError::new(StatusCode::Unexpected)),
1807                    })
1808                }
1809            }
1810            .boxed()
1811        };
1812        let poison = ProcedureAdapter {
1813            data: "poison".to_string(),
1814            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1815            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1816            exec_fn,
1817            rollback_fn: None,
1818        };
1819
1820        let dir = create_temp_dir("poison");
1821        let meta = poison.new_meta(ROOT_ID);
1822
1823        let object_store = test_util::new_object_store(&dir);
1824        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1825        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1826
1827        // Use the manager ctx as the context provider.
1828        let ctx = context_with_provider(
1829            meta.id,
1830            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1831        );
1832        // Manually add this procedure to the manager ctx.
1833        runner
1834            .manager_ctx
1835            .procedures
1836            .write()
1837            .unwrap()
1838            .insert(meta.id, runner.meta.clone());
1839
1840        runner.manager_ctx.start();
1841        runner.execute_once(&ctx).await;
1842        let state = runner.meta.state();
1843        assert!(state.is_running(), "{state:?}");
1844
1845        runner.execute_once(&ctx).await;
1846        let state = runner.meta.state();
1847        assert!(state.is_poisoned(), "{state:?}");
1848        assert!(meta.state().is_poisoned());
1849        check_files(
1850            &object_store,
1851            &procedure_store,
1852            ctx.procedure_id,
1853            &["0000000000.step"],
1854        )
1855        .await;
1856
1857        // Check the poison is set.
1858        let procedure_id = runner
1859            .manager_ctx
1860            .poison_manager
1861            .get_poison(&poison_key.to_string())
1862            .await
1863            .unwrap()
1864            .unwrap();
1865
1866        // If the procedure is poisoned, the poison key shouldn't be deleted.
1867        assert_eq!(procedure_id, ROOT_ID);
1868    }
1869
1870    fn test_procedure_with_dynamic_lock(
1871        shared_atomic_value: Arc<AtomicU64>,
1872        id: u64,
1873    ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
1874        let exec_fn = move |ctx: Context| {
1875            let moved_shared_atomic_value = shared_atomic_value.clone();
1876            let moved_ctx = ctx.clone();
1877            async move {
1878                debug!("Acquiring write lock, id: {}", id);
1879                let key = StringKey::Exclusive("test_lock".to_string());
1880                let guard = moved_ctx.provider.acquire_lock(&key).await;
1881                debug!("Acquired write lock, id: {}", id);
1882                let millis = rand::rng().random_range(10..=50);
1883                tokio::time::sleep(Duration::from_millis(millis)).await;
1884                let value = moved_shared_atomic_value.load(Ordering::Relaxed);
1885                moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
1886                debug!("Dropping write lock, id: {}", id);
1887                drop(guard);
1888
1889                Ok(Status::done())
1890            }
1891            .boxed()
1892        };
1893
1894        let adapter = ProcedureAdapter {
1895            data: "dynamic_lock".to_string(),
1896            lock_key: LockKey::new_exclusive([]),
1897            poison_keys: PoisonKeys::new([]),
1898            exec_fn,
1899            rollback_fn: None,
1900        };
1901        let meta = adapter.new_meta(ROOT_ID);
1902
1903        (Box::new(adapter), meta)
1904    }
1905
1906    #[tokio::test(flavor = "multi_thread")]
1907    async fn test_execute_with_dynamic_lock() {
1908        common_telemetry::init_default_ut_logging();
1909        let shared_atomic_value = Arc::new(AtomicU64::new(0));
1910        let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
1911        let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
1912
1913        let dir = create_temp_dir("dynamic_lock");
1914        let object_store = test_util::new_object_store(&dir);
1915        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1916        let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
1917        let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
1918        let ctx1 = context_with_provider(
1919            meta1.id,
1920            runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1921        );
1922        let ctx2 = context_with_provider(
1923            meta2.id,
1924            // use same manager ctx as runner1
1925            runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1926        );
1927        let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
1928        join_all(tasks).await;
1929        assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
1930    }
1931}