tests_fuzz/utils/
cluster_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use common_telemetry::info;
18use humantime::parse_duration;
19use snafu::ResultExt;
20use sqlx::MySqlPool;
21
22use super::wait::wait_condition_fn;
23use crate::error::{self, Result};
24
/// `peer_type` value that marks a [NodeInfo] row as a datanode.
pub const PEER_TYPE_DATANODE: &str = "DATANODE";
26
/// One row of `information_schema.cluster_info`, as loaded by [`fetch_nodes`].
///
/// The `sqlx::FromRow` derive fills each field from the column of the same name.
#[derive(Debug, sqlx::FromRow)]
pub struct NodeInfo {
    /// Value of the `peer_id` column.
    pub peer_id: i64,
    /// Value of the `peer_addr` column (presumably the node's network address — confirm).
    pub peer_addr: String,
    /// Role of the node; compared against [`PEER_TYPE_DATANODE`] to select datanodes.
    pub peer_type: String,
    /// Duration rendered as text, in a form `humantime::parse_duration` is expected to accept
    /// (see [`wait_for_all_datanode_online`]). `None` when the column is NULL.
    pub active_time: Option<String>,
}
34
35/// Returns all [NodeInfo] in the cluster.
36pub async fn fetch_nodes(db: &MySqlPool) -> Result<Vec<NodeInfo>> {
37    let sql = "select * from information_schema.cluster_info";
38    sqlx::query_as::<_, NodeInfo>(sql)
39        .fetch_all(db)
40        .await
41        .context(error::ExecuteQuerySnafu { sql })
42}
43
44/// Waits until all datanodes are online within a specified timeout period.
45///
46/// This function repeatedly checks the status of all datanodes and waits until all of them are online
47/// or the timeout period elapses. A datanode is considered online if its `active_time` is less than 3 seconds.
48pub async fn wait_for_all_datanode_online(greptime: MySqlPool, timeout: Duration) {
49    wait_condition_fn(
50        timeout,
51        || {
52            let greptime = greptime.clone();
53            Box::pin(async move {
54                let nodes = fetch_nodes(&greptime)
55                    .await
56                    .unwrap()
57                    .into_iter()
58                    .flat_map(|node| {
59                        if node.peer_type == PEER_TYPE_DATANODE {
60                            Some(node)
61                        } else {
62                            None
63                        }
64                    })
65                    .collect::<Vec<_>>();
66                info!("Waits for all datanode online: {nodes:?}");
67                nodes
68            })
69        },
70        |nodes| {
71            nodes
72                .into_iter()
73                .map(|node| parse_duration(&node.active_time.unwrap()).unwrap())
74                .all(|duration| duration < Duration::from_secs(3))
75        },
76        Duration::from_secs(5),
77    )
78    .await
79}