meta_srv/discovery/
utils.rs1use std::time::Duration;
16
17use api::v1::meta::heartbeat_request::NodeWorkloads;
18use common_meta::DatanodeId;
19use common_meta::cluster::NodeInfo;
20use common_meta::kv_backend::KvBackendRef;
21use common_meta::peer::Peer;
22use common_time::util::{DefaultSystemTimer, SystemTimer};
23use common_workload::DatanodeWorkloadType;
24use snafu::ResultExt;
25
26use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
27use crate::discovery::node_info::{NodeInfoAccessor, NodeInfoType};
28use crate::error::{KvBackendSnafu, Result};
29use crate::key::{DatanodeLeaseKey, LeaseValue};
30
31impl LastActiveTs for LeaseValue {
32 fn last_active_ts(&self) -> i64 {
33 self.timestamp_millis
34 }
35}
36
37impl LastActiveTs for NodeInfo {
38 fn last_active_ts(&self) -> i64 {
39 self.last_activity_ts
40 }
41}
42
/// Implemented by values that record when they were last seen active.
pub trait LastActiveTs {
    /// Returns the timestamp of the most recent activity.
    ///
    /// [`build_active_filter`] subtracts this value from
    /// `SystemTimer::current_time_millis`, so it is expected to be a
    /// millisecond timestamp on the same clock.
    fn last_active_ts(&self) -> i64;
}
48
49pub fn build_active_filter<T: LastActiveTs>(
56 timer: impl SystemTimer,
57 active_duration: Duration,
58) -> impl Fn(&T) -> bool {
59 let now = timer.current_time_millis();
60 let active_duration = active_duration.as_millis() as u64;
61 move |item: &T| {
62 let elapsed = now.saturating_sub(item.last_active_ts()) as u64;
63 elapsed < active_duration
64 }
65}
66
67pub async fn alive_datanodes(
69 accessor: &impl LeaseValueAccessor,
70 active_duration: Duration,
71 condition: Option<fn(&NodeWorkloads) -> bool>,
72) -> Result<Vec<Peer>> {
73 let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
74 let condition = condition.unwrap_or(|_| true);
75 Ok(accessor
76 .lease_values(LeaseValueType::Datanode)
77 .await?
78 .into_iter()
79 .filter_map(|(peer_id, lease_value)| {
80 if active_filter(&lease_value) && condition(&lease_value.workloads) {
81 Some(Peer::new(peer_id, lease_value.node_addr))
82 } else {
83 None
84 }
85 })
86 .collect::<Vec<_>>())
87}
88
89pub async fn alive_flownodes(
91 accessor: &impl LeaseValueAccessor,
92 active_duration: Duration,
93 condition: Option<fn(&NodeWorkloads) -> bool>,
94) -> Result<Vec<Peer>> {
95 let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
96 let condition = condition.unwrap_or(|_| true);
97 Ok(accessor
98 .lease_values(LeaseValueType::Flownode)
99 .await?
100 .into_iter()
101 .filter_map(|(peer_id, lease_value)| {
102 if active_filter(&lease_value) && condition(&lease_value.workloads) {
103 Some(Peer::new(peer_id, lease_value.node_addr))
104 } else {
105 None
106 }
107 })
108 .collect::<Vec<_>>())
109}
110
111pub async fn alive_frontends(
113 lister: &impl NodeInfoAccessor,
114 active_duration: Duration,
115) -> Result<Vec<Peer>> {
116 let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
117 Ok(lister
118 .node_infos(NodeInfoType::Frontend)
119 .await?
120 .into_iter()
121 .filter_map(|(_, node_info)| {
122 if active_filter(&node_info) {
123 Some(node_info.peer)
124 } else {
125 None
126 }
127 })
128 .collect::<Vec<_>>())
129}
130
131pub async fn alive_datanode(
133 lister: &impl LeaseValueAccessor,
134 peer_id: u64,
135 active_duration: Duration,
136) -> Result<Option<Peer>> {
137 let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
138 let v = lister
139 .lease_value(LeaseValueType::Datanode, peer_id)
140 .await?
141 .filter(|(_, lease)| active_filter(lease))
142 .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
143
144 Ok(v)
145}
146
147pub fn accept_ingest_workload(datanode_workloads: &NodeWorkloads) -> bool {
155 match &datanode_workloads {
156 NodeWorkloads::Datanode(workloads) => workloads
157 .types
158 .iter()
159 .filter_map(|w| DatanodeWorkloadType::from_i32(*w))
160 .any(|w| w.accept_ingest()),
161 _ => true,
163 }
164}
165
166pub async fn find_datanode_lease_value(
168 in_memory: &KvBackendRef,
169 datanode_id: DatanodeId,
170) -> Result<Option<LeaseValue>> {
171 let lease_key = DatanodeLeaseKey {
172 node_id: datanode_id,
173 };
174 let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
175 let Some(kv) = in_memory
176 .get(&lease_key_bytes)
177 .await
178 .context(KvBackendSnafu)?
179 else {
180 return Ok(None);
181 };
182
183 let lease_value: LeaseValue = kv.value.try_into()?;
184 Ok(Some(lease_value))
185}