1use api::v1::meta::heartbeat_request::NodeWorkloads;
16use api::v1::meta::{ErrorCode, ResponseHeader};
17use common_meta::cluster::{NodeInfo, NodeStatus};
18use common_meta::peer::Peer;
19use common_time::util::SystemTimer;
20use tonic::{Code, Status};
21
22pub(crate) fn is_unreachable(status: &Status) -> bool {
23 status.code() == Code::Unavailable || status.code() == Code::DeadlineExceeded
24}
25
26pub(crate) fn is_not_leader(header: &Option<ResponseHeader>) -> bool {
27 let Some(header) = header else {
28 return false;
29 };
30
31 let Some(err) = header.error.as_ref() else {
32 return false;
33 };
34
35 err.code == ErrorCode::NotLeader as i32
36}
37
38fn is_active_node(
39 timer: &impl SystemTimer,
40 last_activity_ts: i64,
41 active_duration: std::time::Duration,
42) -> bool {
43 let now = timer.current_time_millis();
44 let elapsed = now.checked_sub(last_activity_ts).unwrap_or(0) as u64;
45 elapsed < active_duration.as_millis() as u64
46}
47
48pub(crate) fn alive_frontends(
49 timer: &impl SystemTimer,
50 nodes: Vec<NodeInfo>,
51 active_duration: std::time::Duration,
52) -> Vec<Peer> {
53 nodes
54 .into_iter()
55 .filter_map(|node| {
56 if matches!(node.status, NodeStatus::Frontend(_))
57 && is_active_node(timer, node.last_activity_ts, active_duration)
58 {
59 Some(node.peer)
60 } else {
61 None
62 }
63 })
64 .collect()
65}
66
67pub(crate) fn alive_datanodes(
68 timer: &impl SystemTimer,
69 nodes: Vec<NodeInfo>,
70 active_duration: std::time::Duration,
71 filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
72) -> Vec<Peer> {
73 let filter = filter.unwrap_or(|_| true);
74
75 nodes
76 .into_iter()
77 .filter_map(|node| {
78 if let NodeStatus::Datanode(status) = node.status
79 && is_active_node(timer, node.last_activity_ts, active_duration)
80 {
81 let workloads = NodeWorkloads::Datanode(status.workloads);
82 filter(&workloads).then_some(node.peer)
83 } else {
84 None
85 }
86 })
87 .collect()
88}
89
90pub(crate) fn alive_flownodes(
91 timer: &impl SystemTimer,
92 nodes: Vec<NodeInfo>,
93 active_duration: std::time::Duration,
94 filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
95) -> Vec<Peer> {
96 let filter = filter.unwrap_or(|_| true);
97
98 nodes
99 .into_iter()
100 .filter_map(|node| {
101 if let NodeStatus::Flownode(status) = node.status
102 && is_active_node(timer, node.last_activity_ts, active_duration)
103 {
104 let workloads = NodeWorkloads::Flownode(status.workloads);
105 filter(&workloads).then_some(node.peer)
106 } else {
107 None
108 }
109 })
110 .collect()
111}
112
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use api::v1::meta::heartbeat_request::NodeWorkloads;
    use api::v1::meta::{DatanodeWorkloads, ErrorCode, FlownodeWorkloads, Peer, ResponseHeader};
    use common_meta::cluster::{
        DatanodeStatus, FlownodeStatus, FrontendStatus, MetasrvStatus, NodeInfo, NodeStatus, Role,
    };
    use common_time::util::SystemTimer;
    use tonic::{Code, Status};

    use super::*;

    /// Timer that always reports the fixed millisecond value it was built with.
    struct MockSystemTimer(i64);

    impl MockSystemTimer {
        fn new(now: i64) -> Self {
            Self(now)
        }
    }

    impl SystemTimer for MockSystemTimer {
        fn current_time_millis(&self) -> i64 {
            self.0
        }

        fn current_time_rfc3339(&self) -> String {
            "1970-01-01T00:00:00Z".to_string()
        }
    }

    /// Builds a `NodeInfo` for `role` with zeroed metrics and empty workloads.
    fn node_info(role: Role, id: u64, addr: &str, last_activity_ts: i64) -> NodeInfo {
        let status = match role {
            Role::Frontend => NodeStatus::Frontend(FrontendStatus {}),
            Role::Datanode => NodeStatus::Datanode(DatanodeStatus {
                rcus: 0,
                wcus: 0,
                leader_regions: 0,
                follower_regions: 0,
                workloads: DatanodeWorkloads { types: vec![] },
            }),
            Role::Flownode => NodeStatus::Flownode(FlownodeStatus {
                workloads: FlownodeWorkloads { types: vec![] },
            }),
            Role::Metasrv => NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
        };

        NodeInfo {
            peer: Peer::new(id, addr),
            last_activity_ts,
            status,
            version: String::new(),
            git_commit: String::new(),
            start_time_ms: 0,
            total_cpu_millicores: 0,
            total_memory_bytes: 0,
            cpu_usage_millicores: 0,
            memory_usage_bytes: 0,
            hostname: String::new(),
        }
    }

    /// Accepts datanodes whose workload types are exactly `[1]`.
    fn ingest_only(workloads: &NodeWorkloads) -> bool {
        matches!(
            workloads,
            NodeWorkloads::Datanode(DatanodeWorkloads { types }) if types.as_slice() == [1]
        )
    }

    /// Accepts flownodes that report no workload types.
    fn empty_flownode_workloads(workloads: &NodeWorkloads) -> bool {
        matches!(
            workloads,
            NodeWorkloads::Flownode(FlownodeWorkloads { types }) if types.is_empty()
        )
    }

    #[test]
    fn test_alive_frontends_filters_by_activity_and_role() {
        let timer = MockSystemTimer::new(100);
        let peers = alive_frontends(
            &timer,
            vec![
                node_info(Role::Frontend, 1, "127.0.0.1:3001", 95),
                node_info(Role::Frontend, 2, "127.0.0.1:3002", 89),
                node_info(Role::Datanode, 3, "127.0.0.1:4001", 99),
            ],
            Duration::from_millis(10),
        );

        assert_eq!(
            vec![1],
            peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_activity_window_upper_bound_is_exclusive() {
        let timer = MockSystemTimer::new(100);
        // Exactly `active_duration` old is already stale; one millisecond less is alive.
        let peers = alive_frontends(
            &timer,
            vec![
                node_info(Role::Frontend, 1, "127.0.0.1:3001", 91),
                node_info(Role::Frontend, 2, "127.0.0.1:3002", 90),
            ],
            Duration::from_millis(10),
        );

        assert_eq!(
            vec![1],
            peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_alive_datanodes_filters_by_activity_and_workload() {
        let timer = MockSystemTimer::new(100);
        let mut first = node_info(Role::Datanode, 1, "127.0.0.1:4001", 95);
        let mut second = node_info(Role::Datanode, 2, "127.0.0.1:4002", 95);
        let stale = node_info(Role::Datanode, 3, "127.0.0.1:4003", 89);

        if let NodeStatus::Datanode(status) = &mut first.status {
            status.workloads = DatanodeWorkloads { types: vec![1] };
        }
        if let NodeStatus::Datanode(status) = &mut second.status {
            status.workloads = DatanodeWorkloads { types: vec![2] };
        }

        let peers = alive_datanodes(
            &timer,
            vec![first, second, stale],
            Duration::from_millis(10),
            Some(ingest_only),
        );

        assert_eq!(
            vec![1],
            peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_alive_datanodes_without_filter_keeps_all_active() {
        let timer = MockSystemTimer::new(100);
        let peers = alive_datanodes(
            &timer,
            vec![
                node_info(Role::Datanode, 1, "127.0.0.1:4001", 95),
                node_info(Role::Datanode, 2, "127.0.0.1:4002", 99),
                node_info(Role::Datanode, 3, "127.0.0.1:4003", 89),
                node_info(Role::Flownode, 4, "127.0.0.1:5001", 99),
            ],
            Duration::from_millis(10),
            None,
        );

        assert_eq!(
            vec![1, 2],
            peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_alive_flownodes_uses_empty_workload_semantics() {
        let timer = MockSystemTimer::new(100);
        let peers = alive_flownodes(
            &timer,
            vec![
                node_info(Role::Flownode, 1, "127.0.0.1:5001", 95),
                node_info(Role::Flownode, 2, "127.0.0.1:5002", 89),
                node_info(Role::Frontend, 3, "127.0.0.1:3001", 99),
            ],
            Duration::from_millis(10),
            Some(empty_flownode_workloads),
        );

        assert_eq!(
            vec![1],
            peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_alive_flownodes_without_filter_keeps_all_active() {
        let timer = MockSystemTimer::new(100);
        let peers = alive_flownodes(
            &timer,
            vec![
                node_info(Role::Flownode, 1, "127.0.0.1:5001", 95),
                node_info(Role::Flownode, 2, "127.0.0.1:5002", 99),
                node_info(Role::Flownode, 3, "127.0.0.1:5003", 89),
                node_info(Role::Datanode, 4, "127.0.0.1:4001", 99),
            ],
            Duration::from_millis(10),
            None,
        );

        assert_eq!(
            vec![1, 2],
            peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_alive_flownodes_filters_by_workloads() {
        let timer = MockSystemTimer::new(100);
        let mut first = node_info(Role::Flownode, 1, "127.0.0.1:5001", 95);
        let mut second = node_info(Role::Flownode, 2, "127.0.0.1:5002", 95);

        if let NodeStatus::Flownode(status) = &mut first.status {
            status.workloads = FlownodeWorkloads { types: vec![7] };
        }
        if let NodeStatus::Flownode(status) = &mut second.status {
            status.workloads = FlownodeWorkloads { types: vec![8] };
        }

        fn workload_type_is_7(workloads: &NodeWorkloads) -> bool {
            matches!(
                workloads,
                NodeWorkloads::Flownode(FlownodeWorkloads { types }) if types.as_slice() == [7]
            )
        }

        let peers = alive_flownodes(
            &timer,
            vec![first, second],
            Duration::from_millis(10),
            Some(workload_type_is_7),
        );

        assert_eq!(
            vec![1],
            peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_is_unreachable_matches_transport_codes_only() {
        assert!(is_unreachable(&Status::unavailable("down")));
        assert!(is_unreachable(&Status::deadline_exceeded("slow")));
        assert!(!is_unreachable(&Status::internal("boom")));
        assert!(!is_unreachable(&Status::new(Code::Ok, "")));
    }

    #[test]
    fn test_is_not_leader() {
        assert!(!is_not_leader(&None));
        assert!(!is_not_leader(&Some(ResponseHeader::default())));

        // Builds a header whose error carries `code`, without naming the error type.
        let header_with_code = |code: i32| {
            let mut header = ResponseHeader::default();
            header.error.get_or_insert_with(Default::default).code = code;
            Some(header)
        };

        assert!(is_not_leader(&header_with_code(ErrorCode::NotLeader as i32)));
        assert!(!is_not_leader(&header_with_code(
            ErrorCode::NotLeader as i32 + 1
        )));
    }
}