tests_fuzz/utils/
procedure.rs1use std::time::Duration;
16
17use common_telemetry::info;
18use snafu::ResultExt;
19use sqlx::{MySql, Pool, Row};
20
21use super::wait::wait_condition_fn;
22use crate::error;
23
24pub async fn procedure_state(e: &Pool<MySql>, procedure_id: &str) -> String {
26 let sql = format!("admin procedure_state(\"{procedure_id}\");");
27 let result = sqlx::query(&sql)
28 .fetch_one(e)
29 .await
30 .context(error::ExecuteQuerySnafu { sql })
31 .unwrap();
32 result.try_get(0).unwrap()
33}
34
35pub async fn wait_for_procedure_finish(
37 greptime: &Pool<MySql>,
38 timeout: Duration,
39 procedure_id: String,
40) {
41 wait_condition_fn(
42 timeout,
43 || {
44 let greptime = greptime.clone();
45 let procedure_id = procedure_id.clone();
46 Box::pin(async move { procedure_state(&greptime, &procedure_id).await })
47 },
48 |output| {
49 info!("Procedure({procedure_id}) state: {:?}", output);
50 output.to_lowercase().contains("done")
51 },
52 Duration::from_secs(5),
53 )
54 .await
55}