1use common_meta::kv_backend::KvBackend;
16use common_meta::rpc::KeyValue;
17use common_meta::rpc::store::RangeRequest;
18
19use crate::cluster::MetaPeerClient;
20use crate::error::Result;
21use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
22
23#[derive(Clone, Copy)]
24pub enum LeaseValueType {
25 Flownode,
26 Datanode,
27}
28
29#[async_trait::async_trait]
30pub trait LeaseValueAccessor: Send + Sync {
31 async fn lease_values(
33 &self,
34 lease_value_type: LeaseValueType,
35 ) -> Result<Vec<(u64, LeaseValue)>>;
36
37 async fn lease_value(
38 &self,
39 lease_value_type: LeaseValueType,
40 node_id: u64,
41 ) -> Result<Option<(u64, LeaseValue)>>;
42}
43
44fn decoder(lease_value_type: LeaseValueType, kv: KeyValue) -> Result<(u64, LeaseValue)> {
45 match lease_value_type {
46 LeaseValueType::Flownode => {
47 let lease_key: FlownodeLeaseKey = kv.key.try_into()?;
48 let lease_value: LeaseValue = kv.value.try_into()?;
49 Ok((lease_key.node_id, lease_value))
50 }
51 LeaseValueType::Datanode => {
52 let lease_key: DatanodeLeaseKey = kv.key.try_into()?;
53 let lease_value: LeaseValue = kv.value.try_into()?;
54 Ok((lease_key.node_id, lease_value))
55 }
56 }
57}
58
59#[async_trait::async_trait]
60impl LeaseValueAccessor for MetaPeerClient {
61 async fn lease_values(
62 &self,
63 lease_value_type: LeaseValueType,
64 ) -> Result<Vec<(u64, LeaseValue)>> {
65 let prefix = match lease_value_type {
66 LeaseValueType::Flownode => FlownodeLeaseKey::prefix_key_by_cluster(),
67 LeaseValueType::Datanode => DatanodeLeaseKey::prefix_key(),
68 };
69 let range_request = RangeRequest::new().with_prefix(prefix);
70 let response = self.range(range_request).await?;
71 response
72 .kvs
73 .into_iter()
74 .map(|kv| {
75 let (lease_key, lease_value) = decoder(lease_value_type, kv)?;
76 Ok((lease_key, lease_value))
77 })
78 .collect::<Result<Vec<_>>>()
79 }
80
81 async fn lease_value(
82 &self,
83 lease_value_type: LeaseValueType,
84 node_id: u64,
85 ) -> Result<Option<(u64, LeaseValue)>> {
86 let key: Vec<u8> = match lease_value_type {
87 LeaseValueType::Flownode => FlownodeLeaseKey { node_id }.try_into()?,
88 LeaseValueType::Datanode => DatanodeLeaseKey { node_id }.try_into()?,
89 };
90
91 let response = self.get(&key).await?;
92 response.map(|kv| decoder(lease_value_type, kv)).transpose()
93 }
94}
95
96#[cfg(test)]
97mod tests {
98 use std::sync::Arc;
99 use std::sync::atomic::{AtomicI64, Ordering};
100 use std::time::Duration;
101
102 use api::v1::meta::heartbeat_request::NodeWorkloads;
103 use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads};
104 use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role};
105 use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
106 use common_meta::kv_backend::ResettableKvBackendRef;
107 use common_meta::peer::{Peer, PeerDiscovery};
108 use common_meta::rpc::store::PutRequest;
109 use common_time::util::{DefaultSystemTimer, SystemTimer, current_time_millis};
110 use common_workload::DatanodeWorkloadType;
111
112 use crate::discovery::utils::{self, accept_ingest_workload};
113 use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
114 use crate::test_util::create_meta_peer_client;
115
116 async fn put_lease_value(
117 kv_backend: &ResettableKvBackendRef,
118 key: DatanodeLeaseKey,
119 value: LeaseValue,
120 ) {
121 kv_backend
122 .put(PutRequest {
123 key: key.try_into().unwrap(),
124 value: value.try_into().unwrap(),
125 prev_kv: false,
126 })
127 .await
128 .unwrap();
129 }
130
131 async fn put_flownode_lease_value(
132 kv_backend: &ResettableKvBackendRef,
133 key: FlownodeLeaseKey,
134 value: LeaseValue,
135 ) {
136 kv_backend
137 .put(PutRequest {
138 key: key.try_into().unwrap(),
139 value: value.try_into().unwrap(),
140 prev_kv: false,
141 })
142 .await
143 .unwrap();
144 }
145
146 struct MockTimer {
147 current: Arc<AtomicI64>,
148 }
149
150 impl SystemTimer for MockTimer {
151 fn current_time_millis(&self) -> i64 {
152 self.current.fetch_add(1, Ordering::Relaxed)
153 }
154
155 fn current_time_rfc3339(&self) -> String {
156 unimplemented!()
157 }
158 }
159
160 #[tokio::test]
161 async fn test_alive_datanodes() {
162 let client = create_meta_peer_client();
163 let in_memory = client.memory_backend();
164 let lease_secs = 10;
165 let timer = DefaultSystemTimer;
166
167 let key = DatanodeLeaseKey { node_id: 1 };
169 let value = LeaseValue {
170 timestamp_millis: timer.current_time_millis() - lease_secs * 2 * 1000,
172 node_addr: "127.0.0.1:20201".to_string(),
173 workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
174 types: vec![DatanodeWorkloadType::Hybrid as i32],
175 }),
176 };
177 put_lease_value(&in_memory, key, value).await;
178
179 let key = DatanodeLeaseKey { node_id: 2 };
181 let value = LeaseValue {
182 timestamp_millis: timer.current_time_millis(),
183 node_addr: "127.0.0.1:20202".to_string(),
184 workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
185 types: vec![DatanodeWorkloadType::Hybrid as i32],
186 }),
187 };
188 put_lease_value(&in_memory, key.clone(), value.clone()).await;
189 let peers = utils::alive_datanodes(
190 &timer,
191 client.as_ref(),
192 Duration::from_secs(lease_secs as u64),
193 None,
194 )
195 .await
196 .unwrap();
197 assert_eq!(peers.len(), 1);
198 assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
199 }
200
201 #[tokio::test]
202 async fn test_alive_datanodes_with_timer() {
203 let client = create_meta_peer_client();
204 let in_memory = client.memory_backend();
205 let lease_secs = 10;
206 let timer = MockTimer {
207 current: Arc::new(AtomicI64::new(current_time_millis())),
208 };
209
210 let key = DatanodeLeaseKey { node_id: 2 };
211 let value = LeaseValue {
212 timestamp_millis: timer.current_time_millis(),
213 node_addr: "127.0.0.1:20202".to_string(),
214 workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
215 types: vec![DatanodeWorkloadType::Hybrid as i32],
216 }),
217 };
218 put_lease_value(&in_memory, key.clone(), value.clone()).await;
219 let peers = utils::alive_datanodes(
220 &timer,
221 client.as_ref(),
222 Duration::from_secs(lease_secs as u64),
223 None,
224 )
225 .await
226 .unwrap();
227 assert_eq!(peers.len(), 1);
228 assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
229 }
230
231 #[tokio::test]
232 async fn test_alive_datanodes_with_condition() {
233 let client = create_meta_peer_client();
234 let in_memory = client.memory_backend();
235 let lease_secs = 10;
236 let timer = DefaultSystemTimer;
237
238 let key = DatanodeLeaseKey { node_id: 1 };
240 let value = LeaseValue {
241 timestamp_millis: timer.current_time_millis() - 20 * 1000,
243 node_addr: "127.0.0.1:20201".to_string(),
244 workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
245 types: vec![DatanodeWorkloadType::Hybrid as i32],
246 }),
247 };
248 put_lease_value(&in_memory, key, value).await;
249
250 let key = DatanodeLeaseKey { node_id: 2 };
252 let value = LeaseValue {
253 timestamp_millis: timer.current_time_millis(),
254 node_addr: "127.0.0.1:20202".to_string(),
255 workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
256 types: vec![DatanodeWorkloadType::Hybrid as i32],
257 }),
258 };
259 put_lease_value(&in_memory, key, value).await;
260
261 let key = DatanodeLeaseKey { node_id: 3 };
263 let value = LeaseValue {
264 timestamp_millis: timer.current_time_millis(),
265 node_addr: "127.0.0.1:20203".to_string(),
266 workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
267 types: vec![i32::MAX],
268 }),
269 };
270 put_lease_value(&in_memory, key, value).await;
271
272 let key = DatanodeLeaseKey { node_id: 4 };
274 let value = LeaseValue {
275 timestamp_millis: timer.current_time_millis(),
276 node_addr: "127.0.0.1:20204".to_string(),
277 workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
278 types: vec![i32::MAX],
279 }),
280 };
281 put_lease_value(&in_memory, key, value).await;
282
283 let peers = utils::alive_datanodes(
284 &timer,
285 client.as_ref(),
286 Duration::from_secs(lease_secs),
287 Some(accept_ingest_workload),
288 )
289 .await
290 .unwrap();
291 assert_eq!(peers.len(), 1);
292 assert!(peers.contains(&Peer::new(2, "127.0.0.1:20202".to_string())));
293 }
294
295 #[tokio::test]
296 async fn test_alive_flownodes() {
297 let client = create_meta_peer_client();
298 let in_memory = client.memory_backend();
299 let lease_secs = 10;
300 let timer = DefaultSystemTimer;
301
302 let key = FlownodeLeaseKey { node_id: 1 };
304 let value = LeaseValue {
305 timestamp_millis: timer.current_time_millis() - lease_secs * 2 * 1000,
307 node_addr: "127.0.0.1:20201".to_string(),
308 workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
309 };
310 put_flownode_lease_value(&in_memory, key, value).await;
311
312 let key = FlownodeLeaseKey { node_id: 2 };
314 let value = LeaseValue {
315 timestamp_millis: timer.current_time_millis(),
316 node_addr: "127.0.0.1:20202".to_string(),
317 workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
318 };
319 put_flownode_lease_value(&in_memory, key.clone(), value.clone()).await;
320 let peers = utils::alive_flownodes(
321 &timer,
322 client.as_ref(),
323 Duration::from_secs(lease_secs as u64),
324 None,
325 )
326 .await
327 .unwrap();
328 assert_eq!(peers.len(), 1);
329 assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
330 }
331
332 #[tokio::test]
333 async fn test_alive_flownodes_with_timer() {
334 let client = create_meta_peer_client();
335 let in_memory = client.memory_backend();
336 let lease_secs = 10;
337 let timer = MockTimer {
338 current: Arc::new(AtomicI64::new(current_time_millis())),
339 };
340
341 let key = FlownodeLeaseKey { node_id: 2 };
342 let value = LeaseValue {
343 timestamp_millis: timer.current_time_millis(),
344 node_addr: "127.0.0.1:20202".to_string(),
345 workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
346 };
347 put_flownode_lease_value(&in_memory, key.clone(), value.clone()).await;
348 let peers = utils::alive_flownodes(
349 &timer,
350 client.as_ref(),
351 Duration::from_secs(lease_secs as u64),
352 None,
353 )
354 .await
355 .unwrap();
356 assert_eq!(peers.len(), 1);
357 assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
358 }
359
360 #[tokio::test]
361 async fn test_lookup_frontends() {
362 let client = create_meta_peer_client();
363 let in_memory = client.memory_backend();
364 let lease_secs = 10;
365 let timer = DefaultSystemTimer;
366
367 let active_frontend_node = NodeInfo {
368 peer: Peer {
369 id: 0,
370 addr: "127.0.0.1:20201".to_string(),
371 },
372 last_activity_ts: timer.current_time_millis(),
373 status: NodeStatus::Frontend(FrontendStatus {}),
374 version: "1.0.0".to_string(),
375 git_commit: "1234567890".to_string(),
376 start_time_ms: current_time_millis() as u64,
377 total_cpu_millicores: 0,
378 total_memory_bytes: 0,
379 cpu_usage_millicores: 0,
380 memory_usage_bytes: 0,
381 hostname: "test_hostname".to_string(),
382 };
383
384 let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
385
386 in_memory
387 .put(PutRequest {
388 key: format!("{}{}", key_prefix, "0").into(),
389 value: active_frontend_node.try_into().unwrap(),
390 prev_kv: false,
391 })
392 .await
393 .unwrap();
394
395 let inactive_frontend_node = NodeInfo {
396 peer: Peer {
397 id: 1,
398 addr: "127.0.0.1:20201".to_string(),
399 },
400 last_activity_ts: timer.current_time_millis() - 20 * 1000,
401 status: NodeStatus::Frontend(FrontendStatus {}),
402 version: "1.0.0".to_string(),
403 git_commit: "1234567890".to_string(),
404 start_time_ms: current_time_millis() as u64,
405 total_cpu_millicores: 0,
406 total_memory_bytes: 0,
407 cpu_usage_millicores: 0,
408 memory_usage_bytes: 0,
409 hostname: "test_hostname".to_string(),
410 };
411
412 in_memory
413 .put(PutRequest {
414 key: format!("{}{}", key_prefix, "1").into(),
415 value: inactive_frontend_node.try_into().unwrap(),
416 prev_kv: false,
417 })
418 .await
419 .unwrap();
420
421 let peers =
422 utils::alive_frontends(&timer, client.as_ref(), Duration::from_secs(lease_secs))
423 .await
424 .unwrap();
425 assert_eq!(peers.len(), 1);
426 assert_eq!(peers[0].id, 0);
427 }
428
429 #[tokio::test]
430 async fn test_lookup_frontends_with_timer() {
431 let client = create_meta_peer_client();
432 let in_memory = client.memory_backend();
433 let lease_secs = 10;
434 let timer = MockTimer {
435 current: Arc::new(AtomicI64::new(current_time_millis())),
436 };
437
438 let active_frontend_node = NodeInfo {
439 peer: Peer {
440 id: 0,
441 addr: "127.0.0.1:20201".to_string(),
442 },
443 last_activity_ts: timer.current_time_millis(),
444 status: NodeStatus::Frontend(FrontendStatus {}),
445 version: "1.0.0".to_string(),
446 git_commit: "1234567890".to_string(),
447 start_time_ms: current_time_millis() as u64,
448 total_cpu_millicores: 0,
449 total_memory_bytes: 0,
450 cpu_usage_millicores: 0,
451 memory_usage_bytes: 0,
452 hostname: "test_hostname".to_string(),
453 };
454 let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
455 in_memory
456 .put(PutRequest {
457 key: format!("{}{}", key_prefix, "0").into(),
458 value: active_frontend_node.try_into().unwrap(),
459 prev_kv: false,
460 })
461 .await
462 .unwrap();
463 let peers =
464 utils::alive_frontends(&timer, client.as_ref(), Duration::from_secs(lease_secs))
465 .await
466 .unwrap();
467 assert_eq!(peers.len(), 1);
468 assert_eq!(peers[0].id, 0);
469 }
470
471 #[tokio::test]
472 async fn test_no_active_frontends() {
473 let client = create_meta_peer_client();
474 let in_memory = client.memory_backend();
475
476 let last_activity_ts =
477 current_time_millis() - FRONTEND_HEARTBEAT_INTERVAL_MILLIS as i64 - 1000;
478 let active_frontend_node = NodeInfo {
479 peer: Peer {
480 id: 0,
481 addr: "127.0.0.1:20201".to_string(),
482 },
483 last_activity_ts,
484 status: NodeStatus::Frontend(FrontendStatus {}),
485 version: "1.0.0".to_string(),
486 git_commit: "1234567890".to_string(),
487 start_time_ms: last_activity_ts as u64,
488 total_cpu_millicores: 0,
489 total_memory_bytes: 0,
490 cpu_usage_millicores: 0,
491 memory_usage_bytes: 0,
492 hostname: "test_hostname".to_string(),
493 };
494
495 let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
496
497 in_memory
498 .put(PutRequest {
499 key: format!("{}{}", key_prefix, "0").into(),
500 value: active_frontend_node.try_into().unwrap(),
501 prev_kv: false,
502 })
503 .await
504 .unwrap();
505
506 let peers = client.active_frontends().await.unwrap();
507 assert_eq!(peers.len(), 0);
508 }
509}