Skip to main content

meta_client/client/
util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::heartbeat_request::NodeWorkloads;
16use api::v1::meta::{ErrorCode, ResponseHeader};
17use common_meta::cluster::{NodeInfo, NodeStatus};
18use common_meta::peer::Peer;
19use common_time::util::SystemTimer;
20use tonic::{Code, Status};
21
22pub(crate) fn is_unreachable(status: &Status) -> bool {
23    status.code() == Code::Unavailable || status.code() == Code::DeadlineExceeded
24}
25
26pub(crate) fn is_not_leader(header: &Option<ResponseHeader>) -> bool {
27    let Some(header) = header else {
28        return false;
29    };
30
31    let Some(err) = header.error.as_ref() else {
32        return false;
33    };
34
35    err.code == ErrorCode::NotLeader as i32
36}
37
38fn is_active_node(
39    timer: &impl SystemTimer,
40    last_activity_ts: i64,
41    active_duration: std::time::Duration,
42) -> bool {
43    let now = timer.current_time_millis();
44    let elapsed = now.checked_sub(last_activity_ts).unwrap_or(0) as u64;
45    elapsed < active_duration.as_millis() as u64
46}
47
48pub(crate) fn alive_frontends(
49    timer: &impl SystemTimer,
50    nodes: Vec<NodeInfo>,
51    active_duration: std::time::Duration,
52) -> Vec<Peer> {
53    nodes
54        .into_iter()
55        .filter_map(|node| {
56            if matches!(node.status, NodeStatus::Frontend(_))
57                && is_active_node(timer, node.last_activity_ts, active_duration)
58            {
59                Some(node.peer)
60            } else {
61                None
62            }
63        })
64        .collect()
65}
66
67pub(crate) fn alive_datanodes(
68    timer: &impl SystemTimer,
69    nodes: Vec<NodeInfo>,
70    active_duration: std::time::Duration,
71    filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
72) -> Vec<Peer> {
73    let filter = filter.unwrap_or(|_| true);
74
75    nodes
76        .into_iter()
77        .filter_map(|node| {
78            if let NodeStatus::Datanode(status) = node.status
79                && is_active_node(timer, node.last_activity_ts, active_duration)
80            {
81                let workloads = NodeWorkloads::Datanode(status.workloads);
82                filter(&workloads).then_some(node.peer)
83            } else {
84                None
85            }
86        })
87        .collect()
88}
89
90pub(crate) fn alive_flownodes(
91    timer: &impl SystemTimer,
92    nodes: Vec<NodeInfo>,
93    active_duration: std::time::Duration,
94    filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
95) -> Vec<Peer> {
96    let filter = filter.unwrap_or(|_| true);
97
98    nodes
99        .into_iter()
100        .filter_map(|node| {
101            if let NodeStatus::Flownode(status) = node.status
102                && is_active_node(timer, node.last_activity_ts, active_duration)
103            {
104                let workloads = NodeWorkloads::Flownode(status.workloads);
105                filter(&workloads).then_some(node.peer)
106            } else {
107                None
108            }
109        })
110        .collect()
111}
112
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use api::v1::meta::heartbeat_request::NodeWorkloads;
    use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, Peer};
    use common_meta::cluster::{
        DatanodeStatus, FlownodeStatus, FrontendStatus, MetasrvStatus, NodeInfo, NodeStatus, Role,
    };
    use common_time::util::SystemTimer;

    use super::*;

    /// A [`SystemTimer`] frozen at a fixed millisecond timestamp so liveness
    /// checks are deterministic.
    struct MockSystemTimer(i64);

    impl MockSystemTimer {
        fn new(now: i64) -> Self {
            Self(now)
        }
    }

    impl SystemTimer for MockSystemTimer {
        fn current_time_millis(&self) -> i64 {
            self.0
        }

        fn current_time_rfc3339(&self) -> String {
            "1970-01-01T00:00:00Z".to_string()
        }
    }

    /// Builds a [`NodeInfo`] for `role` with the given peer id/address and
    /// `last_activity_ts`; all resource stats are zero and workloads are empty.
    fn node_info(role: Role, id: u64, addr: &str, last_activity_ts: i64) -> NodeInfo {
        let status = match role {
            Role::Frontend => NodeStatus::Frontend(FrontendStatus {}),
            Role::Datanode => NodeStatus::Datanode(DatanodeStatus {
                rcus: 0,
                wcus: 0,
                leader_regions: 0,
                follower_regions: 0,
                workloads: DatanodeWorkloads { types: vec![] },
            }),
            Role::Flownode => NodeStatus::Flownode(FlownodeStatus {
                workloads: FlownodeWorkloads { types: vec![] },
            }),
            Role::Metasrv => NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
        };

        NodeInfo {
            peer: Peer::new(id, addr),
            last_activity_ts,
            status,
            version: String::new(),
            git_commit: String::new(),
            start_time_ms: 0,
            total_cpu_millicores: 0,
            total_memory_bytes: 0,
            cpu_usage_millicores: 0,
            memory_usage_bytes: 0,
            hostname: String::new(),
        }
    }

    /// Filter accepting only datanodes whose workload types are exactly `[1]`.
    fn ingest_only(workloads: &NodeWorkloads) -> bool {
        matches!(
            workloads,
            NodeWorkloads::Datanode(DatanodeWorkloads { types }) if types.as_slice() == [1]
        )
    }

    /// Filter accepting only flownodes that report no workload types.
    fn empty_flownode_workloads(workloads: &NodeWorkloads) -> bool {
        matches!(
            workloads,
            NodeWorkloads::Flownode(FlownodeWorkloads { types }) if types.is_empty()
        )
    }

    #[test]
    fn test_alive_frontends_filters_by_activity_and_role() {
        let timer = MockSystemTimer::new(100);
        let peers = alive_frontends(
            &timer,
            vec![
                node_info(Role::Frontend, 1, "127.0.0.1:3001", 95),
                node_info(Role::Frontend, 2, "127.0.0.1:3002", 89),
                node_info(Role::Datanode, 3, "127.0.0.1:4001", 99),
            ],
            Duration::from_millis(10),
        );

        assert_eq!(
            vec![1],
            peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
        );
    }

    /// A node whose elapsed time equals `active_duration` exactly is no
    /// longer considered alive; one millisecond less still is.
    #[test]
    fn test_activity_window_upper_bound_is_exclusive() {
        let timer = MockSystemTimer::new(100);
        let peers = alive_frontends(
            &timer,
            vec![
                node_info(Role::Frontend, 1, "127.0.0.1:3001", 91),
                node_info(Role::Frontend, 2, "127.0.0.1:3002", 90),
            ],
            Duration::from_millis(10),
        );

        assert_eq!(
            vec![1],
            peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_alive_datanodes_filters_by_activity_and_workload() {
        let timer = MockSystemTimer::new(100);
        let mut first = node_info(Role::Datanode, 1, "127.0.0.1:4001", 95);
        let mut second = node_info(Role::Datanode, 2, "127.0.0.1:4002", 95);
        let stale = node_info(Role::Datanode, 3, "127.0.0.1:4003", 89);

        if let NodeStatus::Datanode(status) = &mut first.status {
            status.workloads = DatanodeWorkloads { types: vec![1] };
        }
        if let NodeStatus::Datanode(status) = &mut second.status {
            status.workloads = DatanodeWorkloads { types: vec![2] };
        }

        let peers = alive_datanodes(
            &timer,
            vec![first, second, stale],
            Duration::from_millis(10),
            Some(ingest_only),
        );

        assert_eq!(
            vec![1],
            peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
        );
    }

    /// Without a workload filter every active datanode is returned, in input
    /// order, while stale datanodes and other roles are still dropped.
    #[test]
    fn test_alive_datanodes_without_filter_keeps_all_active() {
        let timer = MockSystemTimer::new(100);
        let peers = alive_datanodes(
            &timer,
            vec![
                node_info(Role::Datanode, 1, "127.0.0.1:4001", 95),
                node_info(Role::Datanode, 2, "127.0.0.1:4002", 99),
                node_info(Role::Datanode, 3, "127.0.0.1:4003", 80),
                node_info(Role::Flownode, 4, "127.0.0.1:5001", 99),
            ],
            Duration::from_millis(10),
            None,
        );

        assert_eq!(
            vec![1, 2],
            peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_alive_flownodes_uses_empty_workload_semantics() {
        let timer = MockSystemTimer::new(100);
        let peers = alive_flownodes(
            &timer,
            vec![
                node_info(Role::Flownode, 1, "127.0.0.1:5001", 95),
                node_info(Role::Flownode, 2, "127.0.0.1:5002", 89),
                node_info(Role::Frontend, 3, "127.0.0.1:3001", 99),
            ],
            Duration::from_millis(10),
            Some(empty_flownode_workloads),
        );

        assert_eq!(
            vec![1],
            peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
        );
    }

    /// Without a workload filter every active flownode is returned, in input
    /// order, while stale flownodes and other roles are still dropped.
    #[test]
    fn test_alive_flownodes_without_filter_keeps_all_active() {
        let timer = MockSystemTimer::new(100);
        let peers = alive_flownodes(
            &timer,
            vec![
                node_info(Role::Flownode, 1, "127.0.0.1:5001", 95),
                node_info(Role::Flownode, 2, "127.0.0.1:5002", 99),
                node_info(Role::Flownode, 3, "127.0.0.1:5003", 80),
                node_info(Role::Datanode, 4, "127.0.0.1:4001", 99),
            ],
            Duration::from_millis(10),
            None,
        );

        assert_eq!(
            vec![1, 2],
            peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_alive_flownodes_filters_by_workloads() {
        let timer = MockSystemTimer::new(100);
        let mut first = node_info(Role::Flownode, 1, "127.0.0.1:5001", 95);
        let mut second = node_info(Role::Flownode, 2, "127.0.0.1:5002", 95);

        if let NodeStatus::Flownode(status) = &mut first.status {
            status.workloads = FlownodeWorkloads { types: vec![7] };
        }
        if let NodeStatus::Flownode(status) = &mut second.status {
            status.workloads = FlownodeWorkloads { types: vec![8] };
        }

        fn workload_type_is_7(workloads: &NodeWorkloads) -> bool {
            matches!(
                workloads,
                NodeWorkloads::Flownode(FlownodeWorkloads { types }) if types.as_slice() == [7]
            )
        }

        let peers = alive_flownodes(
            &timer,
            vec![first, second],
            Duration::from_millis(10),
            Some(workload_type_is_7),
        );

        assert_eq!(
            vec![1],
            peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
        );
    }
}