tests_fuzz/utils/
procedure.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use common_telemetry::info;
18use snafu::ResultExt;
19use sqlx::{MySql, Pool, Row};
20
21use super::wait::wait_condition_fn;
22use crate::error;
23
24/// Fetches the state of a procedure.
25pub async fn procedure_state(e: &Pool<MySql>, procedure_id: &str) -> String {
26    let sql = format!("admin procedure_state(\"{procedure_id}\");");
27    let result = sqlx::query(&sql)
28        .fetch_one(e)
29        .await
30        .context(error::ExecuteQuerySnafu { sql })
31        .unwrap();
32    result.try_get(0).unwrap()
33}
34
35/// Waits for a procedure to finish within a specified timeout period.
36pub async fn wait_for_procedure_finish(
37    greptime: &Pool<MySql>,
38    timeout: Duration,
39    procedure_id: String,
40) {
41    wait_condition_fn(
42        timeout,
43        || {
44            let greptime = greptime.clone();
45            let procedure_id = procedure_id.clone();
46            Box::pin(async move { procedure_state(&greptime, &procedure_id).await })
47        },
48        |output| {
49            info!("Procedure({procedure_id}) state: {:?}", output);
50            output.to_lowercase().contains("done")
51        },
52        Duration::from_secs(5),
53    )
54    .await
55}