1use std::collections::HashMap;
16use std::future::Future;
17use std::hash::Hash;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20use std::time::Duration;
21
22use api::v1::meta::heartbeat_request::NodeWorkloads;
23use common_error::ext::BoxedError;
24use common_meta::cluster::{NodeInfo, NodeInfoKey, Role as ClusterRole};
25use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
26use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
27use common_meta::peer::{Peer, PeerLookupService};
28use common_meta::rpc::store::RangeRequest;
29use common_meta::{util, DatanodeId, FlownodeId};
30use common_time::util as time_util;
31use common_workload::DatanodeWorkloadType;
32use snafu::ResultExt;
33
34use crate::cluster::MetaPeerClientRef;
35use crate::error::{Error, KvBackendSnafu, Result};
36use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
37
38enum Value<'a> {
39 LeaseValue(&'a LeaseValue),
40 NodeInfo(&'a NodeInfo),
41}
42
43fn build_lease_filter(lease: Duration) -> impl Fn(Value) -> bool {
44 move |value: Value| {
45 let active_time = match value {
46 Value::LeaseValue(lease_value) => lease_value.timestamp_millis,
47 Value::NodeInfo(node_info) => node_info.last_activity_ts,
48 };
49
50 ((time_util::current_time_millis() - active_time) as u64) < lease.as_millis() as u64
51 }
52}
53
54pub fn is_datanode_accept_ingest_workload(lease_value: &LeaseValue) -> bool {
60 match &lease_value.workloads {
61 NodeWorkloads::Datanode(workloads) => workloads
62 .types
63 .iter()
64 .filter_map(|w| DatanodeWorkloadType::from_i32(*w))
65 .any(|w| w.accept_ingest()),
66 _ => false,
67 }
68}
69
70pub async fn find_datanode_lease_value(
72 datanode_id: DatanodeId,
73 in_memory_key: &ResettableKvBackendRef,
74) -> Result<Option<LeaseValue>> {
75 let lease_key = DatanodeLeaseKey {
76 node_id: datanode_id,
77 };
78 let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
79 let Some(kv) = in_memory_key
80 .get(&lease_key_bytes)
81 .await
82 .context(KvBackendSnafu)?
83 else {
84 return Ok(None);
85 };
86
87 let lease_value: LeaseValue = kv.value.try_into()?;
88
89 Ok(Some(lease_value))
90}
91
92pub async fn lookup_datanode_peer(
94 datanode_id: DatanodeId,
95 meta_peer_client: &MetaPeerClientRef,
96 lease: Duration,
97) -> Result<Option<Peer>> {
98 let lease_filter = build_lease_filter(lease);
99 let lease_key = DatanodeLeaseKey {
100 node_id: datanode_id,
101 };
102 let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
103 let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
104 return Ok(None);
105 };
106 let lease_value: LeaseValue = kv.value.try_into()?;
107 let is_alive = lease_filter(Value::LeaseValue(&lease_value));
108 if is_alive {
109 Ok(Some(Peer {
110 id: lease_key.node_id,
111 addr: lease_value.node_addr,
112 }))
113 } else {
114 Ok(None)
115 }
116}
117
118type LeaseFilterFuture<'a, K> =
119 Pin<Box<dyn Future<Output = Result<HashMap<K, LeaseValue>>> + Send + 'a>>;
120
121pub struct LeaseFilter<'a, K>
122where
123 K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
124{
125 lease: Duration,
126 key_prefix: Vec<u8>,
127 meta_peer_client: &'a MetaPeerClientRef,
128 condition: Option<fn(&LeaseValue) -> bool>,
129 inner_future: Option<LeaseFilterFuture<'a, K>>,
130}
131
132impl<'a, K> LeaseFilter<'a, K>
133where
134 K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
135{
136 pub fn new(
137 lease: Duration,
138 key_prefix: Vec<u8>,
139 meta_peer_client: &'a MetaPeerClientRef,
140 ) -> Self {
141 Self {
142 lease,
143 key_prefix,
144 meta_peer_client,
145 condition: None,
146 inner_future: None,
147 }
148 }
149
150 pub fn with_condition(mut self, condition: fn(&LeaseValue) -> bool) -> Self {
152 self.condition = Some(condition);
153 self
154 }
155}
156
157impl<'a, K> Future for LeaseFilter<'a, K>
158where
159 K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
160{
161 type Output = Result<HashMap<K, LeaseValue>>;
162
163 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
164 let this = self.get_mut();
165
166 if this.inner_future.is_none() {
167 let lease_filter = build_lease_filter(this.lease);
168 let condition = this.condition;
169 let key_prefix = std::mem::take(&mut this.key_prefix);
170 let fut = filter(key_prefix, this.meta_peer_client, move |v| {
171 lease_filter(Value::LeaseValue(v)) && condition.unwrap_or(|_| true)(v)
172 });
173
174 this.inner_future = Some(Box::pin(fut));
175 }
176
177 let fut = this.inner_future.as_mut().unwrap();
178 let result = futures::ready!(fut.as_mut().poll(cx))?;
179
180 Poll::Ready(Ok(result))
181 }
182}
183
184pub fn alive_datanodes(
186 meta_peer_client: &MetaPeerClientRef,
187 lease: Duration,
188) -> LeaseFilter<'_, DatanodeLeaseKey> {
189 LeaseFilter::new(lease, DatanodeLeaseKey::prefix_key(), meta_peer_client)
190}
191
192pub async fn lookup_flownode_peer(
194 flownode_id: FlownodeId,
195 meta_peer_client: &MetaPeerClientRef,
196 lease: Duration,
197) -> Result<Option<Peer>> {
198 let lease_filter = build_lease_filter(lease);
199 let lease_key = FlownodeLeaseKey {
200 node_id: flownode_id,
201 };
202 let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
203 let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
204 return Ok(None);
205 };
206 let lease_value: LeaseValue = kv.value.try_into()?;
207
208 let is_alive = lease_filter(Value::LeaseValue(&lease_value));
209 if is_alive {
210 Ok(Some(Peer {
211 id: lease_key.node_id,
212 addr: lease_value.node_addr,
213 }))
214 } else {
215 Ok(None)
216 }
217}
218
219pub async fn lookup_frontends(
221 meta_peer_client: &MetaPeerClientRef,
222 lease: Duration,
223) -> Result<Vec<Peer>> {
224 let range_request =
225 RangeRequest::new().with_prefix(NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend));
226
227 let response = meta_peer_client.range(range_request).await?;
228 let lease_filter = build_lease_filter(lease);
229
230 let mut peers = Vec::with_capacity(response.kvs.len());
231 for kv in response.kvs {
232 let node_info = NodeInfo::try_from(kv.value).context(KvBackendSnafu)?;
233 let is_alive = lease_filter(Value::NodeInfo(&node_info));
234 if is_alive {
235 peers.push(node_info.peer);
236 }
237 }
238
239 Ok(peers)
240}
241
242pub fn alive_flownodes(
244 meta_peer_client: &MetaPeerClientRef,
245 lease: Duration,
246) -> LeaseFilter<'_, FlownodeLeaseKey> {
247 LeaseFilter::new(
248 lease,
249 FlownodeLeaseKey::prefix_key_by_cluster(),
250 meta_peer_client,
251 )
252}
253
254pub async fn filter<P, K>(
255 key: Vec<u8>,
256 meta_peer_client: &MetaPeerClientRef,
257 predicate: P,
258) -> Result<HashMap<K, LeaseValue>>
259where
260 P: Fn(&LeaseValue) -> bool,
261 K: Eq + Hash + TryFrom<Vec<u8>, Error = Error>,
262{
263 let range_end = util::get_prefix_end_key(&key);
264 let range_req = common_meta::rpc::store::RangeRequest {
265 key,
266 range_end,
267 keys_only: false,
268 ..Default::default()
269 };
270 let kvs = meta_peer_client.range(range_req).await?.kvs;
271 let mut lease_kvs = HashMap::new();
272 for kv in kvs {
273 let lease_key: K = kv.key.try_into()?;
274 let lease_value: LeaseValue = kv.value.try_into()?;
275 if !predicate(&lease_value) {
276 continue;
277 }
278 let _ = lease_kvs.insert(lease_key, lease_value);
279 }
280
281 Ok(lease_kvs)
282}
283
284#[derive(Clone)]
285pub struct MetaPeerLookupService {
286 pub meta_peer_client: MetaPeerClientRef,
287}
288
289impl MetaPeerLookupService {
290 pub fn new(meta_peer_client: MetaPeerClientRef) -> Self {
291 Self { meta_peer_client }
292 }
293}
294
295#[async_trait::async_trait]
296impl PeerLookupService for MetaPeerLookupService {
297 async fn datanode(&self, id: DatanodeId) -> common_meta::error::Result<Option<Peer>> {
298 lookup_datanode_peer(id, &self.meta_peer_client, Duration::from_secs(u64::MAX))
299 .await
300 .map_err(BoxedError::new)
301 .context(common_meta::error::ExternalSnafu)
302 }
303
304 async fn flownode(&self, id: FlownodeId) -> common_meta::error::Result<Option<Peer>> {
305 lookup_flownode_peer(id, &self.meta_peer_client, Duration::from_secs(u64::MAX))
306 .await
307 .map_err(BoxedError::new)
308 .context(common_meta::error::ExternalSnafu)
309 }
310
311 async fn active_frontends(&self) -> common_meta::error::Result<Vec<Peer>> {
312 lookup_frontends(
314 &self.meta_peer_client,
315 Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS),
317 )
318 .await
319 .map_err(BoxedError::new)
320 .context(common_meta::error::ExternalSnafu)
321 }
322}
323
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use api::v1::meta::heartbeat_request::NodeWorkloads;
    use api::v1::meta::DatanodeWorkloads;
    use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus};
    use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
    use common_meta::kv_backend::ResettableKvBackendRef;
    use common_meta::peer::{Peer, PeerLookupService};
    use common_meta::rpc::store::PutRequest;
    use common_time::util::current_time_millis;
    use common_workload::DatanodeWorkloadType;

    use crate::key::{DatanodeLeaseKey, LeaseValue};
    use crate::lease::{
        alive_datanodes, is_datanode_accept_ingest_workload, lookup_frontends, ClusterRole,
        MetaPeerLookupService,
    };
    use crate::test_util::create_meta_peer_client;

    /// Stores `value` under `key` in the given backend.
    async fn put_lease_value(
        kv_backend: &ResettableKvBackendRef,
        key: DatanodeLeaseKey,
        value: LeaseValue,
    ) {
        let request = PutRequest {
            key: key.try_into().unwrap(),
            value: value.try_into().unwrap(),
            prev_kv: false,
        };
        kv_backend.put(request).await.unwrap();
    }

    /// Builds a datanode lease with the given activity time, address and raw
    /// workload type ids.
    fn datanode_lease(timestamp_millis: i64, node_addr: &str, types: Vec<i32>) -> LeaseValue {
        LeaseValue {
            timestamp_millis,
            node_addr: node_addr.to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads { types }),
        }
    }

    /// Builds a frontend node info with the given peer id and timestamps.
    fn frontend_node(id: u64, last_activity_ts: i64, start_time_ms: u64) -> NodeInfo {
        NodeInfo {
            peer: Peer {
                id,
                addr: "127.0.0.1:20201".to_string(),
            },
            last_activity_ts,
            status: NodeStatus::Frontend(FrontendStatus {}),
            version: "1.0.0".to_string(),
            git_commit: "1234567890".to_string(),
            start_time_ms,
        }
    }

    /// Stores `node` under the frontend node-info prefix followed by `suffix`.
    async fn put_frontend(kv_backend: &ResettableKvBackendRef, suffix: &str, node: NodeInfo) {
        let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend);
        let request = PutRequest {
            key: format!("{}{}", key_prefix, suffix).into(),
            value: node.try_into().unwrap(),
            prev_kv: false,
        };
        kv_backend.put(request).await.unwrap();
    }

    #[tokio::test]
    async fn test_alive_datanodes() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        // Datanode 1 was last active two lease periods ago, so it is expired.
        let stale = datanode_lease(
            current_time_millis() - lease_secs * 2 * 1000,
            "127.0.0.1:20201",
            vec![DatanodeWorkloadType::Hybrid as i32],
        );
        put_lease_value(&in_memory, DatanodeLeaseKey { node_id: 1 }, stale).await;

        // Datanode 2 is active right now.
        let fresh_key = DatanodeLeaseKey { node_id: 2 };
        let fresh = datanode_lease(
            current_time_millis(),
            "127.0.0.1:20202",
            vec![DatanodeWorkloadType::Hybrid as i32],
        );
        put_lease_value(&in_memory, fresh_key.clone(), fresh.clone()).await;

        let leases = alive_datanodes(&client, Duration::from_secs(lease_secs as u64))
            .await
            .unwrap();
        assert_eq!(leases.len(), 1);
        assert_eq!(leases.get(&fresh_key), Some(&fresh));
    }

    #[tokio::test]
    async fn test_alive_datanodes_with_condition() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        let now = current_time_millis();
        let hybrid = DatanodeWorkloadType::Hybrid as i32;
        // (node id, activity time, address, raw workload type id)
        let fixtures = [
            // Expired: last active 20s ago with a 10s lease.
            (1, now - 20 * 1000, "127.0.0.1:20201", hybrid),
            // Alive with a hybrid workload: the only lease expected to be kept.
            (2, now, "127.0.0.1:20202", hybrid),
            // Alive, but with a workload id that must not pass the condition.
            (3, now, "127.0.0.1:20203", i32::MAX),
            (4, now, "127.0.0.1:20204", i32::MAX),
        ];
        for (node_id, timestamp_millis, node_addr, workload) in fixtures {
            let value = datanode_lease(timestamp_millis, node_addr, vec![workload]);
            put_lease_value(&in_memory, DatanodeLeaseKey { node_id }, value).await;
        }

        let leases = alive_datanodes(&client, Duration::from_secs(lease_secs))
            .with_condition(is_datanode_accept_ingest_workload)
            .await
            .unwrap();
        assert_eq!(leases.len(), 1);
        assert!(leases.contains_key(&DatanodeLeaseKey { node_id: 2 }));
    }

    #[tokio::test]
    async fn test_lookup_frontends() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        let now = current_time_millis();
        // Frontend 0 is active right now.
        put_frontend(&in_memory, "0", frontend_node(0, now, now as u64)).await;
        // Frontend 1 was last active 20s ago, outside the 10s lease.
        put_frontend(
            &in_memory,
            "1",
            frontend_node(1, now - 20 * 1000, now as u64),
        )
        .await;

        let peers = lookup_frontends(&client, Duration::from_secs(lease_secs))
            .await
            .unwrap();

        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].id, 0);
    }

    #[tokio::test]
    async fn test_no_active_frontends() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();

        // Last active one second longer ago than the heartbeat interval.
        let last_activity_ts =
            current_time_millis() - FRONTEND_HEARTBEAT_INTERVAL_MILLIS as i64 - 1000;
        let expired_frontend = frontend_node(0, last_activity_ts, last_activity_ts as u64);
        put_frontend(&in_memory, "0", expired_frontend).await;

        let service = MetaPeerLookupService::new(client);
        let peers = service.active_frontends().await.unwrap();
        assert_eq!(peers.len(), 0);
    }
}