common_procedure/local/
runner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Add;
16use std::sync::Arc;
17use std::time::Duration;
18
19use backon::{BackoffBuilder, ExponentialBuilder};
20use common_error::ext::PlainError;
21use common_error::status_code::StatusCode;
22use common_event_recorder::EventRecorderRef;
23use common_telemetry::tracing_context::{FutureExt, TracingContext};
24use common_telemetry::{debug, error, info, tracing};
25use rand::Rng;
26use snafu::ResultExt;
27use tokio::time;
28
29use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
30use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
31use crate::procedure::{Output, StringKey};
32use crate::rwlock::OwnedKeyRwLockGuard;
33use crate::store::{ProcedureMessage, ProcedureStore};
34use crate::{
35    BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
36};
37
38/// A guard to cleanup procedure state.
39struct ProcedureGuard {
40    meta: ProcedureMetaRef,
41    manager_ctx: Arc<ManagerContext>,
42    key_guards: Vec<OwnedKeyRwLockGuard>,
43    finish: bool,
44}
45
46impl ProcedureGuard {
47    /// Returns a new [ProcedureGuard].
48    fn new(meta: ProcedureMetaRef, manager_ctx: Arc<ManagerContext>) -> ProcedureGuard {
49        ProcedureGuard {
50            meta,
51            manager_ctx,
52            key_guards: vec![],
53            finish: false,
54        }
55    }
56
57    /// The procedure is finished successfully.
58    fn finish(mut self) {
59        self.finish = true;
60    }
61}
62
63impl Drop for ProcedureGuard {
64    fn drop(&mut self) {
65        if !self.finish {
66            error!("Procedure {} exits unexpectedly", self.meta.id);
67
68            // Set state to failed. This is useful in test as runtime may not abort when the runner task panics.
69            // See https://github.com/tokio-rs/tokio/issues/2002 .
70            // We set set_panic_hook() in the application's main function. But our tests don't have this panic hook.
71            let err = ProcedurePanicSnafu {
72                procedure_id: self.meta.id,
73            }
74            .build();
75            self.meta.set_state(ProcedureState::failed(Arc::new(err)));
76        }
77
78        // Notify parent procedure.
79        if let Some(parent_id) = self.meta.parent_id {
80            self.manager_ctx.notify_by_subprocedure(parent_id);
81        }
82
83        // Drops the key guards in the reverse order.
84        while !self.key_guards.is_empty() {
85            self.key_guards.pop();
86        }
87
88        // Clean the staled locks.
89        self.manager_ctx
90            .key_lock
91            .clean_keys(self.meta.lock_key.keys_to_lock().map(|k| k.as_string()));
92    }
93}
94
95/// Returns a list of conflicting lock keys between a parent and a child procedure.
96/// Evaluates the Read/Write lock compatibility matrix:
97/// - Share + Share => Compatible
98/// - Exclusive + Any => Conflict
99/// - Any + Exclusive => Conflict
100fn find_lock_conflicts<'a>(
101    parent_keys: impl Iterator<Item = &'a StringKey>,
102    child_keys: impl Iterator<Item = &'a StringKey>,
103) -> Vec<String> {
104    use std::collections::HashMap;
105
106    // Map from key string slice (&str) to a boolean indicating if the parent holds it EXCLUSIVELY.
107    let mut parent_map = HashMap::new();
108    for key in parent_keys {
109        match key {
110            StringKey::Exclusive(k) => {
111                parent_map.insert(k.as_str(), true);
112            }
113            StringKey::Share(k) => {
114                parent_map.entry(k.as_str()).or_insert(false);
115            }
116        }
117    }
118
119    child_keys
120        .filter_map(|child_key| match child_key {
121            StringKey::Exclusive(k) | StringKey::Share(k)
122                if parent_map.get(k.as_str()) == Some(&true) =>
123            {
124                Some(k.clone())
125            }
126            StringKey::Exclusive(k) if parent_map.get(k.as_str()) == Some(&false) => {
127                Some(k.clone())
128            }
129            _ => None,
130        })
131        .collect()
132}
133
134pub(crate) struct Runner {
135    pub(crate) meta: ProcedureMetaRef,
136    pub(crate) procedure: BoxedProcedure,
137    pub(crate) manager_ctx: Arc<ManagerContext>,
138    pub(crate) step: u32,
139    pub(crate) exponential_builder: ExponentialBuilder,
140    pub(crate) store: Arc<ProcedureStore>,
141    pub(crate) rolling_back: bool,
142    pub(crate) event_recorder: Option<EventRecorderRef>,
143}
144
145impl Runner {
146    /// Return `ProcedureManager` is running.
147    pub(crate) fn running(&self) -> bool {
148        self.manager_ctx.running()
149    }
150
151    /// Run the procedure.
152    pub(crate) async fn run(mut self) {
153        // Ensure we can update the procedure state.
154        let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
155
156        info!(
157            "Runner {}-{} starts",
158            self.procedure.type_name(),
159            self.meta.id
160        );
161
162        // TODO(yingwen): Detect recursive locking (and deadlock) if possible. Maybe we could detect
163        // recursive locking by adding a root procedure id to the meta.
164        for key in self.meta.lock_key.keys_to_lock() {
165            // Acquire lock for each key.
166            let key_guard = match key {
167                StringKey::Share(key) => self.manager_ctx.key_lock.read(key.clone()).await.into(),
168                StringKey::Exclusive(key) => {
169                    self.manager_ctx.key_lock.write(key.clone()).await.into()
170                }
171            };
172
173            guard.key_guards.push(key_guard);
174        }
175
176        // Execute the procedure. We need to release the lock whenever the execution
177        // is successful or fail.
178        self.meta.set_start_time_ms();
179        self.execute_procedure_in_loop().await;
180        self.meta.set_end_time_ms();
181
182        // We can't remove the metadata of the procedure now as users and its parent might
183        // need to query its state.
184        // TODO(yingwen): 1. Add TTL to the metadata; 2. Only keep state in the procedure store
185        // so we don't need to always store the metadata in memory after the procedure is done.
186
187        // Release locks and notify parent procedure.
188        guard.finish();
189
190        // If this is the root procedure, clean up message cache.
191        if self.meta.parent_id.is_none() {
192            let procedure_ids = self.manager_ctx.procedures_in_tree(&self.meta);
193            // Clean resources.
194            self.manager_ctx.on_procedures_finish(&procedure_ids);
195
196            // If `ProcedureManager` is stopped, it stops the current task immediately without deleting the procedure.
197            if !self.running() {
198                return;
199            }
200
201            for id in procedure_ids {
202                if let Err(e) = self.store.delete_procedure(id).await {
203                    error!(
204                        e;
205                        "Runner {}-{} failed to delete procedure {}",
206                        self.procedure.type_name(),
207                        self.meta.id,
208                        id,
209                    );
210                }
211            }
212        }
213
214        info!(
215            "Runner {}-{} exits",
216            self.procedure.type_name(),
217            self.meta.id
218        );
219    }
220
221    async fn execute_procedure_in_loop(&mut self) {
222        let ctx = Context {
223            procedure_id: self.meta.id,
224            provider: self.manager_ctx.clone(),
225        };
226
227        self.rolling_back = false;
228        self.execute_once_with_retry(&ctx).await;
229    }
230
231    async fn execute_once_with_retry(&mut self, ctx: &Context) {
232        let mut retry = self.exponential_builder.build();
233        let mut retry_times = 0;
234
235        let mut rollback = self.exponential_builder.build();
236        let mut rollback_times = 0;
237
238        loop {
239            // Don't store state if `ProcedureManager` is stopped.
240            if !self.running() {
241                self.meta.set_state(ProcedureState::failed(Arc::new(
242                    error::ManagerNotStartSnafu {}.build(),
243                )));
244                return;
245            }
246            let state = self.meta.state();
247            match state {
248                ProcedureState::Running => {}
249                ProcedureState::Retrying { error } => {
250                    retry_times += 1;
251                    if let Some(d) = retry.next() {
252                        let millis = d.as_millis() as u64;
253                        // Add random noise to the retry delay to avoid retry storms.
254                        let noise = rand::rng().random_range(0..(millis / 4) + 1);
255                        let d = d.add(Duration::from_millis(noise));
256
257                        self.wait_on_err(d, retry_times).await;
258                    } else {
259                        self.meta
260                            .set_state(ProcedureState::prepare_rollback(Arc::new(
261                                Error::RetryTimesExceeded {
262                                    source: error.clone(),
263                                    procedure_id: self.meta.id,
264                                },
265                            )));
266                    }
267                }
268                ProcedureState::PrepareRollback { error }
269                | ProcedureState::RollingBack { error } => {
270                    rollback_times += 1;
271                    if let Some(d) = rollback.next() {
272                        self.wait_on_err(d, rollback_times).await;
273                    } else {
274                        let err = Err::<(), Arc<Error>>(error)
275                            .context(RollbackTimesExceededSnafu {
276                                procedure_id: self.meta.id,
277                            })
278                            .unwrap_err();
279                        self.meta.set_state(ProcedureState::failed(Arc::new(err)));
280                        return;
281                    }
282                }
283                ProcedureState::Done { .. } => return,
284                ProcedureState::Failed { .. } => return,
285                ProcedureState::Poisoned { .. } => return,
286            }
287            self.execute_once(ctx).await;
288        }
289    }
290
291    async fn clean_poisons(&mut self) -> Result<()> {
292        let mut error = None;
293        for key in self.meta.poison_keys.iter() {
294            let key = key.to_string();
295            if let Err(e) = self
296                .manager_ctx
297                .poison_manager
298                .delete_poison(key, self.meta.id.to_string())
299                .await
300            {
301                error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
302                error = Some(e);
303            }
304        }
305
306        // returns the last error if any.
307        if let Some(e) = error {
308            return Err(e);
309        }
310        Ok(())
311    }
312
313    async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
314        if self.procedure.rollback_supported()
315            && let Err(e) = self.procedure.rollback(ctx).await
316        {
317            self.meta
318                .set_state(ProcedureState::rolling_back(Arc::new(e)));
319            return;
320        }
321        self.meta.set_state(ProcedureState::failed(err));
322    }
323
324    async fn prepare_rollback(&mut self, err: Arc<Error>) {
325        if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
326            self.meta
327                .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
328            return;
329        }
330        if self.procedure.rollback_supported() {
331            self.meta.set_state(ProcedureState::rolling_back(err));
332        } else {
333            self.meta.set_state(ProcedureState::failed(err));
334        }
335    }
336
337    async fn execute_once(&mut self, ctx: &Context) {
338        match self.meta.state() {
339            ProcedureState::Running | ProcedureState::Retrying { .. } => {
340                match self.procedure.execute(ctx).await {
341                    Ok(status) => {
342                        debug!(
343                            "Execute procedure {}-{} once, status: {:?}, need_persist: {}",
344                            self.procedure.type_name(),
345                            self.meta.id,
346                            status,
347                            status.need_persist(),
348                        );
349
350                        // Don't store state if `ProcedureManager` is stopped.
351                        if !self.running() {
352                            self.meta.set_state(ProcedureState::failed(Arc::new(
353                                error::ManagerNotStartSnafu {}.build(),
354                            )));
355                            return;
356                        }
357
358                        // Cleans poisons before persist.
359                        if status.need_clean_poisons()
360                            && let Err(e) = self.clean_poisons().await
361                        {
362                            error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
363                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
364                            return;
365                        }
366
367                        if status.need_persist()
368                            && let Err(e) = self.persist_procedure().await
369                        {
370                            error!(e; "Failed to persist procedure: {}", self.meta.id);
371                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
372                            return;
373                        }
374
375                        match status {
376                            Status::Executing { .. } => {
377                                let prev_state = self.meta.state();
378                                if !matches!(prev_state, ProcedureState::Running) {
379                                    info!(
380                                        "Set Procedure {}-{} state to running, prev_state: {:?}",
381                                        self.procedure.type_name(),
382                                        self.meta.id,
383                                        prev_state
384                                    );
385                                    self.meta.set_state(ProcedureState::Running);
386                                }
387                            }
388                            Status::Suspended { subprocedures, .. } => {
389                                let prev_state = self.meta.state();
390                                if !matches!(prev_state, ProcedureState::Running) {
391                                    info!(
392                                        "Set Procedure {}-{} state to running, prev_state: {:?}",
393                                        self.procedure.type_name(),
394                                        self.meta.id,
395                                        prev_state
396                                    );
397                                    self.meta.set_state(ProcedureState::Running);
398                                }
399                                self.on_suspended(subprocedures).await;
400                            }
401                            Status::Done { output } => {
402                                if let Err(e) = self.commit_procedure().await {
403                                    error!(e; "Failed to commit procedure: {}", self.meta.id);
404                                    self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
405                                    return;
406                                }
407
408                                self.done(output);
409                            }
410                            Status::Poisoned { error, keys } => {
411                                error!(
412                                    error;
413                                    "Procedure {}-{} is poisoned, keys: {:?}",
414                                    self.procedure.type_name(),
415                                    self.meta.id,
416                                    keys,
417                                );
418                                self.meta
419                                    .set_state(ProcedureState::poisoned(keys, Arc::new(error)));
420                            }
421                        }
422                    }
423                    Err(e) => {
424                        error!(
425                            e;
426                            "Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
427                            self.procedure.type_name(),
428                            self.meta.id,
429                            e.is_retry_later(),
430                            e.need_clean_poisons(),
431                        );
432
433                        // Don't store state if `ProcedureManager` is stopped.
434                        if !self.running() {
435                            self.meta.set_state(ProcedureState::failed(Arc::new(
436                                error::ManagerNotStartSnafu {}.build(),
437                            )));
438                            return;
439                        }
440
441                        if e.need_clean_poisons() {
442                            if let Err(e) = self.clean_poisons().await {
443                                error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
444                                self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
445                                return;
446                            }
447                            debug!(
448                                "Procedure {}-{} cleaned poisons",
449                                self.procedure.type_name(),
450                                self.meta.id,
451                            );
452                        }
453
454                        if e.is_retry_later() {
455                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
456                            return;
457                        }
458
459                        if self.procedure.rollback_supported() {
460                            self.meta
461                                .set_state(ProcedureState::prepare_rollback(Arc::new(e)));
462                        } else {
463                            self.meta.set_state(ProcedureState::failed(Arc::new(e)));
464                        }
465                    }
466                }
467            }
468            ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
469            ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
470            ProcedureState::Failed { .. }
471            | ProcedureState::Done { .. }
472            | ProcedureState::Poisoned { .. } => (),
473        }
474    }
475
476    /// Submit a subprocedure with specific `procedure_id`.
477    fn submit_subprocedure(
478        &self,
479        procedure_id: ProcedureId,
480        procedure_state: ProcedureState,
481        procedure: BoxedProcedure,
482    ) {
483        if self.manager_ctx.contains_procedure(procedure_id) {
484            // If the parent has already submitted this procedure, don't submit it again.
485            return;
486        }
487
488        let step = 0;
489
490        let meta = Arc::new(ProcedureMeta::new(
491            procedure_id,
492            procedure_state,
493            Some(self.meta.id),
494            procedure.lock_key(),
495            procedure.poison_keys(),
496            procedure.type_name(),
497            self.event_recorder.clone(),
498            procedure.user_metadata(),
499        ));
500        let runner = Runner {
501            meta: meta.clone(),
502            procedure,
503            manager_ctx: self.manager_ctx.clone(),
504            step,
505            exponential_builder: self.exponential_builder,
506            store: self.store.clone(),
507            rolling_back: false,
508            event_recorder: self.event_recorder.clone(),
509        };
510
511        // Insert the procedure. We already check the procedure existence before inserting
512        // so we add an assertion to ensure the procedure id is unique and no other procedures
513        // using the same procedure id.
514        assert!(
515            self.manager_ctx.try_insert_procedure(meta),
516            "Procedure {}-{} submit an existing procedure {}-{}",
517            self.procedure.type_name(),
518            self.meta.id,
519            runner.procedure.type_name(),
520            procedure_id,
521        );
522
523        // Add the id of the subprocedure to the metadata.
524        self.meta.push_child(procedure_id);
525        let parent_id = self.meta.id;
526
527        let tracing_context = TracingContext::from_current_span();
528        let _handle = common_runtime::spawn_global(async move {
529            let span = tracing_context.attach(tracing::info_span!(
530                "LocalManager::submit_subprocedure",
531                procedure_name = %runner.meta.type_name,
532                procedure_id = %runner.meta.id,
533                parent_id = %parent_id,
534            ));
535            // Run the root procedure.
536            // The task was moved to another runtime for execution.
537            // In order not to interrupt tracing, a span needs to be created to continue tracing the current task.
538            runner.run().trace(span).await
539        });
540    }
541
542    /// Extend the retry time to wait for the next retry.
543    async fn wait_on_err(&mut self, d: Duration, i: u64) {
544        info!(
545            "Procedure {}-{} retry for the {} times after {} millis",
546            self.procedure.type_name(),
547            self.meta.id,
548            i,
549            d.as_millis(),
550        );
551        time::sleep(d).await;
552    }
553
554    async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
555        let has_child = !subprocedures.is_empty();
556
557        // Pre-check: detect potential deadlocks BEFORE submitting any subprocedure.
558        // If a child shares conflicting lock keys with the parent, submitting it would
559        // cause a Hold-and-Wait deadlock — the child blocks on lock acquisition while
560        // the parent holds the lock and waits for the child to finish.
561        for sub in &subprocedures {
562            let conflicting = find_lock_conflicts(
563                self.meta.lock_key.keys_to_lock(),
564                sub.procedure.lock_key().keys_to_lock(),
565            );
566            if !conflicting.is_empty() {
567                let err_msg = format!(
568                    "Deadlock prevented: subprocedure {}-{} shares conflicting lock key(s) {:?} \
569                     with parent {}-{}. Parent holds these locks and would wait for child \
570                     completion, but child cannot acquire them.",
571                    sub.procedure.type_name(),
572                    sub.id,
573                    conflicting,
574                    self.procedure.type_name(),
575                    self.meta.id,
576                );
577                error!("{}", err_msg);
578                let err = Arc::new(Error::external(PlainError::new(
579                    err_msg,
580                    StatusCode::Internal,
581                )));
582                if self.procedure.rollback_supported() {
583                    self.meta.set_state(ProcedureState::prepare_rollback(err));
584                } else {
585                    self.meta.set_state(ProcedureState::failed(err));
586                }
587                return;
588            }
589        }
590
591        for subprocedure in subprocedures {
592            info!(
593                "Procedure {}-{} submit subprocedure {}-{}",
594                self.procedure.type_name(),
595                self.meta.id,
596                subprocedure.procedure.type_name(),
597                subprocedure.id,
598            );
599
600            self.submit_subprocedure(
601                subprocedure.id,
602                ProcedureState::Running,
603                subprocedure.procedure,
604            );
605        }
606
607        info!(
608            "Procedure {}-{} is waiting for subprocedures",
609            self.procedure.type_name(),
610            self.meta.id,
611        );
612
613        // Wait for subprocedures.
614        if has_child {
615            self.meta.child_notify.notified().await;
616
617            info!(
618                "Procedure {}-{} is waked up",
619                self.procedure.type_name(),
620                self.meta.id,
621            );
622        }
623    }
624
625    async fn persist_procedure(&mut self) -> Result<()> {
626        let type_name = self.procedure.type_name().to_string();
627        let data = self.procedure.dump()?;
628
629        self.store
630            .store_procedure(
631                self.meta.id,
632                self.step,
633                type_name,
634                data,
635                self.meta.parent_id,
636            )
637            .await
638            .map_err(|e| {
639                error!(
640                    e; "Failed to persist procedure {}-{}",
641                    self.procedure.type_name(),
642                    self.meta.id
643                );
644                e
645            })?;
646        self.step += 1;
647        Ok(())
648    }
649
650    async fn commit_procedure(&mut self) -> Result<()> {
651        self.store
652            .commit_procedure(self.meta.id, self.step)
653            .await
654            .map_err(|e| {
655                error!(
656                    e; "Failed to commit procedure {}-{}",
657                    self.procedure.type_name(),
658                    self.meta.id
659                );
660                e
661            })?;
662        self.step += 1;
663        Ok(())
664    }
665
666    async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
667        // Persists procedure state
668        let type_name = self.procedure.type_name().to_string();
669        let data = self.procedure.dump()?;
670        let message = ProcedureMessage {
671            type_name,
672            data,
673            parent_id: self.meta.parent_id,
674            step: self.step,
675            error: Some(error),
676        };
677        self.store
678            .rollback_procedure(self.meta.id, message)
679            .await
680            .map_err(|e| {
681                error!(
682                    e; "Failed to write rollback key for procedure {}-{}",
683                    self.procedure.type_name(),
684                    self.meta.id
685                );
686                e
687            })?;
688        self.step += 1;
689        Ok(())
690    }
691
692    fn done(&self, output: Option<Output>) {
693        // TODO(yingwen): Add files to remove list.
694        info!(
695            "Procedure {}-{} done",
696            self.procedure.type_name(),
697            self.meta.id,
698        );
699
700        // Mark the state of this procedure to done.
701        self.meta.set_state(ProcedureState::Done { output });
702    }
703}
704
705#[cfg(test)]
706mod tests {
707    use std::assert_matches::assert_matches;
708    use std::sync::Arc;
709    use std::sync::atomic::{AtomicU64, Ordering};
710
711    use async_trait::async_trait;
712    use common_error::ext::{ErrorExt, PlainError};
713    use common_error::mock::MockError;
714    use common_error::status_code::StatusCode;
715    use common_test_util::temp_dir::create_temp_dir;
716    use futures::future::join_all;
717    use futures_util::FutureExt;
718    use futures_util::future::BoxFuture;
719    use object_store::{EntryMode, ObjectStore};
720    use tokio::sync::mpsc;
721    use tokio::sync::watch::Receiver;
722
723    use super::*;
724    use crate::local::{DynamicKeyLockGuard, test_util};
725    use crate::procedure::PoisonKeys;
726    use crate::store::proc_path;
727    use crate::test_util::InMemoryPoisonStore;
728    use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
729
730    const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
731
732    fn new_runner(
733        meta: ProcedureMetaRef,
734        procedure: BoxedProcedure,
735        store: Arc<ProcedureStore>,
736    ) -> Runner {
737        Runner {
738            meta,
739            procedure,
740            manager_ctx: Arc::new(ManagerContext::new(
741                Arc::new(InMemoryPoisonStore::default()),
742            )),
743            step: 0,
744            exponential_builder: ExponentialBuilder::default(),
745            store,
746            rolling_back: false,
747            event_recorder: None,
748        }
749    }
750
751    async fn check_files(
752        object_store: &ObjectStore,
753        procedure_store: &ProcedureStore,
754        procedure_id: ProcedureId,
755        files: &[&str],
756    ) {
757        let dir = proc_path!(procedure_store, "{procedure_id}/");
758        let lister = object_store.list(&dir).await.unwrap();
759        let mut files_in_dir: Vec<_> = lister
760            .into_iter()
761            .filter(|x| x.metadata().mode() == EntryMode::FILE)
762            .map(|de| de.name().to_string())
763            .collect();
764        files_in_dir.sort_unstable();
765        assert_eq!(files, files_in_dir);
766    }
767
768    fn context_with_provider(
769        procedure_id: ProcedureId,
770        provider: Arc<dyn ContextProvider>,
771    ) -> Context {
772        Context {
773            procedure_id,
774            provider,
775        }
776    }
777
778    fn context_without_provider(procedure_id: ProcedureId) -> Context {
779        struct MockProvider;
780
781        #[async_trait]
782        impl ContextProvider for MockProvider {
783            async fn procedure_state(
784                &self,
785                _procedure_id: ProcedureId,
786            ) -> Result<Option<ProcedureState>> {
787                unimplemented!()
788            }
789
790            async fn procedure_state_receiver(
791                &self,
792                _procedure_id: ProcedureId,
793            ) -> Result<Option<Receiver<ProcedureState>>> {
794                unimplemented!()
795            }
796
797            async fn try_put_poison(
798                &self,
799                _key: &PoisonKey,
800                _procedure_id: ProcedureId,
801            ) -> Result<()> {
802                unimplemented!()
803            }
804
805            async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
806                unimplemented!()
807            }
808        }
809
810        Context {
811            procedure_id,
812            provider: Arc::new(MockProvider),
813        }
814    }
815
    /// Boxed rollback callback stored in [ProcedureAdapter]; each call receives an
    /// owned clone of the rollback [Context].
    type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;

    /// A test [Procedure] whose behavior is supplied by closures.
    struct ProcedureAdapter<F> {
        /// Returned verbatim by [Procedure::dump].
        data: String,
        /// Reported by [Procedure::lock_key] and copied into metas built by `new_meta`.
        lock_key: LockKey,
        /// Reported by [Procedure::poison_keys] and copied into metas built by `new_meta`.
        poison_keys: PoisonKeys,
        /// Invoked on every [Procedure::execute] with a clone of the context.
        exec_fn: F,
        /// Rollback callback. `None` makes [Procedure::rollback_supported] return
        /// false and [Procedure::rollback] a no-op returning `Ok(())`.
        rollback_fn: Option<RollbackFn>,
    }
