common_procedure/local/
runner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_event_recorder::EventRecorderRef;
21use common_telemetry::tracing_context::{FutureExt, TracingContext};
22use common_telemetry::{debug, error, info, tracing};
23use rand::Rng;
24use snafu::ResultExt;
25use tokio::time;
26
27use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
28use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
29use crate::procedure::{Output, StringKey};
30use crate::rwlock::OwnedKeyRwLockGuard;
31use crate::store::{ProcedureMessage, ProcedureStore};
32use crate::{
33    BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
34};
35
36/// A guard to cleanup procedure state.
37struct ProcedureGuard {
38    meta: ProcedureMetaRef,
39    manager_ctx: Arc<ManagerContext>,
40    key_guards: Vec<OwnedKeyRwLockGuard>,
41    finish: bool,
42}
43
44impl ProcedureGuard {
45    /// Returns a new [ProcedureGuard].
46    fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
47        ProcedureGuard {
48            meta,
49            manager_ctx,
50            key_guards: vec![],
51            finish: false,
52        }
53    }
54
55    /// The procedure is finished successfully.
56    fn finish(mut self) {
57        self.finish = true;
58    }
59}
60
61impl Drop for ProcedureGuard {
62    fn drop(&mut self) {
63        if !self.finish {
64            error!("Procedure {} exits unexpectedly", self.meta.id);
65
66            // Set state to failed. This is useful in test as runtime may not abort when the runner task panics.
67            // See https://github.com/tokio-rs/tokio/issues/2002 .
68            // We set set_panic_hook() in the application's main function. But our tests don't have this panic hook.
69            let err = ProcedurePanicSnafu {
70                procedure_id: self.meta.id,
71            }
72            .build();
73            self.meta.set_state(ProcedureState::failed(Arc::new(err)));
74        }
75
76        // Notify parent procedure.
77        if let Some(parent_id) = self.meta.parent_id {
78            self.manager_ctx.notify_by_subprocedure(parent_id);
79        }
80
81        // Drops the key guards in the reverse order.
82        while !self.key_guards.is_empty() {
83            self.key_guards.pop();
84        }
85
86        // Clean the staled locks.
87        self.manager_ctx
88            .key_lock
89            .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
90    }
91}
92
93pub(crate) struct Runner {
94    pub(crate) meta: ProcedureMetaRef,
95    pub(crate) procedure: BoxedProcedure,
96    pub(crate) manager_ctx: Arc<ManagerContext>,
97    pub(crate) step: u32,
98    pub(crate) exponential_builder: ExponentialBuilder,
99    pub(crate) store: Arc<ProcedureStore>,
100    pub(crate) rolling_back: bool,
101    pub(crate) event_recorder: Option<EventRecorderRef>,
102}
103
104impl Runner {
105    /// Return `ProcedureManager` is running.
106    pub(crate) fn running(&self) -> bool {
107        self.manager_ctx.running()
108    }
109
110    /// Run the procedure.
111    pub(crate) async fn run(mut self) {
112        // Ensure we can update the procedure state.
113        let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
114
115        info!(
116            "Runner {}-{} starts",
117            self.procedure.type_name(),
118            self.meta.id
119        );
120
121        // TODO(yingwen): Detect recursive locking (and deadlock) if possible. Maybe we could detect
122        // recursive locking by adding a root procedure id to the meta.
123        for key in self.meta.lock_key.keys_to_lock() {
124            // Acquire lock for each key.
125            let key_guard = match key {
126                StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
127                StringKey::Exclusive(key) => {
128                    self.manager_ctx.key_lock.write(key.clone()).await.into()
129                }
130            };
131
132            guard.key_guards.push(key_guard);
133        }
134
135        // Execute the procedure. We need to release the lock whenever the execution
136        // is successful or fail.
137        self.meta.set_start_time_ms();
138        self.execute_procedure_in_loop().await;
139        self.meta.set_end_time_ms();
140
141        // We can't remove the metadata of the procedure now as users and its parent might
142        // need to query its state.
143        // TODO(yingwen): 1. Add TTL to the metadata; 2. Only keep state in the procedure store
144        // so we don't need to always store the metadata in memory after the procedure is done.
145
146        // Release locks and notify parent procedure.
147        guard.finish();
148
149        // If this is the root procedure, clean up message cache.
150        if self.meta.parent_id.is_none() {
151            let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
152            // Clean resources.
153            self.manager_ctx.on_procedures_finish(&procedure_ids);
154
155            // If `ProcedureManager` is stopped, it stops the current task immediately without deleting the procedure.
156            if !self.running() {
157                return;
158            }
159
160            for id in procedure_ids {
161                if let Err(e) = self.store.delete_procedure(id).await {
162                    error!(
163                        e;
164                        "Runner {}-{} failed to delete procedure {}",
165                        self.procedure.type_name(),
166                        self.meta.id,
167                        id,
168                    );
169                }
170            }
171        }
172
173        info!(
174            "Runner {}-{} exits",
175            self.procedure.type_name(),
176            self.meta.id
177        );
178    }
179
180    async fn execute_procedure_in_loop(&mut self) {
181        let ctx = Context {
182            procedure_id: self.meta.id,
183            provider: self.manager_ctx.clone(),
184        };
185
186        self.rolling_back = false;
187        self.execute_once_with_retry(&ctx).await;
188    }
189
190    async fn execute_once_with_retry(&mut self, ctx: &Context) {
191        let mut retry = self.exponential_builder.build();
192        let mut retry_times = 0;
193
194        let mut rollback = self.exponential_builder.build();
195        let mut rollback_times = 0;
196
197        loop {
198            // Don't store state if `ProcedureManager` is stopped.
199            if !self.running() {
200                self.meta.set_state(ProcedureState::failed(Arc::new(
201                    error::ManagerNotStartSnafu {}.build(),
202                )));
203                return;
204            }
205            let state = self.meta.state();
206            match state {
207                ProcedureState::Running => {}
208                ProcedureState::Retrying { error } => {
209                    retry_times += 1;
210                    if let Some(d) = retry.next() {
211                        let millis = d.as_millis() as u64;
212                        // Add random noise to the retry delay to avoid retry storms.
213                        let noise = rand::rng().random_range(0..(millis / 4) + 1);
214                        let d = d.add(Duration::from_millis(noise));
215
216                        self.wait_on_err(d, retry_times).await;
217                    } else {
218                        self.meta
219                            .set_state(ProcedureState::prepare_rollback(Arc::new(
220                                Error::RetryTimesExceeded {
221                                    source: error.clone(),
222                                    procedure_id: self.meta.id,
223                                },
224                            )));
225                    }
226                }
227                ProcedureState::PrepareRollback { error }
228                | ProcedureState::RollingBack { error } => {
229                    rollback_times += 1;
230                    if let Some(d) = rollback.next() {
231                        self.wait_on_err(d, rollback_times).await;
232                    } else {
233                        let err = Err::<(), Arc<Error>>(error)
234                            .context(RollbackTimesExceededSnafu {
235                                procedure_id: self.meta.id,
236                            })
237                            .unwrap_err();
238                        self.meta.set_state(ProcedureState::failed(Arc::new(err)));
239                        return;
240                    }
241                }
242                ProcedureState::Done { .. } => return,
243                ProcedureState::Failed { .. } => return,
244                ProcedureState::Poisoned { .. } => return,
245            }
246            self.execute_once(ctx).await;
247        }
248    }
249
250    async fn clean_poisons(&mut self) -> Result<()> {
251        let mut error = None;
252        for key in self.meta.poison_keys.iter() {
253            let key = key.to_string();
254            if let Err(e) = self
255                .manager_ctx
256                .poison_manager
257                .delete_poison(key, self.meta.id.to_string())
258                .await
259            {
260                error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
261                error = Some(e);
262            }
263        }
264
265        // returns the last error if any.
266        if let Some(e) = error {
267            return Err(e);
268        }
269        Ok(())
270    }
271
272    async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
273        if self.procedure.rollback_supported()
274            && let Err(e) = self.procedure.rollback(ctx).await
275        {
276            self.meta
277                .set_state(ProcedureState::rolling_back(Arc::new(e)));
278            return;
279        }
280        self.meta.set_state(ProcedureState::failed(err));
281    }
282
283    async fn prepare_rollback(&mut self, err: Arc<Error>) {
284        if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
285            self.meta
286                .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
287            return;
288        }
289        if self.procedure.rollback_supported() {
290            self.meta.set_state(ProcedureState::rolling_back(err));
291        } else {
292            self.meta.set_state(ProcedureState::failed(err));
293        }
294    }
295
296    async fn execute_once(&mut self, ctx: &Context) {
297        match self.meta.state() {
298            ProcedureState::Running | ProcedureState::Retrying { .. } => {
299                match self.procedure.execute(ctx).await {
300                    Ok(status) => {
301                        debug!(
302                            "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
303                            self.procedure.type_name(),
304                            self.meta.id,
305                            status,
306                            status.need_persist(),
307                        );
308
309                        // Don't store state if `ProcedureManager` is stopped.
310                        if !self.running() {
311                            self.meta.set_state(ProcedureState::failed(Arc::new(
312                                error::ManagerNotStartSnafu {}.build(),
313                            )));
314                            return;
315                        }
316
317                        // Cleans poisons before persist.
318                        if status.need_clean_poisons()
319                            && let Err(e) = self.clean_poisons().await
320                        {
321                            error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
322                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
323                            return;
324                        }
325
326                        if status.need_persist()
327                            && let Err(e) = self.persist_procedure().await
328                        {
329                            error!(e; "Failed to persist procedure: {}", self.meta.id);
330                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
331                            return;
332                        }
333
334                        match status {
335                            Status::Executing { .. } => {
336                                let prev_state = self.meta.state();
337                                if !matches!(prev_state, ProcedureState::Running) {
338                                    info!(
339                                        "Set Procedure {}-{} state to running, prev_state: {:?}",
340                                        self.procedure.type_name(),
341                                        self.meta.id,
342                                        prev_state
343                                    );
344                                    self.meta.set_state(ProcedureState::Running);
345                                }
346                            }
347                            Status::Suspended { subprocedures, .. } => {
348                                let prev_state = self.meta.state();
349                                if !matches!(prev_state, ProcedureState::Running) {
350                                    info!(
351                                        "Set Procedure {}-{} state to running, prev_state: {:?}",
352                                        self.procedure.type_name(),
353                                        self.meta.id,
354                                        prev_state
355                                    );
356                                    self.meta.set_state(ProcedureState::Running);
357                                }
358                                self.on_suspended(subprocedures).await;
359                            }
360                            Status::Done { output } => {
361                                if let Err(e) = self.commit_procedure().await {
362                                    error!(e; "Failed to commit procedure: {}", self.meta.id);
363                                    self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
364                                    return;
365                                }
366
367                                self.done(output);
368                            }
369                            Status::Poisoned { error, keys } => {
370                                error!(
371                                    error;
372                                    "Procedure {}-{} is poisoned, keys: {:?}",
373                                    self.procedure.type_name(),
374                                    self.meta.id,
375                                    keys,
376                                );
377                                self.meta
378                                    .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
379                            }
380                        }
381                    }
382                    Err(e) => {
383                        error!(
384                            e;
385                            "Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
386                            self.procedure.type_name(),
387                            self.meta.id,
388                            e.is_retry_later(),
389                            e.need_clean_poisons(),
390                        );
391
392                        // Don't store state if `ProcedureManager` is stopped.
393                        if !self.running() {
394                            self.meta.set_state(ProcedureState::failed(Arc::new(
395                                error::ManagerNotStartSnafu {}.build(),
396                            )));
397                            return;
398                        }
399
400                        if e.need_clean_poisons() {
401                            if let Err(e) = self.clean_poisons().await {
402                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
403                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
404                                return;
405                            }
406                            debug!(
407                                "Procedure {}-{} cleaned poisons",
408                                self.procedure.type_name(),
409                                self.meta.id,
410                            );
411                        }
412
413                        if e.is_retry_later() {
414                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
415                            return;
416                        }
417
418                        if self.procedure.rollback_supported() {
419                            self.meta
420                                .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
421                        } else {
422                            self.meta.set_state(ProcedureState::failed(Arc::new(e)));
423                        }
424                    }
425                }
426            }
427            ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
428            ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
429            ProcedureState::Failed { .. }
430            | ProcedureState::Done { .. }
431            | ProcedureState::Poisoned { .. } => (),
432        }
433    }
434
435    /// Submit a subprocedure with specific `procedure_id`.
436    fn submit_subprocedure(
437        &self,
438        procedure_id: ProcedureId,
439        procedure_state: ProcedureState,
440        procedure: BoxedProcedure,
441    ) {
442        if self.manager_ctx.contains_procedure(procedure_id) {
443            // If the parent has already submitted this procedure, don't submit it again.
444            return;
445        }
446
447        let step = 0;
448
449        let meta = Arc::new(ProcedureMeta::new(
450            procedure_id,
451            procedure_state,
452            Some(self.meta.id),
453            procedure.lock_key(),
454            procedure.poison_keys(),
455            procedure.type_name(),
456            self.event_recorder.clone(),
457            procedure.user_metadata(),
458        ));
459        let runner = Runner {
460            meta: meta.clone(),
461            procedure,
462            manager_ctx: self.manager_ctx.clone(),
463            step,
464            exponential_builder: self.exponential_builder,
465            store: self.store.clone(),
466            rolling_back: false,
467            event_recorder: self.event_recorder.clone(),
468        };
469
470        // Insert the procedure. We already check the procedure existence before inserting
471        // so we add an assertion to ensure the procedure id is unique and no other procedures
472        // using the same procedure id.
473        assert!(
474            self.manager_ctx.try_insert_procedure(meta),
475            "Procedure {}-{} submit an existing procedure {}-{}",
476            self.procedure.type_name(),
477            self.meta.id,
478            runner.procedure.type_name(),
479            procedure_id,
480        );
481
482        // Add the id of the subprocedure to the metadata.
483        self.meta.push_child(procedure_id);
484        let parent_id = self.meta.id;
485
486        let tracing_context = TracingContext::from_current_span();
487        let _handle = common_runtime::spawn_global(async move {
488            let span = tracing_context.attach(tracing::info_span!(
489                "LocalManager::submit_subprocedure",
490                procedure_name = %runner.meta.type_name,
491                procedure_id = %runner.meta.id,
492                parent_id = %parent_id,
493            ));
494            // Run the root procedure.
495            // The task was moved to another runtime for execution.
496            // In order not to interrupt tracing, a span needs to be created to continue tracing the current task.
497            runner.run().trace(span).await
498        });
499    }
500
501    /// Extend the retry time to wait for the next retry.
502    async fn wait_on_err(&mut self, d: Duration, i: u64) {
503        info!(
504            "Procedure {}-{} retry for the {} times after {} millis",
505            self.procedure.type_name(),
506            self.meta.id,
507            i,
508            d.as_millis(),
509        );
510        time::sleep(d).await;
511    }
512
513    async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
514        let has_child = !subprocedures.is_empty();
515        for subprocedure in subprocedures {
516            info!(
517                "Procedure {}-{} submit subprocedure {}-{}",
518                self.procedure.type_name(),
519                self.meta.id,
520                subprocedure.procedure.type_name(),
521                subprocedure.id,
522            );
523
524            self.submit_subprocedure(
525                subprocedure.id,
526                ProcedureState::Running,
527                subprocedure.procedure,
528            );
529        }
530
531        info!(
532            "Procedure {}-{} is waiting for subprocedures",
533            self.procedure.type_name(),
534            self.meta.id,
535        );
536
537        // Wait for subprocedures.
538        if has_child {
539            self.meta.child_notify.notified().await;
540
541            info!(
542                "Procedure {}-{} is waked up",
543                self.procedure.type_name(),
544                self.meta.id,
545            );
546        }
547    }
548
549    async fn persist_procedure(&mut self) -> Result<()> {
550        let type_name = self.procedure.type_name().to_string();
551        let data = self.procedure.dump()?;
552
553        self.store
554            .store_procedure(
555                self.meta.id,
556                self.step,
557                type_name,
558                data,
559                self.meta.parent_id,
560            )
561            .await
562            .map_err(|e| {
563                error!(
564                    e; "Failed to persist procedure {}-{}",
565                    self.procedure.type_name(),
566                    self.meta.id
567                );
568                e
569            })?;
570        self.step += 1;
571        Ok(())
572    }
573
574    async fn commit_procedure(&mut self) -> Result<()> {
575        self.store
576            .commit_procedure(self.meta.id, self.step)
577            .await
578            .map_err(|e| {
579                error!(
580                    e; "Failed to commit procedure {}-{}",
581                    self.procedure.type_name(),
582                    self.meta.id
583                );
584                e
585            })?;
586        self.step += 1;
587        Ok(())
588    }
589
590    async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
591        // Persists procedure state
592        let type_name = self.procedure.type_name().to_string();
593        let data = self.procedure.dump()?;
594        let message = ProcedureMessage {
595            type_name,
596            data,
597            parent_id: self.meta.parent_id,
598            step: self.step,
599            error: Some(error),
600        };
601        self.store
602            .rollback_procedure(self.meta.id, message)
603            .await
604            .map_err(|e| {
605                error!(
606                    e; "Failed to write rollback key for procedure {}-{}",
607                    self.procedure.type_name(),
608                    self.meta.id
609                );
610                e
611            })?;
612        self.step += 1;
613        Ok(())
614    }
615
616    fn done(&self, output: Option<Output>) {
617        // TODO(yingwen): Add files to remove list.
618        info!(
619            "Procedure {}-{} done",
620            self.procedure.type_name(),
621            self.meta.id,
622        );
623
624        // Mark the state of this procedure to done.
625        self.meta.set_state(ProcedureState::Done { output });
626    }
627}
628
629#[cfg(test)]
630mod tests {
631    use std::assert_matches::assert_matches;
632    use std::sync::Arc;
633    use std::sync::atomic::{AtomicU64, Ordering};
634
635    use async_trait::async_trait;
636    use common_error::ext::{ErrorExt, PlainError};
637    use common_error::mock::MockError;
638    use common_error::status_code::StatusCode;
639    use common_test_util::temp_dir::create_temp_dir;
640    use futures::future::join_all;
641    use futures_util::FutureExt;
642    use futures_util::future::BoxFuture;
643    use object_store::{EntryMode, ObjectStore};
644    use tokio::sync::mpsc;
645    use tokio::sync::watch::Receiver;
646
647    use super::*;
648    use crate::local::{DynamicKeyLockGuard, test_util};
649    use crate::procedure::PoisonKeys;
650    use crate::store::proc_path;
651    use crate::test_util::InMemoryPoisonStore;
652    use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
653
654    const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
655
656    fn new_runner(
657        meta: ProcedureMetaRef,
658        procedure: BoxedProcedure,
659        store: Arc<ProcedureStore>,
660    ) -> Runner {
661        Runner {
662            meta,
663            procedure,
664            manager_ctx: Arc::new(ManagerContext::new(
665                Arc::new(InMemoryPoisonStore::default()),
666            )),
667            step: 0,
668            exponential_builder: ExponentialBuilder::default(),
669            store,
670            rolling_back: false,
671            event_recorder: None,
672        }
673    }
674
675    async fn check_files(
676        object_store: &ObjectStore,
677        procedure_store: &ProcedureStore,
678        procedure_id: ProcedureId,
679        files: &[&str],
680    ) {
681        let dir = proc_path!(procedure_store, "{procedure_id}/");
682        let lister = object_store.list(&dir).await.unwrap();
683        let mut files_in_dir: Vec<_> = lister
684            .into_iter()
685            .filter(|x| x.metadata().mode() == EntryMode::FILE)
686            .map(|de| de.name().to_string())
687            .collect();
688        files_in_dir.sort_unstable();
689        assert_eq!(files, files_in_dir);
690    }
691
692    fn context_with_provider(
693        procedure_id: ProcedureId,
694        provider: Arc<dyn ContextProvider>,
695    ) -> Context {
696        Context {
697            procedure_id,
698            provider,
699        }
700    }
701
702    fn context_without_provider(procedure_id: ProcedureId) -> Context {
703        struct MockProvider;
704
705        #[async_trait]
706        impl ContextProvider for MockProvider {
707            async fn procedure_state(
708                &self,
709                _procedure_id: ProcedureId,
710            ) -> Result<Option<ProcedureState>> {
711                unimplemented!()
712            }
713
714            async fn procedure_state_receiver(
715                &self,
716                _procedure_id: ProcedureId,
717            ) -> Result<Option<Receiver<ProcedureState>>> {
718                unimplemented!()
719            }
720
721            async fn try_put_poison(
722                &self,
723                _key: &PoisonKey,
724                _procedure_id: ProcedureId,
725            ) -> Result<()> {
726                unimplemented!()
727            }
728
729            async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
730                unimplemented!()
731            }
732        }
733
734        Context {
735            procedure_id,
736            provider: Arc::new(MockProvider),
737        }
738    }
739
740    type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
741
742    struct ProcedureAdapter<F> {
743        data: String,
744        lock_key: LockKey,
745        poison_keys: PoisonKeys,
746        exec_fn: F,
747        rollback_fn: Option<RollbackFn>,
748    }
749
750    impl<F> ProcedureAdapter<F> {
751        fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
752            let mut meta = test_util::procedure_meta_for_test();
753            meta.id = ProcedureId::parse_str(uuid).unwrap();
754            meta.lock_key = self.lock_key.clone();
755            meta.poison_keys = self.poison_keys.clone();
756
757            Arc::new(meta)
758        }
759    }
760
761    #[async_trait]
762    impl<F> Procedure for ProcedureAdapter<F>
763    where
764        F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
765    {
766        fn type_name(&self) -> &str {
767            "ProcedureAdapter"
768        }
769
770        async fn execute(&mut self, ctx: &Context) -> Result<Status> {
771            let f = (self.exec_fn)(ctx.clone());
772            f.await
773        }
774
775        async fn rollback(&mut self, ctx: &Context) -> Result<()> {
776            if let Some(f) = &mut self.rollback_fn {
777                return (f)(ctx.clone()).await;
778            }
779            Ok(())
780        }
781
782        fn rollback_supported(&self) -> bool {
783            self.rollback_fn.is_some()
784        }
785
786        fn dump(&self) -> Result<String> {
787            Ok(self.data.clone())
788        }
789
790        fn lock_key(&self) -> LockKey {
791            self.lock_key.clone()
792        }
793
794        fn poison_keys(&self) -> PoisonKeys {
795            self.poison_keys.clone()
796        }
797    }
798
799    async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
800        let mut times = 0;
801        let exec_fn = move |_| {
802            times += 1;
803            async move {
804                if times == 1 {
805                    Ok(Status::executing(persist))
806                } else {
807                    Ok(Status::done())
808                }
809            }
810            .boxed()
811        };
812        let normal = ProcedureAdapter {
813            data: "normal".to_string(),
814            lock_key: LockKey::single_exclusive("catalog.schema.table"),
815            poison_keys: PoisonKeys::default(),
816            exec_fn,
817            rollback_fn: None,
818        };
819
820        let dir = create_temp_dir("normal");
821        let meta = normal.new_meta(ROOT_ID);
822        let ctx = context_without_provider(meta.id);
823        let object_store = test_util::new_object_store(&dir);
824        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
825        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
826        runner.manager_ctx.start();
827
828        runner.execute_once(&ctx).await;
829        let state = runner.meta.state();
830        assert!(state.is_running(), "{state:?}");
831        check_files(
832            &object_store,
833            &procedure_store,
834            ctx.procedure_id,
835            first_files,
836        )
837        .await;
838
839        runner.execute_once(&ctx).await;
840        let state = runner.meta.state();
841        assert!(state.is_done(), "{state:?}");
842        check_files(
843            &object_store,
844            &procedure_store,
845            ctx.procedure_id,
846            second_files,
847        )
848        .await;
849    }
850
851    #[tokio::test]
852    async fn test_execute_once_normal() {
853        execute_once_normal(
854            true,
855            &["0000000000.step"],
856            &["0000000000.step", "0000000001.commit"],
857        )
858        .await;
859    }
860
861    #[tokio::test]
862    async fn test_execute_once_normal_skip_persist() {
863        execute_once_normal(false, &[], &["0000000000.commit"]).await;
864    }
865
866    #[tokio::test]
867    async fn test_on_suspend_empty() {
868        let exec_fn = move |_| {
869            async move {
870                Ok(Status::Suspended {
871                    subprocedures: Vec::new(),
872                    persist: false,
873                })
874            }
875            .boxed()
876        };
877        let suspend = ProcedureAdapter {
878            data: "suspend".to_string(),
879            lock_key: LockKey::single_exclusive("catalog.schema.table"),
880            poison_keys: PoisonKeys::default(),
881            exec_fn,
882            rollback_fn: None,
883        };
884
885        let dir = create_temp_dir("suspend");
886        let meta = suspend.new_meta(ROOT_ID);
887        let ctx = context_without_provider(meta.id);
888        let object_store = test_util::new_object_store(&dir);
889        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
890        let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
891        runner.manager_ctx.start();
892
893        runner.execute_once(&ctx).await;
894        let state = runner.meta.state();
895        assert!(state.is_running(), "{state:?}");
896    }
897
898    fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
899        let mut times = 0;
900        let exec_fn = move |_| {
901            times += 1;
902            async move {
903                if times == 1 {
904                    time::sleep(Duration::from_millis(200)).await;
905                    Ok(Status::executing(true))
906                } else {
907                    Ok(Status::done())
908                }
909            }
910            .boxed()
911        };
912        let child = ProcedureAdapter {
913            data: "child".to_string(),
914            lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
915            poison_keys: PoisonKeys::default(),
916            exec_fn,
917            rollback_fn: None,
918        };
919
920        ProcedureWithId {
921            id: procedure_id,
922            procedure: Box::new(child),
923        }
924    }
925
926    #[tokio::test]
927    async fn test_on_suspend_by_subprocedures() {
928        let mut times = 0;
929        let children_ids = [ProcedureId::random(), ProcedureId::random()];
930        let keys = [
931            &[
932                "catalog.schema.table.region-0",
933                "catalog.schema.table.region-1",
934            ],
935            &[
936                "catalog.schema.table.region-2",
937                "catalog.schema.table.region-3",
938            ],
939        ];
940
941        let exec_fn = move |ctx: Context| {
942            times += 1;
943            async move {
944                if times == 1 {
945                    // Submit subprocedures.
946                    Ok(Status::Suspended {
947                        subprocedures: children_ids
948                            .into_iter()
949                            .zip(keys)
950                            .map(|(id, key_slice)| new_child_procedure(id, key_slice))
951                            .collect(),
952                        persist: true,
953                    })
954                } else {
955                    // Wait for subprocedures.
956                    let mut all_child_done = true;
957                    for id in children_ids {
958                        let is_not_done = ctx
959                            .provider
960                            .procedure_state(id)
961                            .await
962                            .unwrap()
963                            .map(|s| !s.is_done())
964                            .unwrap_or(true);
965                        if is_not_done {
966                            all_child_done = false;
967                        }
968                    }
969                    if all_child_done {
970                        Ok(Status::done())
971                    } else {
972                        // Return suspended to wait for notify.
973                        Ok(Status::Suspended {
974                            subprocedures: Vec::new(),
975                            persist: false,
976                        })
977                    }
978                }
979            }
980            .boxed()
981        };
982        let parent = ProcedureAdapter {
983            data: "parent".to_string(),
984            lock_key: LockKey::single_exclusive("catalog.schema.table"),
985            poison_keys: PoisonKeys::default(),
986            exec_fn,
987            rollback_fn: None,
988        };
989
990        let dir = create_temp_dir("parent");
991        let meta = parent.new_meta(ROOT_ID);
992        let procedure_id = meta.id;
993
994        let object_store = test_util::new_object_store(&dir);
995        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
996        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
997        let poison_manager = Arc::new(InMemoryPoisonStore::default());
998        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
999        manager_ctx.start();
1000        // Manually add this procedure to the manager ctx.
1001        assert!(manager_ctx.try_insert_procedure(meta));
1002        // Replace the manager ctx.
1003        runner.manager_ctx = manager_ctx.clone();
1004
1005        runner.run().await;
1006        assert!(manager_ctx.key_lock.is_empty());
1007
1008        // Check child procedures.
1009        for child_id in children_ids {
1010            let state = manager_ctx.state(child_id).unwrap();
1011            assert!(state.is_done(), "{state:?}");
1012        }
1013        let state = manager_ctx.state(procedure_id).unwrap();
1014        assert!(state.is_done(), "{state:?}");
1015        // Files are removed.
1016        check_files(&object_store, &procedure_store, procedure_id, &[]).await;
1017
1018        tokio::time::sleep(Duration::from_millis(5)).await;
1019        // Clean outdated meta.
1020        manager_ctx.remove_outdated_meta(Duration::from_millis(1));
1021        assert!(manager_ctx.state(procedure_id).is_none());
1022        assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
1023        for child_id in children_ids {
1024            assert!(manager_ctx.state(child_id).is_none());
1025        }
1026    }
1027
1028    #[tokio::test]
1029    async fn test_running_is_stopped() {
1030        let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
1031        let normal = ProcedureAdapter {
1032            data: "normal".to_string(),
1033            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1034            poison_keys: PoisonKeys::default(),
1035            exec_fn,
1036            rollback_fn: None,
1037        };
1038
1039        let dir = create_temp_dir("test_running_is_stopped");
1040        let meta = normal.new_meta(ROOT_ID);
1041        let ctx = context_without_provider(meta.id);
1042        let object_store = test_util::new_object_store(&dir);
1043        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1044        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1045        runner.manager_ctx.start();
1046
1047        runner.execute_once(&ctx).await;
1048        let state = runner.meta.state();
1049        assert!(state.is_running(), "{state:?}");
1050        check_files(
1051            &object_store,
1052            &procedure_store,
1053            ctx.procedure_id,
1054            &["0000000000.step"],
1055        )
1056        .await;
1057
1058        runner.manager_ctx.stop();
1059        runner.execute_once(&ctx).await;
1060        let state = runner.meta.state();
1061        assert!(state.is_failed(), "{state:?}");
1062        // Shouldn't write any files
1063        check_files(
1064            &object_store,
1065            &procedure_store,
1066            ctx.procedure_id,
1067            &["0000000000.step"],
1068        )
1069        .await;
1070    }
1071
1072    #[tokio::test]
1073    async fn test_running_is_stopped_on_error() {
1074        let exec_fn =
1075            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1076        let normal = ProcedureAdapter {
1077            data: "fail".to_string(),
1078            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1079            poison_keys: PoisonKeys::default(),
1080            exec_fn,
1081            rollback_fn: None,
1082        };
1083
1084        let dir = create_temp_dir("test_running_is_stopped_on_error");
1085        let meta = normal.new_meta(ROOT_ID);
1086        let ctx = context_without_provider(meta.id);
1087        let object_store = test_util::new_object_store(&dir);
1088        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1089        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1090        runner.manager_ctx.stop();
1091
1092        runner.execute_once(&ctx).await;
1093        let state = runner.meta.state();
1094        assert!(state.is_failed(), "{state:?}");
1095        // Shouldn't write any files
1096        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1097    }
1098
1099    #[tokio::test]
1100    async fn test_execute_on_error() {
1101        let exec_fn =
1102            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1103        let fail = ProcedureAdapter {
1104            data: "fail".to_string(),
1105            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1106            poison_keys: PoisonKeys::default(),
1107            exec_fn,
1108            rollback_fn: None,
1109        };
1110
1111        let dir = create_temp_dir("fail");
1112        let meta = fail.new_meta(ROOT_ID);
1113        let ctx = context_without_provider(meta.id);
1114        let object_store = test_util::new_object_store(&dir);
1115        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1116        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1117        runner.manager_ctx.start();
1118
1119        runner.execute_once(&ctx).await;
1120        let state = runner.meta.state();
1121        assert!(state.is_failed(), "{state:?}");
1122        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1123    }
1124
1125    #[tokio::test]
1126    async fn test_execute_with_rollback_on_error() {
1127        let exec_fn =
1128            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1129        let rollback_fn = move |_| async move { Ok(()) }.boxed();
1130        let fail = ProcedureAdapter {
1131            data: "fail".to_string(),
1132            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1133            poison_keys: PoisonKeys::default(),
1134            exec_fn,
1135            rollback_fn: Some(Box::new(rollback_fn)),
1136        };
1137
1138        let dir = create_temp_dir("fail");
1139        let meta = fail.new_meta(ROOT_ID);
1140        let ctx = context_without_provider(meta.id);
1141        let object_store = test_util::new_object_store(&dir);
1142        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1143        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1144        runner.manager_ctx.start();
1145
1146        runner.execute_once(&ctx).await;
1147        let state = runner.meta.state();
1148        assert!(state.is_prepare_rollback(), "{state:?}");
1149
1150        runner.execute_once(&ctx).await;
1151        let state = runner.meta.state();
1152        assert!(state.is_rolling_back(), "{state:?}");
1153
1154        runner.execute_once(&ctx).await;
1155        let state = runner.meta.state();
1156        assert!(state.is_failed(), "{state:?}");
1157        check_files(
1158            &object_store,
1159            &procedure_store,
1160            ctx.procedure_id,
1161            &["0000000000.rollback"],
1162        )
1163        .await;
1164    }
1165
1166    #[tokio::test]
1167    async fn test_execute_on_retry_later_error() {
1168        let mut times = 0;
1169
1170        let exec_fn = move |_| {
1171            times += 1;
1172            async move {
1173                if times == 1 {
1174                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1175                } else if times == 2 {
1176                    Ok(Status::executing(false))
1177                } else {
1178                    Ok(Status::done())
1179                }
1180            }
1181            .boxed()
1182        };
1183
1184        let retry_later = ProcedureAdapter {
1185            data: "retry_later".to_string(),
1186            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1187            poison_keys: PoisonKeys::default(),
1188            exec_fn,
1189            rollback_fn: None,
1190        };
1191
1192        let dir = create_temp_dir("retry_later");
1193        let meta = retry_later.new_meta(ROOT_ID);
1194        let ctx = context_without_provider(meta.id);
1195        let object_store = test_util::new_object_store(&dir);
1196        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1197        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1198        runner.manager_ctx.start();
1199        runner.execute_once(&ctx).await;
1200        let state = runner.meta.state();
1201        assert!(state.is_retrying(), "{state:?}");
1202
1203        runner.execute_once(&ctx).await;
1204        let state = runner.meta.state();
1205        assert!(state.is_running(), "{state:?}");
1206
1207        runner.execute_once(&ctx).await;
1208        let state = runner.meta.state();
1209        assert!(state.is_done(), "{state:?}");
1210        assert!(meta.state().is_done());
1211        check_files(
1212            &object_store,
1213            &procedure_store,
1214            ctx.procedure_id,
1215            &["0000000000.commit"],
1216        )
1217        .await;
1218    }
1219
1220    #[tokio::test(flavor = "multi_thread")]
1221    async fn test_execute_on_retry_later_error_with_child() {
1222        common_telemetry::init_default_ut_logging();
1223        let mut times = 0;
1224        let child_id = ProcedureId::random();
1225
1226        let exec_fn = move |_| {
1227            times += 1;
1228            async move {
1229                debug!("times: {}", times);
1230                if times == 1 {
1231                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1232                } else if times == 2 {
1233                    let exec_fn = |_| {
1234                        async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1235                            .boxed()
1236                    };
1237                    let fail = ProcedureAdapter {
1238                        data: "fail".to_string(),
1239                        lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1240                        poison_keys: PoisonKeys::default(),
1241                        exec_fn,
1242                        rollback_fn: None,
1243                    };
1244
1245                    Ok(Status::Suspended {
1246                        subprocedures: vec![ProcedureWithId {
1247                            id: child_id,
1248                            procedure: Box::new(fail),
1249                        }],
1250                        persist: true,
1251                    })
1252                } else {
1253                    Ok(Status::done())
1254                }
1255            }
1256            .boxed()
1257        };
1258
1259        let retry_later = ProcedureAdapter {
1260            data: "retry_later".to_string(),
1261            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1262            poison_keys: PoisonKeys::default(),
1263            exec_fn,
1264            rollback_fn: None,
1265        };
1266
1267        let dir = create_temp_dir("retry_later");
1268        let meta = retry_later.new_meta(ROOT_ID);
1269        let ctx = context_without_provider(meta.id);
1270        let object_store = test_util::new_object_store(&dir);
1271        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1272        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1273        runner.manager_ctx.start();
1274        debug!("execute_once 1");
1275        runner.execute_once(&ctx).await;
1276        let state = runner.meta.state();
1277        assert!(state.is_retrying(), "{state:?}");
1278
1279        let moved_meta = meta.clone();
1280        tokio::spawn(async move {
1281            moved_meta.child_notify.notify_one();
1282        });
1283        runner.execute_once(&ctx).await;
1284        let state = runner.meta.state();
1285        assert!(state.is_running(), "{state:?}");
1286
1287        runner.execute_once(&ctx).await;
1288        let state = runner.meta.state();
1289        assert!(state.is_done(), "{state:?}");
1290        assert!(meta.state().is_done());
1291        check_files(
1292            &object_store,
1293            &procedure_store,
1294            ctx.procedure_id,
1295            &["0000000000.step", "0000000001.commit"],
1296        )
1297        .await;
1298    }
1299
1300    #[tokio::test]
1301    async fn test_execute_exceed_max_retry_later() {
1302        let exec_fn =
1303            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1304
1305        let exceed_max_retry_later = ProcedureAdapter {
1306            data: "exceed_max_retry_later".to_string(),
1307            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1308            poison_keys: PoisonKeys::default(),
1309            exec_fn,
1310            rollback_fn: None,
1311        };
1312
1313        let dir = create_temp_dir("exceed_max_retry_later");
1314        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1315        let object_store = test_util::new_object_store(&dir);
1316        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1317        let mut runner = new_runner(
1318            meta.clone(),
1319            Box::new(exceed_max_retry_later),
1320            procedure_store,
1321        );
1322        runner.manager_ctx.start();
1323
1324        runner.exponential_builder = ExponentialBuilder::default()
1325            .with_min_delay(Duration::from_millis(1))
1326            .with_max_times(3);
1327
1328        // Run the runner and execute the procedure.
1329        runner.execute_procedure_in_loop().await;
1330        let err = meta.state().error().unwrap().to_string();
1331        assert!(err.contains("Procedure retry exceeded max times"));
1332    }
1333
1334    #[tokio::test]
1335    async fn test_rollback_exceed_max_retry_later() {
1336        let exec_fn =
1337            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1338        let rollback_fn = move |_| {
1339            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1340        };
1341        let exceed_max_retry_later = ProcedureAdapter {
1342            data: "exceed_max_rollback".to_string(),
1343            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1344            poison_keys: PoisonKeys::default(),
1345            exec_fn,
1346            rollback_fn: Some(Box::new(rollback_fn)),
1347        };
1348
1349        let dir = create_temp_dir("exceed_max_rollback");
1350        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1351        let object_store = test_util::new_object_store(&dir);
1352        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1353        let mut runner = new_runner(
1354            meta.clone(),
1355            Box::new(exceed_max_retry_later),
1356            procedure_store,
1357        );
1358        runner.manager_ctx.start();
1359        runner.exponential_builder = ExponentialBuilder::default()
1360            .with_min_delay(Duration::from_millis(1))
1361            .with_max_times(3);
1362
1363        // Run the runner and execute the procedure.
1364        runner.execute_procedure_in_loop().await;
1365        let err = meta.state().error().unwrap().to_string();
1366        assert!(err.contains("Procedure rollback exceeded max times"));
1367    }
1368
1369    #[tokio::test]
1370    async fn test_rollback_after_retry_fail() {
1371        let exec_fn = move |_| {
1372            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1373        };
1374
1375        let (tx, mut rx) = mpsc::channel(1);
1376        let rollback_fn = move |_| {
1377            let tx = tx.clone();
1378            async move {
1379                tx.send(()).await.unwrap();
1380                Ok(())
1381            }
1382            .boxed()
1383        };
1384        let retry_later = ProcedureAdapter {
1385            data: "rollback_after_retry_fail".to_string(),
1386            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1387            poison_keys: PoisonKeys::default(),
1388            exec_fn,
1389            rollback_fn: Some(Box::new(rollback_fn)),
1390        };
1391
1392        let dir = create_temp_dir("retry_later");
1393        let meta = retry_later.new_meta(ROOT_ID);
1394        let ctx = context_without_provider(meta.id);
1395        let object_store = test_util::new_object_store(&dir);
1396        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1397        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1398        runner.manager_ctx.start();
1399        runner.exponential_builder = ExponentialBuilder::default()
1400            .with_min_delay(Duration::from_millis(1))
1401            .with_max_times(3);
1402        // Run the runner and execute the procedure.
1403        runner.execute_procedure_in_loop().await;
1404        rx.recv().await.unwrap();
1405        assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
1406        check_files(
1407            &object_store,
1408            &procedure_store,
1409            ctx.procedure_id,
1410            &["0000000000.rollback"],
1411        )
1412        .await;
1413    }
1414
1415    #[tokio::test]
1416    async fn test_child_error() {
1417        let mut times = 0;
1418        let child_id = ProcedureId::random();
1419        common_telemetry::init_default_ut_logging();
1420        let exec_fn = move |ctx: Context| {
1421            times += 1;
1422            async move {
1423                if times == 1 {
1424                    // Submit subprocedures.
1425                    let exec_fn = |_| {
1426                        async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
1427                            .boxed()
1428                    };
1429                    let fail = ProcedureAdapter {
1430                        data: "fail".to_string(),
1431                        lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
1432                        poison_keys: PoisonKeys::default(),
1433                        exec_fn,
1434                        rollback_fn: None,
1435                    };
1436
1437                    Ok(Status::Suspended {
1438                        subprocedures: vec![ProcedureWithId {
1439                            id: child_id,
1440                            procedure: Box::new(fail),
1441                        }],
1442                        persist: true,
1443                    })
1444                } else {
1445                    // Wait for subprocedures.
1446                    let state = ctx.provider.procedure_state(child_id).await.unwrap();
1447                    let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
1448                    if is_failed {
1449                        // The parent procedure to abort itself if child procedure is failed.
1450                        Err(Error::from_error_ext(PlainError::new(
1451                            "subprocedure failed".to_string(),
1452                            StatusCode::Unexpected,
1453                        )))
1454                    } else {
1455                        // Return suspended to wait for notify.
1456                        Ok(Status::Suspended {
1457                            subprocedures: Vec::new(),
1458                            persist: false,
1459                        })
1460                    }
1461                }
1462            }
1463            .boxed()
1464        };
1465        let parent = ProcedureAdapter {
1466            data: "parent".to_string(),
1467            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1468            poison_keys: PoisonKeys::default(),
1469            exec_fn,
1470            rollback_fn: None,
1471        };
1472
1473        let dir = create_temp_dir("child_err");
1474        let meta = parent.new_meta(ROOT_ID);
1475
1476        let object_store = test_util::new_object_store(&dir);
1477        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1478        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
1479        let poison_manager = Arc::new(InMemoryPoisonStore::default());
1480        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
1481        manager_ctx.start();
1482        // Manually add this procedure to the manager ctx.
1483        assert!(manager_ctx.try_insert_procedure(meta.clone()));
1484        // Replace the manager ctx.
1485        runner.manager_ctx = manager_ctx.clone();
1486
1487        // Run the runner and execute the procedure.
1488        runner.run().await;
1489        assert!(manager_ctx.key_lock.is_empty());
1490        let err = meta.state().error().unwrap().output_msg();
1491        assert!(err.contains("subprocedure failed"), "{err}");
1492    }
1493
1494    #[tokio::test]
1495    async fn test_execute_with_clean_poisons() {
1496        common_telemetry::init_default_ut_logging();
1497        let mut times = 0;
1498        let poison_key = PoisonKey::new("table/1024");
1499        let moved_poison_key = poison_key.clone();
1500        let exec_fn = move |ctx: Context| {
1501            times += 1;
1502            let poison_key = moved_poison_key.clone();
1503            async move {
1504                if times == 1 {
1505                    // Put the poison to the context.
1506                    ctx.provider
1507                        .try_put_poison(&poison_key, ctx.procedure_id)
1508                        .await
1509                        .unwrap();
1510
1511                    Ok(Status::executing(true))
1512                } else {
1513                    Ok(Status::executing_with_clean_poisons(true))
1514                }
1515            }
1516            .boxed()
1517        };
1518        let poison = ProcedureAdapter {
1519            data: "poison".to_string(),
1520            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1521            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1522            exec_fn,
1523            rollback_fn: None,
1524        };
1525
1526        let dir = create_temp_dir("clean_poisons");
1527        let meta = poison.new_meta(ROOT_ID);
1528
1529        let object_store = test_util::new_object_store(&dir);
1530        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1531        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1532
1533        // Use the manager ctx as the context provider.
1534        let ctx = context_with_provider(
1535            meta.id,
1536            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1537        );
1538        // Manually add this procedure to the manager ctx.
1539        runner
1540            .manager_ctx
1541            .procedures
1542            .write()
1543            .unwrap()
1544            .insert(meta.id, runner.meta.clone());
1545
1546        runner.manager_ctx.start();
1547        runner.execute_once(&ctx).await;
1548        let state = runner.meta.state();
1549        assert!(state.is_running(), "{state:?}");
1550
1551        let procedure_id = runner
1552            .manager_ctx
1553            .poison_manager
1554            .get_poison(&poison_key.to_string())
1555            .await
1556            .unwrap();
1557        // poison key should be exist.
1558        assert!(procedure_id.is_some());
1559
1560        runner.execute_once(&ctx).await;
1561        let state = runner.meta.state();
1562        assert!(state.is_running(), "{state:?}");
1563
1564        let procedure_id = runner
1565            .manager_ctx
1566            .poison_manager
1567            .get_poison(&poison_key.to_string())
1568            .await
1569            .unwrap();
1570        // poison key should be deleted.
1571        assert!(procedure_id.is_none());
1572    }
1573
1574    #[tokio::test]
1575    async fn test_execute_error_with_clean_poisons() {
1576        common_telemetry::init_default_ut_logging();
1577        let mut times = 0;
1578        let poison_key = PoisonKey::new("table/1024");
1579        let moved_poison_key = poison_key.clone();
1580        let exec_fn = move |ctx: Context| {
1581            times += 1;
1582            let poison_key = moved_poison_key.clone();
1583            async move {
1584                if times == 1 {
1585                    // Put the poison to the context.
1586                    ctx.provider
1587                        .try_put_poison(&poison_key, ctx.procedure_id)
1588                        .await
1589                        .unwrap();
1590
1591                    Ok(Status::executing(true))
1592                } else {
1593                    Err(Error::external_and_clean_poisons(MockError::new(
1594                        StatusCode::Unexpected,
1595                    )))
1596                }
1597            }
1598            .boxed()
1599        };
1600        let poison = ProcedureAdapter {
1601            data: "poison".to_string(),
1602            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1603            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1604            exec_fn,
1605            rollback_fn: None,
1606        };
1607
1608        let dir = create_temp_dir("error_with_clean_poisons");
1609        let meta = poison.new_meta(ROOT_ID);
1610
1611        let object_store = test_util::new_object_store(&dir);
1612        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1613        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1614
1615        // Use the manager ctx as the context provider.
1616        let ctx = context_with_provider(
1617            meta.id,
1618            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1619        );
1620        // Manually add this procedure to the manager ctx.
1621        runner
1622            .manager_ctx
1623            .procedures
1624            .write()
1625            .unwrap()
1626            .insert(meta.id, runner.meta.clone());
1627
1628        runner.manager_ctx.start();
1629        runner.execute_once(&ctx).await;
1630        let state = runner.meta.state();
1631        assert!(state.is_running(), "{state:?}");
1632
1633        let procedure_id = runner
1634            .manager_ctx
1635            .poison_manager
1636            .get_poison(&poison_key.to_string())
1637            .await
1638            .unwrap();
1639        // poison key should be exist.
1640        assert!(procedure_id.is_some());
1641
1642        runner.execute_once(&ctx).await;
1643        let state = runner.meta.state();
1644        assert!(state.is_failed(), "{state:?}");
1645
1646        let procedure_id = runner
1647            .manager_ctx
1648            .poison_manager
1649            .get_poison(&poison_key.to_string())
1650            .await
1651            .unwrap();
1652        // poison key should be deleted.
1653        assert!(procedure_id.is_none());
1654    }
1655
1656    #[tokio::test]
1657    async fn test_execute_failed_after_set_poison() {
1658        let mut times = 0;
1659        let poison_key = PoisonKey::new("table/1024");
1660        let moved_poison_key = poison_key.clone();
1661        let exec_fn = move |ctx: Context| {
1662            times += 1;
1663            let poison_key = moved_poison_key.clone();
1664            async move {
1665                if times == 1 {
1666                    Ok(Status::executing(true))
1667                } else {
1668                    // Put the poison to the context.
1669                    ctx.provider
1670                        .try_put_poison(&poison_key, ctx.procedure_id)
1671                        .await
1672                        .unwrap();
1673                    Err(Error::external(MockError::new(StatusCode::Unexpected)))
1674                }
1675            }
1676            .boxed()
1677        };
1678        let poison = ProcedureAdapter {
1679            data: "poison".to_string(),
1680            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1681            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1682            exec_fn,
1683            rollback_fn: None,
1684        };
1685
1686        let dir = create_temp_dir("poison");
1687        let meta = poison.new_meta(ROOT_ID);
1688
1689        let object_store = test_util::new_object_store(&dir);
1690        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1691        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1692
1693        // Use the manager ctx as the context provider.
1694        let ctx = context_with_provider(
1695            meta.id,
1696            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1697        );
1698        // Manually add this procedure to the manager ctx.
1699        runner
1700            .manager_ctx
1701            .procedures
1702            .write()
1703            .unwrap()
1704            .insert(meta.id, runner.meta.clone());
1705
1706        runner.manager_ctx.start();
1707        runner.execute_once(&ctx).await;
1708        let state = runner.meta.state();
1709        assert!(state.is_running(), "{state:?}");
1710
1711        runner.execute_once(&ctx).await;
1712        let state = runner.meta.state();
1713        assert!(state.is_failed(), "{state:?}");
1714        assert!(meta.state().is_failed());
1715
1716        // Check the poison is set.
1717        let procedure_id = runner
1718            .manager_ctx
1719            .poison_manager
1720            .get_poison(&poison_key.to_string())
1721            .await
1722            .unwrap()
1723            .unwrap();
1724
1725        // If the procedure is poisoned, the poison key shouldn't be deleted.
1726        assert_eq!(&procedure_id.clone(), ROOT_ID);
1727    }
1728
1729    #[tokio::test]
1730    async fn test_execute_exceed_max_retry_after_set_poison() {
1731        common_telemetry::init_default_ut_logging();
1732        let mut times = 0;
1733        let poison_key = PoisonKey::new("table/1024");
1734        let moved_poison_key = poison_key.clone();
1735        let exec_fn = move |ctx: Context| {
1736            times += 1;
1737            let poison_key = moved_poison_key.clone();
1738            async move {
1739                if times == 1 {
1740                    Ok(Status::executing(true))
1741                } else {
1742                    // Put the poison to the context.
1743                    ctx.provider
1744                        .try_put_poison(&poison_key, ctx.procedure_id)
1745                        .await
1746                        .unwrap();
1747                    Err(Error::retry_later_and_clean_poisons(MockError::new(
1748                        StatusCode::Unexpected,
1749                    )))
1750                }
1751            }
1752            .boxed()
1753        };
1754        let poison = ProcedureAdapter {
1755            data: "poison".to_string(),
1756            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1757            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1758            exec_fn,
1759            rollback_fn: None,
1760        };
1761
1762        let dir = create_temp_dir("exceed_max_after_set_poison");
1763        let meta = poison.new_meta(ROOT_ID);
1764        let object_store = test_util::new_object_store(&dir);
1765        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1766        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
1767        runner.manager_ctx.start();
1768        runner.exponential_builder = ExponentialBuilder::default()
1769            .with_min_delay(Duration::from_millis(1))
1770            .with_max_times(3);
1771        // Use the manager ctx as the context provider.
1772        let ctx = context_with_provider(
1773            meta.id,
1774            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1775        );
1776        // Manually add this procedure to the manager ctx.
1777        runner
1778            .manager_ctx
1779            .procedures
1780            .write()
1781            .unwrap()
1782            .insert(meta.id, runner.meta.clone());
1783        // Run the runner and execute the procedure.
1784        runner.execute_once_with_retry(&ctx).await;
1785        let err = meta.state().error().unwrap().clone();
1786        assert_matches!(&*err, Error::RetryTimesExceeded { .. });
1787
1788        // Check the poison is deleted.
1789        let procedure_id = runner
1790            .manager_ctx
1791            .poison_manager
1792            .get_poison(&poison_key.to_string())
1793            .await
1794            .unwrap();
1795        assert_eq!(procedure_id, None);
1796    }
1797
1798    #[tokio::test]
1799    async fn test_execute_poisoned() {
1800        let mut times = 0;
1801        let poison_key = PoisonKey::new("table/1024");
1802        let moved_poison_key = poison_key.clone();
1803        let exec_fn = move |ctx: Context| {
1804            times += 1;
1805            let poison_key = moved_poison_key.clone();
1806            async move {
1807                if times == 1 {
1808                    Ok(Status::executing(true))
1809                } else {
1810                    // Put the poison to the context.
1811                    ctx.provider
1812                        .try_put_poison(&poison_key, ctx.procedure_id)
1813                        .await
1814                        .unwrap();
1815                    Ok(Status::Poisoned {
1816                        keys: PoisonKeys::new(vec![poison_key.clone()]),
1817                        error: Error::external(MockError::new(StatusCode::Unexpected)),
1818                    })
1819                }
1820            }
1821            .boxed()
1822        };
1823        let poison = ProcedureAdapter {
1824            data: "poison".to_string(),
1825            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1826            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1827            exec_fn,
1828            rollback_fn: None,
1829        };
1830
1831        let dir = create_temp_dir("poison");
1832        let meta = poison.new_meta(ROOT_ID);
1833
1834        let object_store = test_util::new_object_store(&dir);
1835        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1836        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1837
1838        // Use the manager ctx as the context provider.
1839        let ctx = context_with_provider(
1840            meta.id,
1841            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1842        );
1843        // Manually add this procedure to the manager ctx.
1844        runner
1845            .manager_ctx
1846            .procedures
1847            .write()
1848            .unwrap()
1849            .insert(meta.id, runner.meta.clone());
1850
1851        runner.manager_ctx.start();
1852        runner.execute_once(&ctx).await;
1853        let state = runner.meta.state();
1854        assert!(state.is_running(), "{state:?}");
1855
1856        runner.execute_once(&ctx).await;
1857        let state = runner.meta.state();
1858        assert!(state.is_poisoned(), "{state:?}");
1859        assert!(meta.state().is_poisoned());
1860        check_files(
1861            &object_store,
1862            &procedure_store,
1863            ctx.procedure_id,
1864            &["0000000000.step"],
1865        )
1866        .await;
1867
1868        // Check the poison is set.
1869        let procedure_id = runner
1870            .manager_ctx
1871            .poison_manager
1872            .get_poison(&poison_key.to_string())
1873            .await
1874            .unwrap()
1875            .unwrap();
1876
1877        // If the procedure is poisoned, the poison key shouldn't be deleted.
1878        assert_eq!(procedure_id, ROOT_ID);
1879    }
1880
1881    fn test_procedure_with_dynamic_lock(
1882        shared_atomic_value: Arc<AtomicU64>,
1883        id: u64,
1884    ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
1885        let exec_fn = move |ctx: Context| {
1886            let moved_shared_atomic_value = shared_atomic_value.clone();
1887            let moved_ctx = ctx.clone();
1888            async move {
1889                debug!("Acquiring write lock, id: {}", id);
1890                let key = StringKey::Exclusive("test_lock".to_string());
1891                let guard = moved_ctx.provider.acquire_lock(&key).await;
1892                debug!("Acquired write lock, id: {}", id);
1893                let millis = rand::rng().random_range(10..=50);
1894                tokio::time::sleep(Duration::from_millis(millis)).await;
1895                let value = moved_shared_atomic_value.load(Ordering::Relaxed);
1896                moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
1897                debug!("Dropping write lock, id: {}", id);
1898                drop(guard);
1899
1900                Ok(Status::done())
1901            }
1902            .boxed()
1903        };
1904
1905        let adapter = ProcedureAdapter {
1906            data: "dynamic_lock".to_string(),
1907            lock_key: LockKey::new_exclusive([]),
1908            poison_keys: PoisonKeys::new([]),
1909            exec_fn,
1910            rollback_fn: None,
1911        };
1912        let meta = adapter.new_meta(ROOT_ID);
1913
1914        (Box::new(adapter), meta)
1915    }
1916
1917    #[tokio::test(flavor = "multi_thread")]
1918    async fn test_execute_with_dynamic_lock() {
1919        common_telemetry::init_default_ut_logging();
1920        let shared_atomic_value = Arc::new(AtomicU64::new(0));
1921        let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
1922        let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
1923
1924        let dir = create_temp_dir("dynamic_lock");
1925        let object_store = test_util::new_object_store(&dir);
1926        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1927        let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
1928        let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
1929        let ctx1 = context_with_provider(
1930            meta1.id,
1931            runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1932        );
1933        let ctx2 = context_with_provider(
1934            meta2.id,
1935            // use same manager ctx as runner1
1936            runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
1937        );
1938        let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
1939        join_all(tasks).await;
1940        assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
1941    }
1942}