// tests_fuzz/utils/cluster_info.rs
use std::time::Duration;
16
17use common_telemetry::info;
18use humantime::parse_duration;
19use snafu::ResultExt;
20use sqlx::MySqlPool;
21
22use super::wait::wait_condition_fn;
23use crate::error::{self, Result};
24
/// Value of the `peer_type` column that marks a row as a datanode.
pub const PEER_TYPE_DATANODE: &str = "DATANODE";
26
27#[derive(Debug, sqlx::FromRow)]
28pub struct NodeInfo {
29 pub peer_id: i64,
30 pub peer_addr: String,
31 pub peer_type: String,
32 pub active_time: Option<String>,
33}
34
35pub async fn fetch_nodes(db: &MySqlPool) -> Result<Vec<NodeInfo>> {
37 let sql = "select * from information_schema.cluster_info";
38 sqlx::query_as::<_, NodeInfo>(sql)
39 .fetch_all(db)
40 .await
41 .context(error::ExecuteQuerySnafu { sql })
42}
43
44pub async fn wait_for_all_datanode_online(greptime: MySqlPool, timeout: Duration) {
49 wait_condition_fn(
50 timeout,
51 || {
52 let greptime = greptime.clone();
53 Box::pin(async move {
54 let nodes = fetch_nodes(&greptime)
55 .await
56 .unwrap()
57 .into_iter()
58 .flat_map(|node| {
59 if node.peer_type == PEER_TYPE_DATANODE {
60 Some(node)
61 } else {
62 None
63 }
64 })
65 .collect::<Vec<_>>();
66 info!("Waits for all datanode online: {nodes:?}");
67 nodes
68 })
69 },
70 |nodes| {
71 nodes
72 .into_iter()
73 .map(|node| parse_duration(&node.active_time.unwrap()).unwrap())
74 .all(|duration| duration < Duration::from_secs(3))
75 },
76 Duration::from_secs(5),
77 )
78 .await
79}
80
81pub async fn wait_for_all_datanode_offline(greptime: MySqlPool, timeout: Duration) {
82 wait_condition_fn(
83 timeout,
84 || {
85 let greptime = greptime.clone();
86 Box::pin(async move {
87 let nodes = fetch_nodes(&greptime)
88 .await
89 .unwrap()
90 .into_iter()
91 .flat_map(|node| {
92 if node.peer_type == PEER_TYPE_DATANODE {
93 Some(node)
94 } else {
95 None
96 }
97 })
98 .collect::<Vec<_>>();
99 info!("Waits for datanode offline: {nodes:?}");
100 nodes
101 })
102 },
103 |nodes| {
104 nodes
105 .into_iter()
106 .map(|node| {
107 info!(
108 "Waits for datanode {} offline, active_time: {:?}",
109 node.peer_id, node.active_time
110 );
111 parse_duration(&node.active_time.unwrap()).unwrap()
112 })
113 .all(|duration| duration >= Duration::from_secs(3))
114 },
115 Duration::from_secs(2),
116 )
117 .await
118}