825
826    impl<F> ProcedureAdapter<F> {
827        fn new_meta(&self, uuid: &str) -> ProcedureMetaRef {
828            let mut meta = test_util::procedure_meta_for_test();
829            meta.id = ProcedureId::parse_str(uuid).unwrap();
830            meta.lock_key = self.lock_key.clone();
831            meta.poison_keys = self.poison_keys.clone();
832
833            Arc::new(meta)
834        }
835    }
836
837    #[async_trait]
838    impl<F> Procedure for ProcedureAdapter<F>
839    where
840        F: FnMut(Context) -> BoxFuture<'static, Result<Status>> + Send + Sync,
841    {
842        fn type_name(&self) -> &str {
843            "ProcedureAdapter"
844        }
845
846        async fn execute(&mut self, ctx: &Context) -> Result<Status> {
847            let f = (self.exec_fn)(ctx.clone());
848            f.await
849        }
850
851        async fn rollback(&mut self, ctx: &Context) -> Result<()> {
852            if let Some(f) = &mut self.rollback_fn {
853                return (f)(ctx.clone()).await;
854            }
855            Ok(())
856        }
857
858        fn rollback_supported(&self) -> bool {
859            self.rollback_fn.is_some()
860        }
861
862        fn dump(&self) -> Result<String> {
863            Ok(self.data.clone())
864        }
865
866        fn lock_key(&self) -> LockKey {
867            self.lock_key.clone()
868        }
869
870        fn poison_keys(&self) -> PoisonKeys {
871            self.poison_keys.clone()
872        }
873    }
874
875    async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
876        let mut times = 0;
877        let exec_fn = move |_| {
878            times += 1;
879            async move {
880                if times == 1 {
881                    Ok(Status::executing(persist))
882                } else {
883                    Ok(Status::done())
884                }
885            }
886            .boxed()
887        };
888        let normal = ProcedureAdapter {
889            data: "normal".to_string(),
890            lock_key: LockKey::single_exclusive("catalog.schema.table"),
891            poison_keys: PoisonKeys::default(),
892            exec_fn,
893            rollback_fn: None,
894        };
895
896        let dir = create_temp_dir("normal");
897        let meta = normal.new_meta(ROOT_ID);
898        let ctx = context_without_provider(meta.id);
899        let object_store = test_util::new_object_store(&dir);
900        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
901        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
902        runner.manager_ctx.start();
903
904        runner.execute_once(&ctx).await;
905        let state = runner.meta.state();
906        assert!(state.is_running(), "{state:?}");
907        check_files(
908            &object_store,
909            &procedure_store,
910            ctx.procedure_id,
911            first_files,
912        )
913        .await;
914
915        runner.execute_once(&ctx).await;
916        let state = runner.meta.state();
917        assert!(state.is_done(), "{state:?}");
918        check_files(
919            &object_store,
920            &procedure_store,
921            ctx.procedure_id,
922            second_files,
923        )
924        .await;
925    }
926
927    #[tokio::test]
928    async fn test_execute_once_normal() {
929        execute_once_normal(
930            true,
931            &["0000000000.step"],
932            &["0000000000.step", "0000000001.commit"],
933        )
934        .await;
935    }
936
937    #[tokio::test]
938    async fn test_execute_once_normal_skip_persist() {
939        execute_once_normal(false, &[], &["0000000000.commit"]).await;
940    }
941
942    #[tokio::test]
943    async fn test_on_suspend_empty() {
944        let exec_fn = move |_| {
945            async move {
946                Ok(Status::Suspended {
947                    subprocedures: Vec::new(),
948                    persist: false,
949                })
950            }
951            .boxed()
952        };
953        let suspend = ProcedureAdapter {
954            data: "suspend".to_string(),
955            lock_key: LockKey::single_exclusive("catalog.schema.table"),
956            poison_keys: PoisonKeys::default(),
957            exec_fn,
958            rollback_fn: None,
959        };
960
961        let dir = create_temp_dir("suspend");
962        let meta = suspend.new_meta(ROOT_ID);
963        let ctx = context_without_provider(meta.id);
964        let object_store = test_util::new_object_store(&dir);
965        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
966        let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
967        runner.manager_ctx.start();
968
969        runner.execute_once(&ctx).await;
970        let state = runner.meta.state();
971        assert!(state.is_running(), "{state:?}");
972    }
973
974    fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
975        let mut times = 0;
976        let exec_fn = move |_| {
977            times += 1;
978            async move {
979                if times == 1 {
980                    time::sleep(Duration::from_millis(200)).await;
981                    Ok(Status::executing(true))
982                } else {
983                    Ok(Status::done())
984                }
985            }
986            .boxed()
987        };
988        let child = ProcedureAdapter {
989            data: "child".to_string(),
990            lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
991            poison_keys: PoisonKeys::default(),
992            exec_fn,
993            rollback_fn: None,
994        };
995
996        ProcedureWithId {
997            id: procedure_id,
998            procedure: Box::new(child),
999        }
1000    }
1001
    /// Runs a parent procedure that submits two child procedures and then waits
    /// for them. Checks that every procedure ends up done, that no key locks or
    /// persisted files remain, and that outdated metas can then be cleaned up.
    #[tokio::test]
    async fn test_on_suspend_by_subprocedures() {
        // Number of times the parent's `execute` has been called.
        let mut times = 0;
        let children_ids = [ProcedureId::random(), ProcedureId::random()];
        // Lock keys for each child; the two children lock disjoint regions.
        let keys = [
            &[
                "catalog.schema.table.region-0",
                "catalog.schema.table.region-1",
            ],
            &[
                "catalog.schema.table.region-2",
                "catalog.schema.table.region-3",
            ],
        ];

        let exec_fn = move |ctx: Context| {
            times += 1;
            async move {
                if times == 1 {
                    // Submit subprocedures.
                    Ok(Status::Suspended {
                        subprocedures: children_ids
                            .into_iter()
                            .zip(keys)
                            .map(|(id, key_slice)| new_child_procedure(id, key_slice))
                            .collect(),
                        persist: true,
                    })
                } else {
                    // Wait for subprocedures. A child whose state is unknown
                    // (`None`) is treated as not done yet.
                    let mut all_child_done = true;
                    for id in children_ids {
                        let is_not_done = ctx
                            .provider
                            .procedure_state(id)
                            .await
                            .unwrap()
                            .map(|s| !s.is_done())
                            .unwrap_or(true);
                        if is_not_done {
                            all_child_done = false;
                        }
                    }
                    if all_child_done {
                        Ok(Status::done())
                    } else {
                        // Return suspended to wait for notify.
                        Ok(Status::Suspended {
                            subprocedures: Vec::new(),
                            persist: false,
                        })
                    }
                }
            }
            .boxed()
        };
        let parent = ProcedureAdapter {
            data: "parent".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("parent");
        let meta = parent.new_meta(ROOT_ID);
        let procedure_id = meta.id;

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
        // Use a dedicated manager ctx so the test can inspect it after `run()` returns.
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
        manager_ctx.start();
        // Manually add this procedure to the manager ctx.
        assert!(manager_ctx.try_insert_procedure(meta));
        // Replace the manager ctx.
        runner.manager_ctx = manager_ctx.clone();

        runner.run().await;
        // No key locks may remain held once the run has finished.
        assert!(manager_ctx.key_lock.is_empty());

        // Check child procedures.
        for child_id in children_ids {
            let state = manager_ctx.state(child_id).unwrap();
            assert!(state.is_done(), "{state:?}");
        }
        let state = manager_ctx.state(procedure_id).unwrap();
        assert!(state.is_done(), "{state:?}");
        // Files are removed.
        check_files(&object_store, &procedure_store, procedure_id, &[]).await;

        // Sleep longer than the 1ms passed to `remove_outdated_meta` below so the
        // finished metas count as outdated.
        tokio::time::sleep(Duration::from_millis(5)).await;
        // Clean outdated meta.
        manager_ctx.remove_outdated_meta(Duration::from_millis(1));
        assert!(manager_ctx.state(procedure_id).is_none());
        assert!(manager_ctx.finished_procedures.lock().unwrap().is_empty());
        for child_id in children_ids {
            assert!(manager_ctx.state(child_id).is_none());
        }
    }
1103
1104    #[tokio::test]
1105    async fn test_running_is_stopped() {
1106        let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
1107        let normal = ProcedureAdapter {
1108            data: "normal".to_string(),
1109            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1110            poison_keys: PoisonKeys::default(),
1111            exec_fn,
1112            rollback_fn: None,
1113        };
1114
1115        let dir = create_temp_dir("test_running_is_stopped");
1116        let meta = normal.new_meta(ROOT_ID);
1117        let ctx = context_without_provider(meta.id);
1118        let object_store = test_util::new_object_store(&dir);
1119        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1120        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1121        runner.manager_ctx.start();
1122
1123        runner.execute_once(&ctx).await;
1124        let state = runner.meta.state();
1125        assert!(state.is_running(), "{state:?}");
1126        check_files(
1127            &object_store,
1128            &procedure_store,
1129            ctx.procedure_id,
1130            &["0000000000.step"],
1131        )
1132        .await;
1133
1134        runner.manager_ctx.stop();
1135        runner.execute_once(&ctx).await;
1136        let state = runner.meta.state();
1137        assert!(state.is_failed(), "{state:?}");
1138        // Shouldn't write any files
1139        check_files(
1140            &object_store,
1141            &procedure_store,
1142            ctx.procedure_id,
1143            &["0000000000.step"],
1144        )
1145        .await;
1146    }
1147
1148    #[tokio::test]
1149    async fn test_running_is_stopped_on_error() {
1150        let exec_fn =
1151            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1152        let normal = ProcedureAdapter {
1153            data: "fail".to_string(),
1154            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1155            poison_keys: PoisonKeys::default(),
1156            exec_fn,
1157            rollback_fn: None,
1158        };
1159
1160        let dir = create_temp_dir("test_running_is_stopped_on_error");
1161        let meta = normal.new_meta(ROOT_ID);
1162        let ctx = context_without_provider(meta.id);
1163        let object_store = test_util::new_object_store(&dir);
1164        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1165        let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
1166        runner.manager_ctx.stop();
1167
1168        runner.execute_once(&ctx).await;
1169        let state = runner.meta.state();
1170        assert!(state.is_failed(), "{state:?}");
1171        // Shouldn't write any files
1172        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1173    }
1174
1175    #[tokio::test]
1176    async fn test_execute_on_error() {
1177        let exec_fn =
1178            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1179        let fail = ProcedureAdapter {
1180            data: "fail".to_string(),
1181            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1182            poison_keys: PoisonKeys::default(),
1183            exec_fn,
1184            rollback_fn: None,
1185        };
1186
1187        let dir = create_temp_dir("fail");
1188        let meta = fail.new_meta(ROOT_ID);
1189        let ctx = context_without_provider(meta.id);
1190        let object_store = test_util::new_object_store(&dir);
1191        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1192        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1193        runner.manager_ctx.start();
1194
1195        runner.execute_once(&ctx).await;
1196        let state = runner.meta.state();
1197        assert!(state.is_failed(), "{state:?}");
1198        check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
1199    }
1200
1201    #[tokio::test]
1202    async fn test_execute_with_rollback_on_error() {
1203        let exec_fn =
1204            |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
1205        let rollback_fn = move |_| async move { Ok(()) }.boxed();
1206        let fail = ProcedureAdapter {
1207            data: "fail".to_string(),
1208            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1209            poison_keys: PoisonKeys::default(),
1210            exec_fn,
1211            rollback_fn: Some(Box::new(rollback_fn)),
1212        };
1213
1214        let dir = create_temp_dir("fail");
1215        let meta = fail.new_meta(ROOT_ID);
1216        let ctx = context_without_provider(meta.id);
1217        let object_store = test_util::new_object_store(&dir);
1218        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1219        let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
1220        runner.manager_ctx.start();
1221
1222        runner.execute_once(&ctx).await;
1223        let state = runner.meta.state();
1224        assert!(state.is_prepare_rollback(), "{state:?}");
1225
1226        runner.execute_once(&ctx).await;
1227        let state = runner.meta.state();
1228        assert!(state.is_rolling_back(), "{state:?}");
1229
1230        runner.execute_once(&ctx).await;
1231        let state = runner.meta.state();
1232        assert!(state.is_failed(), "{state:?}");
1233        check_files(
1234            &object_store,
1235            &procedure_store,
1236            ctx.procedure_id,
1237            &["0000000000.rollback"],
1238        )
1239        .await;
1240    }
1241
1242    #[tokio::test]
1243    async fn test_execute_on_retry_later_error() {
1244        let mut times = 0;
1245
1246        let exec_fn = move |_| {
1247            times += 1;
1248            async move {
1249                if times == 1 {
1250                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
1251                } else if times == 2 {
1252                    Ok(Status::executing(false))
1253                } else {
1254                    Ok(Status::done())
1255                }
1256            }
1257            .boxed()
1258        };
1259
1260        let retry_later = ProcedureAdapter {
1261            data: "retry_later".to_string(),
1262            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1263            poison_keys: PoisonKeys::default(),
1264            exec_fn,
1265            rollback_fn: None,
1266        };
1267
1268        let dir = create_temp_dir("retry_later");
1269        let meta = retry_later.new_meta(ROOT_ID);
1270        let ctx = context_without_provider(meta.id);
1271        let object_store = test_util::new_object_store(&dir);
1272        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1273        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1274        runner.manager_ctx.start();
1275        runner.execute_once(&ctx).await;
1276        let state = runner.meta.state();
1277        assert!(state.is_retrying(), "{state:?}");
1278
1279        runner.execute_once(&ctx).await;
1280        let state = runner.meta.state();
1281        assert!(state.is_running(), "{state:?}");
1282
1283        runner.execute_once(&ctx).await;
1284        let state = runner.meta.state();
1285        assert!(state.is_done(), "{state:?}");
1286        assert!(meta.state().is_done());
1287        check_files(
1288            &object_store,
1289            &procedure_store,
1290            ctx.procedure_id,
1291            &["0000000000.commit"],
1292        )
1293        .await;
1294    }
1295
    /// Runs a parent that goes through three attempts:
    /// first it fails with a retry-later error, then it suspends on a child that
    /// always fails, then it finishes.
    /// The parent ends up done, with a step file and a commit file persisted.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_execute_on_retry_later_error_with_child() {
        common_telemetry::init_default_ut_logging();
        // Number of times the parent's `execute` has been called.
        let mut times = 0;
        let child_id = ProcedureId::random();

        let exec_fn = move |_| {
            times += 1;
            async move {
                debug!("times: {}", times);
                if times == 1 {
                    // First attempt: ask the runner to retry later.
                    Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
                } else if times == 2 {
                    // Second attempt: submit a child whose every execution fails.
                    let exec_fn = |_| {
                        async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
                            .boxed()
                    };
                    let fail = ProcedureAdapter {
                        data: "fail".to_string(),
                        lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
                        poison_keys: PoisonKeys::default(),
                        exec_fn,
                        rollback_fn: None,
                    };

                    Ok(Status::Suspended {
                        subprocedures: vec![ProcedureWithId {
                            id: child_id,
                            procedure: Box::new(fail),
                        }],
                        persist: true,
                    })
                } else {
                    // Later attempts: finish without inspecting the child's state.
                    Ok(Status::done())
                }
            }
            .boxed()
        };

        let retry_later = ProcedureAdapter {
            data: "retry_later".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("retry_later");
        let meta = retry_later.new_meta(ROOT_ID);
        let ctx = context_without_provider(meta.id);
        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
        runner.manager_ctx.start();
        debug!("execute_once 1");
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_retrying(), "{state:?}");

        // Notify `child_notify` from another task. Presumably this keeps the second
        // execution from waiting indefinitely on the submitted child; TODO confirm
        // against the runner's suspend handling.
        let moved_meta = meta.clone();
        tokio::spawn(async move {
            moved_meta.child_notify.notify_one();
        });
        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_running(), "{state:?}");

        runner.execute_once(&ctx).await;
        let state = runner.meta.state();
        assert!(state.is_done(), "{state:?}");
        assert!(meta.state().is_done());
        check_files(
            &object_store,
            &procedure_store,
            ctx.procedure_id,
            &["0000000000.step", "0000000001.commit"],
        )
        .await;
    }
