1use common_meta::kv_backend::KvBackend;
16use common_meta::rpc::KeyValue;
17use common_meta::rpc::store::RangeRequest;
18
19use crate::cluster::MetaPeerClient;
20use crate::error::Result;
21use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
22
23#[derive(Clone, Copy)]
24pub enum LeaseValueType {
25 Flownode,
26 Datanode,
27}
28
29#[async_trait::async_trait]
30pub trait LeaseValueAccessor: Send + Sync {
31 async fn lease_values(
33 &self,
34 lease_value_type: LeaseValueType,
35 ) -> Result<Vec<(u64, LeaseValue)>>;
36
37 async fn lease_value(
38 &self,
39 lease_value_type: LeaseValueType,
40 node_id: u64,
41 ) -> Result<Option<(u64, LeaseValue)>>;
42}
43
44fn decoder(lease_value_type: LeaseValueType, kv: KeyValue) -> Result<(u64, LeaseValue)> {
45 match lease_value_type {
46 LeaseValueType::Flownode => {
47 let lease_key: FlownodeLeaseKey = kv.key.try_into()?;
48 let lease_value: LeaseValue = kv.value.try_into()?;
49 Ok((lease_key.node_id, lease_value))
50 }
51 LeaseValueType::Datanode => {
52 let lease_key: DatanodeLeaseKey = kv.key.try_into()?;
53 let lease_value: LeaseValue = kv.value.try_into()?;
54 Ok((lease_key.node_id, lease_value))
55 }
56 }
57}
58
59#[async_trait::async_trait]
60impl LeaseValueAccessor for MetaPeerClient {
61 async fn lease_values(
62 &self,
63 lease_value_type: LeaseValueType,
64 ) -> Result<Vec<(u64, LeaseValue)>> {
65 let prefix = match lease_value_type {
66 LeaseValueType::Flownode => FlownodeLeaseKey::prefix_key_by_cluster(),
67 LeaseValueType::Datanode => DatanodeLeaseKey::prefix_key(),
68 };
69 let range_request = RangeRequest::new().with_prefix(prefix);
70 let response = self.range(range_request).await?;
71 response
72 .kvs
73 .into_iter()
74 .map(|kv| {
75 let (lease_key, lease_value) = decoder(lease_value_type, kv)?;
76 Ok((lease_key, lease_value))
77 })
78 .collect::<Result<Vec<_>>>()
79 }
80
81 async fn lease_value(
82 &self,
83 lease_value_type: LeaseValueType,
84 node_id: u64,
85 ) -> Result<Option<(u64, LeaseValue)>> {
86 let key: Vec<u8> = match lease_value_type {
87 LeaseValueType::Flownode => FlownodeLeaseKey { node_id }.try_into()?,
88 LeaseValueType::Datanode => DatanodeLeaseKey { node_id }.try_into()?,
89 };
90
91 let response = self.get(&key).await?;
92 response.map(|kv| decoder(lease_value_type, kv)).transpose()
93 }
94}
95
96#[cfg(test)]
97mod tests {
98 use std::time::Duration;
99
100 use api::v1::meta::DatanodeWorkloads;
101 use api::v1::meta::heartbeat_request::NodeWorkloads;
102 use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role};
103 use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
104 use common_meta::kv_backend::ResettableKvBackendRef;
105 use common_meta::peer::{Peer, PeerDiscovery};
106 use common_meta::rpc::store::PutRequest;
107 use common_time::util::current_time_millis;
108 use common_workload::DatanodeWorkloadType;
109
110 use crate::discovery::utils::{self, accept_ingest_workload};
111 use crate::key::{DatanodeLeaseKey, LeaseValue};
112 use crate::test_util::create_meta_peer_client;
113
114 async fn put_lease_value(
115 kv_backend: &ResettableKvBackendRef,
116 key: DatanodeLeaseKey,
117 value: LeaseValue,
118 ) {
119 kv_backend
120 .put(PutRequest {
121 key: key.try_into().unwrap(),
122 value: value.try_into().unwrap(),
123 prev_kv: false,
124 })
125 .await
126 .unwrap();
127 }
128
129 #[tokio::test]
130 async fn test_alive_datanodes() {
131 let client = create_meta_peer_client();
132 let in_memory = client.memory_backend();
133 let lease_secs = 10;
134
135 let key = DatanodeLeaseKey { node_id: 1 };
137 let value = LeaseValue {
138 timestamp_millis: current_time_millis() - lease_secs * 2 * 1000,
140 node_addr: "127.0.0.1:20201".to_string(),
141 workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
142 types: vec![DatanodeWorkloadType::Hybrid as i32],
143 }),
144 };
145 put_lease_value(&in_memory, key, value).await;
146
147 let key = DatanodeLeaseKey { node_id: 2 };
149 let value = LeaseValue {
150 timestamp_millis: current_time_millis(),
151 node_addr: "127.0.0.1:20202".to_string(),
152 workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
153 types: vec![DatanodeWorkloadType::Hybrid as i32],
154 }),
155 };
156 put_lease_value(&in_memory, key.clone(), value.clone()).await;
157 let peers = utils::alive_datanodes(
158 client.as_ref(),
159 Duration::from_secs(lease_secs as u64),
160 None,
161 )
162 .await
163 .unwrap();
164 assert_eq!(peers.len(), 1);
165 assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
166 }
167
168 #[tokio::test]
169 async fn test_alive_datanodes_with_condition() {
170 let client = create_meta_peer_client();
171 let in_memory = client.memory_backend();
172 let lease_secs = 10;
173
174 let key = DatanodeLeaseKey { node_id: 1 };
176 let value = LeaseValue {
177 timestamp_millis: current_time_millis() - 20 * 1000,
179 node_addr: "127.0.0.1:20201".to_string(),
180 workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
181 types: vec![DatanodeWorkloadType::Hybrid as i32],
182 }),
183 };
184 put_lease_value(&in_memory, key, value).await;
185
186 let key = DatanodeLeaseKey { node_id: 2 };
188 let value = LeaseValue {
189 timestamp_millis: current_time_millis(),
190 node_addr: "127.0.0.1:20202".to_string(),
191 workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
192 types: vec![DatanodeWorkloadType::Hybrid as i32],
193 }),
194 };
195 put_lease_value(&in_memory, key, value).await;
196
197 let key = DatanodeLeaseKey { node_id: 3 };
199 let value = LeaseValue {
200 timestamp_millis: current_time_millis(),
201 node_addr: "127.0.0.1:20203".to_string(),
202 workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
203 types: vec![i32::MAX],
204 }),
205 };
206 put_lease_value(&in_memory, key, value).await;
207
208 let key = DatanodeLeaseKey { node_id: 4 };
210 let value = LeaseValue {
211 timestamp_millis: current_time_millis(),
212 node_addr: "127.0.0.1:20204".to_string(),
213 workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
214 types: vec![i32::MAX],
215 }),
216 };
217 put_lease_value(&in_memory, key, value).await;
218
219 let peers = utils::alive_datanodes(
220 client.as_ref(),
221 Duration::from_secs(lease_secs),
222 Some(accept_ingest_workload),
223 )
224 .await
225 .unwrap();
226 assert_eq!(peers.len(), 1);
227 assert!(peers.contains(&Peer::new(2, "127.0.0.1:20202".to_string())));
228 }
229
230 #[tokio::test]
231 async fn test_lookup_frontends() {
232 let client = create_meta_peer_client();
233 let in_memory = client.memory_backend();
234 let lease_secs = 10;
235
236 let active_frontend_node = NodeInfo {
237 peer: Peer {
238 id: 0,
239 addr: "127.0.0.1:20201".to_string(),
240 },
241 last_activity_ts: current_time_millis(),
242 status: NodeStatus::Frontend(FrontendStatus {}),
243 version: "1.0.0".to_string(),
244 git_commit: "1234567890".to_string(),
245 start_time_ms: current_time_millis() as u64,
246 cpus: 0,
247 memory_bytes: 0,
248 };
249
250 let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
251
252 in_memory
253 .put(PutRequest {
254 key: format!("{}{}", key_prefix, "0").into(),
255 value: active_frontend_node.try_into().unwrap(),
256 prev_kv: false,
257 })
258 .await
259 .unwrap();
260
261 let inactive_frontend_node = NodeInfo {
262 peer: Peer {
263 id: 1,
264 addr: "127.0.0.1:20201".to_string(),
265 },
266 last_activity_ts: current_time_millis() - 20 * 1000,
267 status: NodeStatus::Frontend(FrontendStatus {}),
268 version: "1.0.0".to_string(),
269 git_commit: "1234567890".to_string(),
270 start_time_ms: current_time_millis() as u64,
271 cpus: 0,
272 memory_bytes: 0,
273 };
274
275 in_memory
276 .put(PutRequest {
277 key: format!("{}{}", key_prefix, "1").into(),
278 value: inactive_frontend_node.try_into().unwrap(),
279 prev_kv: false,
280 })
281 .await
282 .unwrap();
283
284 let peers = utils::alive_frontends(client.as_ref(), Duration::from_secs(lease_secs))
285 .await
286 .unwrap();
287 assert_eq!(peers.len(), 1);
288 assert_eq!(peers[0].id, 0);
289 }
290
291 #[tokio::test]
292 async fn test_no_active_frontends() {
293 let client = create_meta_peer_client();
294 let in_memory = client.memory_backend();
295
296 let last_activity_ts =
297 current_time_millis() - FRONTEND_HEARTBEAT_INTERVAL_MILLIS as i64 - 1000;
298 let active_frontend_node = NodeInfo {
299 peer: Peer {
300 id: 0,
301 addr: "127.0.0.1:20201".to_string(),
302 },
303 last_activity_ts,
304 status: NodeStatus::Frontend(FrontendStatus {}),
305 version: "1.0.0".to_string(),
306 git_commit: "1234567890".to_string(),
307 start_time_ms: last_activity_ts as u64,
308 cpus: 0,
309 memory_bytes: 0,
310 };
311
312 let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
313
314 in_memory
315 .put(PutRequest {
316 key: format!("{}{}", key_prefix, "0").into(),
317 value: active_frontend_node.try_into().unwrap(),
318 prev_kv: false,
319 })
320 .await
321 .unwrap();
322
323 let peers = client.active_frontends().await.unwrap();
324 assert_eq!(peers.len(), 0);
325 }
326}