meta_srv/discovery/
utils.rs1use std::time::Duration;
16
17use api::v1::meta::heartbeat_request::NodeWorkloads;
18use common_meta::DatanodeId;
19use common_meta::cluster::{NodeInfo, NodeStatus};
20use common_meta::kv_backend::KvBackendRef;
21use common_meta::peer::Peer;
22use common_time::util::SystemTimer;
23use common_workload::DatanodeWorkloadType;
24use snafu::ResultExt;
25
26use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
27use crate::discovery::node_info::{NodeInfoAccessor, NodeInfoType};
28use crate::error::{KvBackendSnafu, Result};
29use crate::key::{DatanodeLeaseKey, LeaseValue};
30
31impl LastActiveTs for LeaseValue {
32 fn last_active_ts(&self) -> i64 {
33 self.timestamp_millis
34 }
35}
36
37impl LastActiveTs for NodeInfo {
38 fn last_active_ts(&self) -> i64 {
39 self.last_activity_ts
40 }
41}
42
43pub trait LastActiveTs {
45 fn last_active_ts(&self) -> i64;
47}
48
49pub fn build_active_filter<T: LastActiveTs>(active_duration: Duration) -> impl Fn(i64, &T) -> bool {
53 move |now: i64, item: &T| {
54 let active_duration = active_duration.as_millis() as u64;
55 let elapsed = now.saturating_sub(item.last_active_ts()) as u64;
56 elapsed < active_duration
57 }
58}
59
60pub async fn alive_frontend_infos(
62 timer: &impl SystemTimer,
63 lister: &impl NodeInfoAccessor,
64 active_duration: Duration,
65) -> Result<Vec<NodeInfo>> {
66 alive_node_infos(timer, lister, NodeInfoType::Frontend, active_duration, None).await
67}
68
69pub async fn alive_datanode_infos(
71 timer: &impl SystemTimer,
72 lister: &impl NodeInfoAccessor,
73 active_duration: Duration,
74 condition: Option<fn(&NodeWorkloads) -> bool>,
75) -> Result<Vec<NodeInfo>> {
76 alive_node_infos(
77 timer,
78 lister,
79 NodeInfoType::Datanode,
80 active_duration,
81 condition,
82 )
83 .await
84}
85
86pub async fn alive_flownode_infos(
88 timer: &impl SystemTimer,
89 lister: &impl NodeInfoAccessor,
90 active_duration: Duration,
91 condition: Option<fn(&NodeWorkloads) -> bool>,
92) -> Result<Vec<NodeInfo>> {
93 alive_node_infos(
94 timer,
95 lister,
96 NodeInfoType::Flownode,
97 active_duration,
98 condition,
99 )
100 .await
101}
102
103async fn alive_node_infos(
104 timer: &impl SystemTimer,
105 lister: &impl NodeInfoAccessor,
106 node_info_type: NodeInfoType,
107 active_duration: Duration,
108 condition: Option<fn(&NodeWorkloads) -> bool>,
109) -> Result<Vec<NodeInfo>> {
110 let active_filter = build_active_filter(active_duration);
111 let node_infos = lister.node_infos(node_info_type).await?;
112 let now = timer.current_time_millis();
113 Ok(node_infos
114 .into_iter()
115 .filter_map(|(_, node_info)| {
116 if !active_filter(now, &node_info) {
117 return None;
118 }
119
120 match (&node_info.status, condition) {
121 (NodeStatus::Frontend(_), None) => Some(node_info),
122 (NodeStatus::Frontend(status), Some(condition)) => {
123 let workloads = NodeWorkloads::Frontend(status.workloads.clone());
124 condition(&workloads).then_some(node_info)
125 }
126 (NodeStatus::Datanode(status), Some(condition)) => {
127 let workloads = NodeWorkloads::Datanode(status.workloads.clone());
128 condition(&workloads).then_some(node_info)
129 }
130 (NodeStatus::Flownode(status), Some(condition)) => {
131 let workloads = NodeWorkloads::Flownode(status.workloads.clone());
132 condition(&workloads).then_some(node_info)
133 }
134 (NodeStatus::Datanode(_), None) | (NodeStatus::Flownode(_), None) => {
135 Some(node_info)
136 }
137 _ => None,
138 }
139 })
140 .collect::<Vec<_>>())
141}
142
143pub async fn alive_datanode(
145 timer: &impl SystemTimer,
146 lister: &impl LeaseValueAccessor,
147 peer_id: u64,
148 active_duration: Duration,
149) -> Result<Option<Peer>> {
150 let active_filter = build_active_filter(active_duration);
151 let lease_value = lister
152 .lease_value(LeaseValueType::Datanode, peer_id)
153 .await?;
154 let now = timer.current_time_millis();
155 let v = lease_value
156 .filter(|(_, lease)| active_filter(now, lease))
157 .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
158
159 Ok(v)
160}
161
162pub fn accept_ingest_workload(datanode_workloads: &NodeWorkloads) -> bool {
170 match &datanode_workloads {
171 NodeWorkloads::Datanode(workloads) => workloads
172 .types
173 .iter()
174 .filter_map(|w| DatanodeWorkloadType::from_i32(*w))
175 .any(|w| w.accept_ingest()),
176 _ => true,
178 }
179}
180
181pub async fn find_datanode_lease_value(
183 in_memory: &KvBackendRef,
184 datanode_id: DatanodeId,
185) -> Result<Option<LeaseValue>> {
186 let lease_key = DatanodeLeaseKey {
187 node_id: datanode_id,
188 };
189 let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
190 let Some(kv) = in_memory
191 .get(&lease_key_bytes)
192 .await
193 .context(KvBackendSnafu)?
194 else {
195 return Ok(None);
196 };
197
198 let lease_value: LeaseValue = kv.value.try_into()?;
199 Ok(Some(lease_value))
200}