1375
1376    #[tokio::test]
1377    async fn test_execute_exceed_max_retry_later() {
1378        let exec_fn =
1379            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1380
1381        let exceed_max_retry_later = ProcedureAdapter {
1382            data: "exceed_max_retry_later".to_string(),
1383            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1384            poison_keys: PoisonKeys::default(),
1385            exec_fn,
1386            rollback_fn: None,
1387        };
1388
1389        let dir = create_temp_dir("exceed_max_retry_later");
1390        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1391        let object_store = test_util::new_object_store(&dir);
1392        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1393        let mut runner = new_runner(
1394            meta.clone(),
1395            Box::new(exceed_max_retry_later),
1396            procedure_store,
1397        );
1398        runner.manager_ctx.start();
1399
1400        runner.exponential_builder = ExponentialBuilder::default()
1401            .with_min_delay(Duration::from_millis(1))
1402            .with_max_times(3);
1403
1404        // Run the runner and execute the procedure.
1405        runner.execute_procedure_in_loop().await;
1406        let err = meta.state().error().unwrap().to_string();
1407        assert!(err.contains("Procedure retry exceeded max times"));
1408    }
1409
1410    #[tokio::test]
1411    async fn test_rollback_exceed_max_retry_later() {
1412        let exec_fn =
1413            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
1414        let rollback_fn = move |_| {
1415            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1416        };
1417        let exceed_max_retry_later = ProcedureAdapter {
1418            data: "exceed_max_rollback".to_string(),
1419            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1420            poison_keys: PoisonKeys::default(),
1421            exec_fn,
1422            rollback_fn: Some(Box::new(rollback_fn)),
1423        };
1424
1425        let dir = create_temp_dir("exceed_max_rollback");
1426        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
1427        let object_store = test_util::new_object_store(&dir);
1428        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1429        let mut runner = new_runner(
1430            meta.clone(),
1431            Box::new(exceed_max_retry_later),
1432            procedure_store,
1433        );
1434        runner.manager_ctx.start();
1435        runner.exponential_builder = ExponentialBuilder::default()
1436            .with_min_delay(Duration::from_millis(1))
1437            .with_max_times(3);
1438
1439        // Run the runner and execute the procedure.
1440        runner.execute_procedure_in_loop().await;
1441        let err = meta.state().error().unwrap().to_string();
1442        assert!(err.contains("Procedure rollback exceeded max times"));
1443    }
1444
1445    #[tokio::test]
1446    async fn test_rollback_after_retry_fail() {
1447        let exec_fn = move |_| {
1448            async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
1449        };
1450
1451        let (tx, mut rx) = mpsc::channel(1);
1452        let rollback_fn = move |_| {
1453            let tx = tx.clone();
1454            async move {
1455                tx.send(()).await.unwrap();
1456                Ok(())
1457            }
1458            .boxed()
1459        };
1460        let retry_later = ProcedureAdapter {
1461            data: "rollback_after_retry_fail".to_string(),
1462            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1463            poison_keys: PoisonKeys::default(),
1464            exec_fn,
1465            rollback_fn: Some(Box::new(rollback_fn)),
1466        };
1467
1468        let dir = create_temp_dir("retry_later");
1469        let meta = retry_later.new_meta(ROOT_ID);
1470        let ctx = context_without_provider(meta.id);
1471        let object_store = test_util::new_object_store(&dir);
1472        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1473        let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
1474        runner.manager_ctx.start();
1475        runner.exponential_builder = ExponentialBuilder::default()
1476            .with_min_delay(Duration::from_millis(1))
1477            .with_max_times(3);
1478        // Run the runner and execute the procedure.
1479        runner.execute_procedure_in_loop().await;
1480        rx.recv().await.unwrap();
1481        assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
1482        check_files(
1483            &object_store,
1484            &procedure_store,
1485            ctx.procedure_id,
1486            &["0000000000.rollback"],
1487        )
1488        .await;
1489    }
1490
    /// Checks that a parent which observes its child procedure failing can abort
    /// with its own error, and that its key locks are released afterwards.
    #[tokio::test]
    async fn test_child_error() {
        // Number of times the parent's `execute` has been called.
        let mut times = 0;
        let child_id = ProcedureId::random();
        common_telemetry::init_default_ut_logging();
        let exec_fn = move |ctx: Context| {
            times += 1;
            async move {
                if times == 1 {
                    // Submit subprocedures. The only child fails on every execution.
                    let exec_fn = |_| {
                        async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
                            .boxed()
                    };
                    let fail = ProcedureAdapter {
                        data: "fail".to_string(),
                        lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
                        poison_keys: PoisonKeys::default(),
                        exec_fn,
                        rollback_fn: None,
                    };

                    Ok(Status::Suspended {
                        subprocedures: vec![ProcedureWithId {
                            id: child_id,
                            procedure: Box::new(fail),
                        }],
                        persist: true,
                    })
                } else {
                    // Wait for subprocedures. A child whose state is unknown
                    // (`None`) counts as not failed.
                    let state = ctx.provider.procedure_state(child_id).await.unwrap();
                    let is_failed = state.map(|s| s.is_failed()).unwrap_or(false);
                    if is_failed {
                        // Abort the parent procedure itself once the child procedure has failed.
                        Err(Error::from_error_ext(PlainError::new(
                            "subprocedure failed".to_string(),
                            StatusCode::Unexpected,
                        )))
                    } else {
                        // Return suspended to wait for notify.
                        Ok(Status::Suspended {
                            subprocedures: Vec::new(),
                            persist: false,
                        })
                    }
                }
            }
            .boxed()
        };
        let parent = ProcedureAdapter {
            data: "parent".to_string(),
            lock_key: LockKey::single_exclusive("catalog.schema.table"),
            poison_keys: PoisonKeys::default(),
            exec_fn,
            rollback_fn: None,
        };

        let dir = create_temp_dir("child_err");
        let meta = parent.new_meta(ROOT_ID);

        let object_store = test_util::new_object_store(&dir);
        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
        // Use a dedicated manager ctx so the test can inspect it after `run()` returns.
        let poison_manager = Arc::new(InMemoryPoisonStore::default());
        let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
        manager_ctx.start();
        // Manually add this procedure to the manager ctx.
        assert!(manager_ctx.try_insert_procedure(meta.clone()));
        // Replace the manager ctx.
        runner.manager_ctx = manager_ctx.clone();

        // Run the runner and execute the procedure.
        runner.run().await;
        // Key locks must be released even though the parent ended with an error.
        assert!(manager_ctx.key_lock.is_empty());
        let err = meta.state().error().unwrap().output_msg();
        assert!(err.contains("subprocedure failed"), "{err}");
    }
