meta_srv/discovery/
utils.rs1use std::time::Duration;
16
17use api::v1::meta::heartbeat_request::NodeWorkloads;
18use common_meta::DatanodeId;
19use common_meta::cluster::NodeInfo;
20use common_meta::kv_backend::KvBackendRef;
21use common_meta::peer::Peer;
22use common_time::util::SystemTimer;
23use common_workload::DatanodeWorkloadType;
24use snafu::ResultExt;
25
26use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
27use crate::discovery::node_info::{NodeInfoAccessor, NodeInfoType};
28use crate::error::{KvBackendSnafu, Result};
29use crate::key::{DatanodeLeaseKey, LeaseValue};
30
31impl LastActiveTs for LeaseValue {
32 fn last_active_ts(&self) -> i64 {
33 self.timestamp_millis
34 }
35}
36
37impl LastActiveTs for NodeInfo {
38 fn last_active_ts(&self) -> i64 {
39 self.last_activity_ts
40 }
41}
42
/// Implemented by values that record when they were last seen active.
///
/// The timestamp is compared against a millisecond clock reading by
/// `build_active_filter`, so implementors are expected to report milliseconds
/// on the same time base as that clock.
pub trait LastActiveTs {
    /// Returns the timestamp at which this value was last active.
    fn last_active_ts(&self) -> i64;
}
48
49pub fn build_active_filter<T: LastActiveTs>(active_duration: Duration) -> impl Fn(i64, &T) -> bool {
53 move |now: i64, item: &T| {
54 let active_duration = active_duration.as_millis() as u64;
55 let elapsed = now.saturating_sub(item.last_active_ts()) as u64;
56 elapsed < active_duration
57 }
58}
59
60pub async fn alive_datanodes(
62 timer: &impl SystemTimer,
63 accessor: &impl LeaseValueAccessor,
64 active_duration: Duration,
65 condition: Option<fn(&NodeWorkloads) -> bool>,
66) -> Result<Vec<Peer>> {
67 let active_filter = build_active_filter(active_duration);
68 let condition = condition.unwrap_or(|_| true);
69 let lease_values = accessor.lease_values(LeaseValueType::Datanode).await?;
70 let now = timer.current_time_millis();
71 Ok(lease_values
72 .into_iter()
73 .filter_map(|(peer_id, lease_value)| {
74 if active_filter(now, &lease_value) && condition(&lease_value.workloads) {
75 Some(Peer::new(peer_id, lease_value.node_addr))
76 } else {
77 None
78 }
79 })
80 .collect::<Vec<_>>())
81}
82
83pub async fn alive_flownodes(
85 timer: &impl SystemTimer,
86 accessor: &impl LeaseValueAccessor,
87 active_duration: Duration,
88 condition: Option<fn(&NodeWorkloads) -> bool>,
89) -> Result<Vec<Peer>> {
90 let active_filter = build_active_filter(active_duration);
91 let condition = condition.unwrap_or(|_| true);
92 let lease_values = accessor.lease_values(LeaseValueType::Flownode).await?;
93 let now = timer.current_time_millis();
94 Ok(lease_values
95 .into_iter()
96 .filter_map(|(peer_id, lease_value)| {
97 if active_filter(now, &lease_value) && condition(&lease_value.workloads) {
98 Some(Peer::new(peer_id, lease_value.node_addr))
99 } else {
100 None
101 }
102 })
103 .collect::<Vec<_>>())
104}
105
106pub async fn alive_frontends(
108 timer: &impl SystemTimer,
109 lister: &impl NodeInfoAccessor,
110 active_duration: Duration,
111) -> Result<Vec<Peer>> {
112 let active_filter = build_active_filter(active_duration);
113 let node_infos = lister.node_infos(NodeInfoType::Frontend).await?;
114 let now = timer.current_time_millis();
115 Ok(node_infos
116 .into_iter()
117 .filter_map(|(_, node_info)| {
118 if active_filter(now, &node_info) {
119 Some(node_info.peer)
120 } else {
121 None
122 }
123 })
124 .collect::<Vec<_>>())
125}
126
127pub async fn alive_datanode(
129 timer: &impl SystemTimer,
130 lister: &impl LeaseValueAccessor,
131 peer_id: u64,
132 active_duration: Duration,
133) -> Result<Option<Peer>> {
134 let active_filter = build_active_filter(active_duration);
135 let lease_value = lister
136 .lease_value(LeaseValueType::Datanode, peer_id)
137 .await?;
138 let now = timer.current_time_millis();
139 let v = lease_value
140 .filter(|(_, lease)| active_filter(now, lease))
141 .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
142
143 Ok(v)
144}
145
146pub fn accept_ingest_workload(datanode_workloads: &NodeWorkloads) -> bool {
154 match &datanode_workloads {
155 NodeWorkloads::Datanode(workloads) => workloads
156 .types
157 .iter()
158 .filter_map(|w| DatanodeWorkloadType::from_i32(*w))
159 .any(|w| w.accept_ingest()),
160 _ => true,
162 }
163}
164
165pub async fn find_datanode_lease_value(
167 in_memory: &KvBackendRef,
168 datanode_id: DatanodeId,
169) -> Result<Option<LeaseValue>> {
170 let lease_key = DatanodeLeaseKey {
171 node_id: datanode_id,
172 };
173 let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
174 let Some(kv) = in_memory
175 .get(&lease_key_bytes)
176 .await
177 .context(KvBackendSnafu)?
178 else {
179 return Ok(None);
180 };
181
182 let lease_value: LeaseValue = kv.value.try_into()?;
183 Ok(Some(lease_value))
184}