1569
1570    #[tokio::test]
1571    async fn test_execute_with_clean_poisons() {
1572        common_telemetry::init_default_ut_logging();
1573        let mut times = 0;
1574        let poison_key = PoisonKey::new("table/1024");
1575        let moved_poison_key = poison_key.clone();
1576        let exec_fn = move |ctx: Context| {
1577            times += 1;
1578            let poison_key = moved_poison_key.clone();
1579            async move {
1580                if times == 1 {
1581                    // Put the poison to the context.
1582                    ctx.provider
1583                        .try_put_poison(&poison_key, ctx.procedure_id)
1584                        .await
1585                        .unwrap();
1586
1587                    Ok(Status::executing(true))
1588                } else {
1589                    Ok(Status::executing_with_clean_poisons(true))
1590                }
1591            }
1592            .boxed()
1593        };
1594        let poison = ProcedureAdapter {
1595            data: "poison".to_string(),
1596            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1597            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1598            exec_fn,
1599            rollback_fn: None,
1600        };
1601
1602        let dir = create_temp_dir("clean_poisons");
1603        let meta = poison.new_meta(ROOT_ID);
1604
1605        let object_store = test_util::new_object_store(&dir);
1606        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1607        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1608
1609        // Use the manager ctx as the context provider.
1610        let ctx = context_with_provider(
1611            meta.id,
1612            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1613        );
1614        // Manually add this procedure to the manager ctx.
1615        runner
1616            .manager_ctx
1617            .procedures
1618            .write()
1619            .unwrap()
1620            .insert(meta.id, runner.meta.clone());
1621
1622        runner.manager_ctx.start();
1623        runner.execute_once(&ctx).await;
1624        let state = runner.meta.state();
1625        assert!(state.is_running(), "{state:?}");
1626
1627        let procedure_id = runner
1628            .manager_ctx
1629            .poison_manager
1630            .get_poison(&poison_key.to_string())
1631            .await
1632            .unwrap();
1633        // poison key should be exist.
1634        assert!(procedure_id.is_some());
1635
1636        runner.execute_once(&ctx).await;
1637        let state = runner.meta.state();
1638        assert!(state.is_running(), "{state:?}");
1639
1640        let procedure_id = runner
1641            .manager_ctx
1642            .poison_manager
1643            .get_poison(&poison_key.to_string())
1644            .await
1645            .unwrap();
1646        // poison key should be deleted.
1647        assert!(procedure_id.is_none());
1648    }
1649
1650    #[tokio::test]
1651    async fn test_execute_error_with_clean_poisons() {
1652        common_telemetry::init_default_ut_logging();
1653        let mut times = 0;
1654        let poison_key = PoisonKey::new("table/1024");
1655        let moved_poison_key = poison_key.clone();
1656        let exec_fn = move |ctx: Context| {
1657            times += 1;
1658            let poison_key = moved_poison_key.clone();
1659            async move {
1660                if times == 1 {
1661                    // Put the poison to the context.
1662                    ctx.provider
1663                        .try_put_poison(&poison_key, ctx.procedure_id)
1664                        .await
1665                        .unwrap();
1666
1667                    Ok(Status::executing(true))
1668                } else {
1669                    Err(Error::external_and_clean_poisons(MockError::new(
1670                        StatusCode::Unexpected,
1671                    )))
1672                }
1673            }
1674            .boxed()
1675        };
1676        let poison = ProcedureAdapter {
1677            data: "poison".to_string(),
1678            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1679            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1680            exec_fn,
1681            rollback_fn: None,
1682        };
1683
1684        let dir = create_temp_dir("error_with_clean_poisons");
1685        let meta = poison.new_meta(ROOT_ID);
1686
1687        let object_store = test_util::new_object_store(&dir);
1688        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1689        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1690
1691        // Use the manager ctx as the context provider.
1692        let ctx = context_with_provider(
1693            meta.id,
1694            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1695        );
1696        // Manually add this procedure to the manager ctx.
1697        runner
1698            .manager_ctx
1699            .procedures
1700            .write()
1701            .unwrap()
1702            .insert(meta.id, runner.meta.clone());
1703
1704        runner.manager_ctx.start();
1705        runner.execute_once(&ctx).await;
1706        let state = runner.meta.state();
1707        assert!(state.is_running(), "{state:?}");
1708
1709        let procedure_id = runner
1710            .manager_ctx
1711            .poison_manager
1712            .get_poison(&poison_key.to_string())
1713            .await
1714            .unwrap();
1715        // poison key should be exist.
1716        assert!(procedure_id.is_some());
1717
1718        runner.execute_once(&ctx).await;
1719        let state = runner.meta.state();
1720        assert!(state.is_failed(), "{state:?}");
1721
1722        let procedure_id = runner
1723            .manager_ctx
1724            .poison_manager
1725            .get_poison(&poison_key.to_string())
1726            .await
1727            .unwrap();
1728        // poison key should be deleted.
1729        assert!(procedure_id.is_none());
1730    }
1731
1732    #[tokio::test]
1733    async fn test_execute_failed_after_set_poison() {
1734        let mut times = 0;
1735        let poison_key = PoisonKey::new("table/1024");
1736        let moved_poison_key = poison_key.clone();
1737        let exec_fn = move |ctx: Context| {
1738            times += 1;
1739            let poison_key = moved_poison_key.clone();
1740            async move {
1741                if times == 1 {
1742                    Ok(Status::executing(true))
1743                } else {
1744                    // Put the poison to the context.
1745                    ctx.provider
1746                        .try_put_poison(&poison_key, ctx.procedure_id)
1747                        .await
1748                        .unwrap();
1749                    Err(Error::external(MockError::new(StatusCode::Unexpected)))
1750                }
1751            }
1752            .boxed()
1753        };
1754        let poison = ProcedureAdapter {
1755            data: "poison".to_string(),
1756            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1757            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1758            exec_fn,
1759            rollback_fn: None,
1760        };
1761
1762        let dir = create_temp_dir("poison");
1763        let meta = poison.new_meta(ROOT_ID);
1764
1765        let object_store = test_util::new_object_store(&dir);
1766        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1767        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1768
1769        // Use the manager ctx as the context provider.
1770        let ctx = context_with_provider(
1771            meta.id,
1772            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1773        );
1774        // Manually add this procedure to the manager ctx.
1775        runner
1776            .manager_ctx
1777            .procedures
1778            .write()
1779            .unwrap()
1780            .insert(meta.id, runner.meta.clone());
1781
1782        runner.manager_ctx.start();
1783        runner.execute_once(&ctx).await;
1784        let state = runner.meta.state();
1785        assert!(state.is_running(), "{state:?}");
1786
1787        runner.execute_once(&ctx).await;
1788        let state = runner.meta.state();
1789        assert!(state.is_failed(), "{state:?}");
1790        assert!(meta.state().is_failed());
1791
1792        // Check the poison is set.
1793        let procedure_id = runner
1794            .manager_ctx
1795            .poison_manager
1796            .get_poison(&poison_key.to_string())
1797            .await
1798            .unwrap()
1799            .unwrap();
1800
1801        // If the procedure is poisoned, the poison key shouldn't be deleted.
1802        assert_eq!(&procedure_id.clone(), ROOT_ID);
1803    }
1804
1805    #[tokio::test]
1806    async fn test_execute_exceed_max_retry_after_set_poison() {
1807        common_telemetry::init_default_ut_logging();
1808        let mut times = 0;
1809        let poison_key = PoisonKey::new("table/1024");
1810        let moved_poison_key = poison_key.clone();
1811        let exec_fn = move |ctx: Context| {
1812            times += 1;
1813            let poison_key = moved_poison_key.clone();
1814            async move {
1815                if times == 1 {
1816                    Ok(Status::executing(true))
1817                } else {
1818                    // Put the poison to the context.
1819                    ctx.provider
1820                        .try_put_poison(&poison_key, ctx.procedure_id)
1821                        .await
1822                        .unwrap();
1823                    Err(Error::retry_later_and_clean_poisons(MockError::new(
1824                        StatusCode::Unexpected,
1825                    )))
1826                }
1827            }
1828            .boxed()
1829        };
1830        let poison = ProcedureAdapter {
1831            data: "poison".to_string(),
1832            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1833            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1834            exec_fn,
1835            rollback_fn: None,
1836        };
1837
1838        let dir = create_temp_dir("exceed_max_after_set_poison");
1839        let meta = poison.new_meta(ROOT_ID);
1840        let object_store = test_util::new_object_store(&dir);
1841        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1842        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
1843        runner.manager_ctx.start();
1844        runner.exponential_builder = ExponentialBuilder::default()
1845            .with_min_delay(Duration::from_millis(1))
1846            .with_max_times(3);
1847        // Use the manager ctx as the context provider.
1848        let ctx = context_with_provider(
1849            meta.id,
1850            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1851        );
1852        // Manually add this procedure to the manager ctx.
1853        runner
1854            .manager_ctx
1855            .procedures
1856            .write()
1857            .unwrap()
1858            .insert(meta.id, runner.meta.clone());
1859        // Run the runner and execute the procedure.
1860        runner.execute_once_with_retry(&ctx).await;
1861        let err = meta.state().error().unwrap().clone();
1862        assert_matches!(&*err, Error::RetryTimesExceeded { .. });
1863
1864        // Check the poison is deleted.
1865        let procedure_id = runner
1866            .manager_ctx
1867            .poison_manager
1868            .get_poison(&poison_key.to_string())
1869            .await
1870            .unwrap();
1871        assert_eq!(procedure_id, None);
1872    }
1873
1874    #[tokio::test]
1875    async fn test_execute_poisoned() {
1876        let mut times = 0;
1877        let poison_key = PoisonKey::new("table/1024");
1878        let moved_poison_key = poison_key.clone();
1879        let exec_fn = move |ctx: Context| {
1880            times += 1;
1881            let poison_key = moved_poison_key.clone();
1882            async move {
1883                if times == 1 {
1884                    Ok(Status::executing(true))
1885                } else {
1886                    // Put the poison to the context.
1887                    ctx.provider
1888                        .try_put_poison(&poison_key, ctx.procedure_id)
1889                        .await
1890                        .unwrap();
1891                    Ok(Status::Poisoned {
1892                        keys: PoisonKeys::new(vec![poison_key.clone()]),
1893                        error: Error::external(MockError::new(StatusCode::Unexpected)),
1894                    })
1895                }
1896            }
1897            .boxed()
1898        };
1899        let poison = ProcedureAdapter {
1900            data: "poison".to_string(),
1901            lock_key: LockKey::single_exclusive("catalog.schema.table"),
1902            poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
1903            exec_fn,
1904            rollback_fn: None,
1905        };
1906
1907        let dir = create_temp_dir("poison");
1908        let meta = poison.new_meta(ROOT_ID);
1909
1910        let object_store = test_util::new_object_store(&dir);
1911        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
1912        let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
1913
1914        // Use the manager ctx as the context provider.
1915        let ctx = context_with_provider(
1916            meta.id,
1917            runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
1918        );
1919        // Manually add this procedure to the manager ctx.
1920        runner
1921            .manager_ctx
1922            .procedures
1923            .write()
1924            .unwrap()
1925            .insert(meta.id, runner.meta.clone());
1926
1927        runner.manager_ctx.start();
1928        runner.execute_once(&ctx).await;
1929        let state = runner.meta.state();
1930        assert!(state.is_running(), "{state:?}");
1931
1932        runner.execute_once(&ctx).await;
1933        let state = runner.meta.state();
1934        assert!(state.is_poisoned(), "{state:?}");
1935        assert!(meta.state().is_poisoned());
1936        check_files(
1937            &object_store,
1938            &procedure_store,
1939            ctx.procedure_id,
1940            &["0000000000.step"],
1941        )
1942        .await;
1943
1944        // Check the poison is set.
1945        let procedure_id = runner
1946            .manager_ctx
1947            .poison_manager
1948            .get_poison(&poison_key.to_string())
1949            .await
1950            .unwrap()
1951            .unwrap();
1952
1953        // If the procedure is poisoned, the poison key shouldn't be deleted.
1954        assert_eq!(procedure_id, ROOT_ID);
1955    }
1956
1957    fn test_procedure_with_dynamic_lock(
1958        shared_atomic_value: Arc<AtomicU64>,
1959        id: u64,
1960    ) -> (BoxedProcedure, Arc<ProcedureMeta>) {
1961        let exec_fn = move |ctx: Context| {
1962            let moved_shared_atomic_value = shared_atomic_value.clone();
1963            let moved_ctx = ctx.clone();
1964            async move {
1965                debug!("Acquiring write lock, id: {}", id);
1966                let key = StringKey::Exclusive("test_lock".to_string());
1967                let guard = moved_ctx.provider.acquire_lock(&key).await;
1968                debug!("Acquired write lock, id: {}", id);
1969                let millis = rand::rng().random_range(10..=50);
1970                tokio::time::sleep(Duration::from_millis(millis)).await;
1971                let value = moved_shared_atomic_value.load(Ordering::Relaxed);
1972                moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
1973                debug!("Dropping write lock, id: {}", id);
1974                drop(guard);
1975
1976                Ok(Status::done())
1977            }
1978            .boxed()
1979        };
1980
1981        let adapter = ProcedureAdapter {
1982            data: "dynamic_lock".to_string(),
1983            lock_key: LockKey::new_exclusive([]),
1984            poison_keys: PoisonKeys::new([]),
1985            exec_fn,
1986            rollback_fn: None,
1987        };
1988        let meta = adapter.new_meta(ROOT_ID);
1989
1990        (Box::new(adapter), meta)
1991    }
1992
1993    #[tokio::test(flavor = "multi_thread")]
1994    async fn test_execute_with_dynamic_lock() {
1995        common_telemetry::init_default_ut_logging();
1996        let shared_atomic_value = Arc::new(AtomicU64::new(0));
1997        let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
1998        let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
1999
2000        let dir = create_temp_dir("dynamic_lock");
2001        let object_store = test_util::new_object_store(&dir);
2002        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
2003        let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
2004        let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
2005        let ctx1 = context_with_provider(
2006            meta1.id,
2007            runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
2008        );
2009        let ctx2 = context_with_provider(
2010            meta2.id,
2011            // use same manager ctx as runner1
2012            runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
2013        );
2014        let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
2015        join_all(tasks).await;
2016        assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
2017    }
2018    #[tokio::test]
2019    async fn test_on_suspend_deadlock_detected_no_rollback() {
2020        // Parent holds Exclusive("catalog.schema.table"), child also requests Exclusive("catalog.schema.table").
2021        // Since parent does NOT support rollback, state should become Failed.
2022        let child_id = ProcedureId::random();
2023        let exec_fn = move |_| {
2024            async move {
2025                let child_exec_fn = |_| async { Ok(Status::done()) }.boxed();
2026                let child = ProcedureAdapter {
2027                    data: "child".to_string(),
2028                    lock_key: LockKey::single_exclusive("catalog.schema.table"),
2029                    poison_keys: PoisonKeys::default(),
2030                    exec_fn: child_exec_fn,
2031                    rollback_fn: None,
2032                };
2033                Ok(Status::Suspended {
2034                    subprocedures: vec![ProcedureWithId {
2035                        id: child_id,
2036                        procedure: Box::new(child),
2037                    }],
2038                    persist: false,
2039                })
2040            }
2041            .boxed()
2042        };
2043        let parent = ProcedureAdapter {
2044            data: "parent".to_string(),
2045            lock_key: LockKey::single_exclusive("catalog.schema.table"),
2046            poison_keys: PoisonKeys::default(),
2047            exec_fn,
2048            rollback_fn: None, // No rollback support
2049        };
2050
2051        let dir = create_temp_dir("deadlock_no_rollback");
2052        let meta = parent.new_meta(ROOT_ID);
2053        let ctx = context_without_provider(meta.id);
2054        let object_store = test_util::new_object_store(&dir);
2055        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
2056        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
2057        runner.manager_ctx.start();
2058
2059        runner.execute_once(&ctx).await;
2060        let state = runner.meta.state();
2061        assert!(state.is_failed(), "Expected Failed, got {state:?}");
2062        // Verify the error exists
2063        assert!(
2064            state.error().is_some(),
2065            "Failed state should contain an error"
2066        );
2067        // Child should NOT have been submitted
2068        assert!(
2069            !runner.manager_ctx.contains_procedure(child_id),
2070            "Child procedure should not be submitted when deadlock is detected"
2071        );
2072    }
2073
2074    #[tokio::test]
2075    async fn test_on_suspend_deadlock_detected_with_rollback() {
2076        // Parent holds Exclusive("catalog.schema.table"), child also requests Exclusive("catalog.schema.table").
2077        // Since parent DOES support rollback, state should become PrepareRollback.
2078        let child_id = ProcedureId::random();
2079        let exec_fn = move |_| {
2080            async move {
2081                let child_exec_fn = |_| async { Ok(Status::done()) }.boxed();
2082                let child = ProcedureAdapter {
2083                    data: "child".to_string(),
2084                    lock_key: LockKey::single_exclusive("catalog.schema.table"),
2085                    poison_keys: PoisonKeys::default(),
2086                    exec_fn: child_exec_fn,
2087                    rollback_fn: None,
2088                };
2089                Ok(Status::Suspended {
2090                    subprocedures: vec![ProcedureWithId {
2091                        id: child_id,
2092                        procedure: Box::new(child),
2093                    }],
2094                    persist: false,
2095                })
2096            }
2097            .boxed()
2098        };
2099        let rollback_fn = move |_| async move { Ok(()) }.boxed();
2100        let parent = ProcedureAdapter {
2101            data: "parent".to_string(),
2102            lock_key: LockKey::single_exclusive("catalog.schema.table"),
2103            poison_keys: PoisonKeys::default(),
2104            exec_fn,
2105            rollback_fn: Some(Box::new(rollback_fn)), // Supports rollback
2106        };
2107
2108        let dir = create_temp_dir("deadlock_with_rollback");
2109        let meta = parent.new_meta(ROOT_ID);
2110        let ctx = context_without_provider(meta.id);
2111        let object_store = test_util::new_object_store(&dir);
2112        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
2113        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
2114        runner.manager_ctx.start();
2115
2116        runner.execute_once(&ctx).await;
2117        let state = runner.meta.state();
2118        assert!(
2119            state.is_prepare_rollback(),
2120            "Expected PrepareRollback, got {state:?}"
2121        );
2122        // Verify the error exists in PrepareRollback variant
2123        match &state {
2124            ProcedureState::PrepareRollback { error } => {
2125                assert!(!error.to_string().is_empty(), "Error should not be empty");
2126            }
2127            _ => panic!("Expected PrepareRollback, got {state:?}"),
2128        }
2129        // Child should NOT have been submitted
2130        assert!(
2131            !runner.manager_ctx.contains_procedure(child_id),
2132            "Child procedure should not be submitted when deadlock is detected"
2133        );
2134    }
2135
2136    #[test]
2137    fn test_find_lock_conflicts() {
2138        use crate::procedure::StringKey;
2139
2140        // 1. Share + Share = No conflict (Compatible)
2141        let parent = [StringKey::Share("A".to_string())];
2142        let child = [StringKey::Share("A".to_string())];
2143        assert!(super::find_lock_conflicts(parent.iter(), child.iter()).is_empty());
2144
2145        // 2. Share + Exclusive = Conflict
2146        let parent = [StringKey::Share("A".to_string())];
2147        let child = [StringKey::Exclusive("A".to_string())];
2148        assert_eq!(
2149            super::find_lock_conflicts(parent.iter(), child.iter()),
2150            vec!["A".to_string()]
2151        );
2152
2153        // 3. Exclusive + Share = Conflict
2154        let parent = [StringKey::Exclusive("A".to_string())];
2155        let child = [StringKey::Share("A".to_string())];
2156        assert_eq!(
2157            super::find_lock_conflicts(parent.iter(), child.iter()),
2158            vec!["A".to_string()]
2159        );
2160
2161        // 4. Exclusive + Exclusive = Conflict
2162        let parent = [StringKey::Exclusive("A".to_string())];
2163        let child = [StringKey::Exclusive("A".to_string())];
2164        assert_eq!(
2165            super::find_lock_conflicts(parent.iter(), child.iter()),
2166            vec!["A".to_string()]
2167        );
2168
2169        // 5. Multiple keys, partial overlap
2170        let parent = [
2171            StringKey::Share("A".to_string()),
2172            StringKey::Exclusive("B".to_string()),
2173        ];
2174        let child = [
2175            StringKey::Exclusive("A".to_string()), // Conflict with Share("A")
2176            StringKey::Share("B".to_string()),     // Conflict with Exclusive("B")
2177            StringKey::Exclusive("C".to_string()), // No conflict, parent doesn't hold C
2178        ];
2179        let mut conflicts = super::find_lock_conflicts(parent.iter(), child.iter());
2180        conflicts.sort();
2181        assert_eq!(conflicts, vec!["A".to_string(), "B".to_string()]);
2182    }
2183}