1use std::collections::HashMap;
16use std::future::Future;
17use std::hash::Hash;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20
21use api::v1::meta::heartbeat_request::NodeWorkloads;
22use common_error::ext::BoxedError;
23use common_meta::cluster::{NodeInfo, NodeInfoKey, Role as ClusterRole};
24use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
25use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
26use common_meta::peer::{Peer, PeerLookupService};
27use common_meta::rpc::store::RangeRequest;
28use common_meta::{util, DatanodeId, FlownodeId};
29use common_time::util as time_util;
30use common_workload::DatanodeWorkloadType;
31use snafu::ResultExt;
32
33use crate::cluster::MetaPeerClientRef;
34use crate::error::{Error, KvBackendSnafu, Result};
35use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
36
37enum Value<'a> {
38 LeaseValue(&'a LeaseValue),
39 NodeInfo(&'a NodeInfo),
40}
41
42fn build_lease_filter(lease_secs: u64) -> impl Fn(Value) -> bool {
43 move |value: Value| {
44 let active_time = match value {
45 Value::LeaseValue(lease_value) => lease_value.timestamp_millis,
46 Value::NodeInfo(node_info) => node_info.last_activity_ts,
47 };
48
49 ((time_util::current_time_millis() - active_time) as u64) < lease_secs.saturating_mul(1000)
50 }
51}
52
53pub fn is_datanode_accept_ingest_workload(lease_value: &LeaseValue) -> bool {
59 match &lease_value.workloads {
60 NodeWorkloads::Datanode(workloads) => workloads
61 .types
62 .iter()
63 .filter_map(|w| DatanodeWorkloadType::from_i32(*w))
64 .any(|w| w.accept_ingest()),
65 _ => false,
66 }
67}
68
69pub async fn find_datanode_lease_value(
71 datanode_id: DatanodeId,
72 in_memory_key: &ResettableKvBackendRef,
73) -> Result<Option<LeaseValue>> {
74 let lease_key = DatanodeLeaseKey {
75 node_id: datanode_id,
76 };
77 let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
78 let Some(kv) = in_memory_key
79 .get(&lease_key_bytes)
80 .await
81 .context(KvBackendSnafu)?
82 else {
83 return Ok(None);
84 };
85
86 let lease_value: LeaseValue = kv.value.try_into()?;
87
88 Ok(Some(lease_value))
89}
90
91pub async fn lookup_datanode_peer(
93 datanode_id: DatanodeId,
94 meta_peer_client: &MetaPeerClientRef,
95 lease_secs: u64,
96) -> Result<Option<Peer>> {
97 let lease_filter = build_lease_filter(lease_secs);
98 let lease_key = DatanodeLeaseKey {
99 node_id: datanode_id,
100 };
101 let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
102 let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
103 return Ok(None);
104 };
105 let lease_value: LeaseValue = kv.value.try_into()?;
106 let is_alive = lease_filter(Value::LeaseValue(&lease_value));
107 if is_alive {
108 Ok(Some(Peer {
109 id: lease_key.node_id,
110 addr: lease_value.node_addr,
111 }))
112 } else {
113 Ok(None)
114 }
115}
116
117type LeaseFilterFuture<'a, K> =
118 Pin<Box<dyn Future<Output = Result<HashMap<K, LeaseValue>>> + Send + 'a>>;
119
120pub struct LeaseFilter<'a, K>
121where
122 K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
123{
124 lease_secs: u64,
125 key_prefix: Vec<u8>,
126 meta_peer_client: &'a MetaPeerClientRef,
127 condition: Option<fn(&LeaseValue) -> bool>,
128 inner_future: Option<LeaseFilterFuture<'a, K>>,
129}
130
131impl<'a, K> LeaseFilter<'a, K>
132where
133 K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
134{
135 pub fn new(
136 lease_secs: u64,
137 key_prefix: Vec<u8>,
138 meta_peer_client: &'a MetaPeerClientRef,
139 ) -> Self {
140 Self {
141 lease_secs,
142 key_prefix,
143 meta_peer_client,
144 condition: None,
145 inner_future: None,
146 }
147 }
148
149 pub fn with_condition(mut self, condition: fn(&LeaseValue) -> bool) -> Self {
151 self.condition = Some(condition);
152 self
153 }
154}
155
156impl<'a, K> Future for LeaseFilter<'a, K>
157where
158 K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
159{
160 type Output = Result<HashMap<K, LeaseValue>>;
161
162 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
163 let this = self.get_mut();
164
165 if this.inner_future.is_none() {
166 let lease_filter = build_lease_filter(this.lease_secs);
167 let condition = this.condition;
168 let key_prefix = std::mem::take(&mut this.key_prefix);
169 let fut = filter(key_prefix, this.meta_peer_client, move |v| {
170 lease_filter(Value::LeaseValue(v)) && condition.unwrap_or(|_| true)(v)
171 });
172
173 this.inner_future = Some(Box::pin(fut));
174 }
175
176 let fut = this.inner_future.as_mut().unwrap();
177 let result = futures::ready!(fut.as_mut().poll(cx))?;
178
179 Poll::Ready(Ok(result))
180 }
181}
182
183pub fn alive_datanodes(
185 meta_peer_client: &MetaPeerClientRef,
186 lease_secs: u64,
187) -> LeaseFilter<'_, DatanodeLeaseKey> {
188 LeaseFilter::new(lease_secs, DatanodeLeaseKey::prefix_key(), meta_peer_client)
189}
190
191pub async fn lookup_flownode_peer(
193 flownode_id: FlownodeId,
194 meta_peer_client: &MetaPeerClientRef,
195 lease_secs: u64,
196) -> Result<Option<Peer>> {
197 let lease_filter = build_lease_filter(lease_secs);
198 let lease_key = FlownodeLeaseKey {
199 node_id: flownode_id,
200 };
201 let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
202 let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
203 return Ok(None);
204 };
205 let lease_value: LeaseValue = kv.value.try_into()?;
206
207 let is_alive = lease_filter(Value::LeaseValue(&lease_value));
208 if is_alive {
209 Ok(Some(Peer {
210 id: lease_key.node_id,
211 addr: lease_value.node_addr,
212 }))
213 } else {
214 Ok(None)
215 }
216}
217
218pub async fn lookup_frontends(
220 meta_peer_client: &MetaPeerClientRef,
221 lease_secs: u64,
222) -> Result<Vec<Peer>> {
223 let range_request =
224 RangeRequest::new().with_prefix(NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend));
225
226 let response = meta_peer_client.range(range_request).await?;
227 let lease_filter = build_lease_filter(lease_secs);
228
229 let mut peers = Vec::with_capacity(response.kvs.len());
230 for kv in response.kvs {
231 let node_info = NodeInfo::try_from(kv.value).context(KvBackendSnafu)?;
232 let is_alive = lease_filter(Value::NodeInfo(&node_info));
233 if is_alive {
234 peers.push(node_info.peer);
235 }
236 }
237
238 Ok(peers)
239}
240
241pub fn alive_flownodes(
243 meta_peer_client: &MetaPeerClientRef,
244 lease_secs: u64,
245) -> LeaseFilter<'_, FlownodeLeaseKey> {
246 LeaseFilter::new(
247 lease_secs,
248 FlownodeLeaseKey::prefix_key_by_cluster(),
249 meta_peer_client,
250 )
251}
252
253pub async fn filter<P, K>(
254 key: Vec<u8>,
255 meta_peer_client: &MetaPeerClientRef,
256 predicate: P,
257) -> Result<HashMap<K, LeaseValue>>
258where
259 P: Fn(&LeaseValue) -> bool,
260 K: Eq + Hash + TryFrom<Vec<u8>, Error = Error>,
261{
262 let range_end = util::get_prefix_end_key(&key);
263 let range_req = common_meta::rpc::store::RangeRequest {
264 key,
265 range_end,
266 keys_only: false,
267 ..Default::default()
268 };
269 let kvs = meta_peer_client.range(range_req).await?.kvs;
270 let mut lease_kvs = HashMap::new();
271 for kv in kvs {
272 let lease_key: K = kv.key.try_into()?;
273 let lease_value: LeaseValue = kv.value.try_into()?;
274 if !predicate(&lease_value) {
275 continue;
276 }
277 let _ = lease_kvs.insert(lease_key, lease_value);
278 }
279
280 Ok(lease_kvs)
281}
282
283#[derive(Clone)]
284pub struct MetaPeerLookupService {
285 pub meta_peer_client: MetaPeerClientRef,
286}
287
288impl MetaPeerLookupService {
289 pub fn new(meta_peer_client: MetaPeerClientRef) -> Self {
290 Self { meta_peer_client }
291 }
292}
293
294#[async_trait::async_trait]
295impl PeerLookupService for MetaPeerLookupService {
296 async fn datanode(&self, id: DatanodeId) -> common_meta::error::Result<Option<Peer>> {
297 lookup_datanode_peer(id, &self.meta_peer_client, u64::MAX)
298 .await
299 .map_err(BoxedError::new)
300 .context(common_meta::error::ExternalSnafu)
301 }
302
303 async fn flownode(&self, id: FlownodeId) -> common_meta::error::Result<Option<Peer>> {
304 lookup_flownode_peer(id, &self.meta_peer_client, u64::MAX)
305 .await
306 .map_err(BoxedError::new)
307 .context(common_meta::error::ExternalSnafu)
308 }
309
310 async fn active_frontends(&self) -> common_meta::error::Result<Vec<Peer>> {
311 lookup_frontends(
313 &self.meta_peer_client,
314 FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
316 )
317 .await
318 .map_err(BoxedError::new)
319 .context(common_meta::error::ExternalSnafu)
320 }
321}
322
#[cfg(test)]
mod tests {
    use api::v1::meta::heartbeat_request::NodeWorkloads;
    use api::v1::meta::DatanodeWorkloads;
    use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus};
    use common_meta::kv_backend::ResettableKvBackendRef;
    use common_meta::peer::Peer;
    use common_meta::rpc::store::PutRequest;
    use common_time::util::current_time_millis;
    use common_workload::DatanodeWorkloadType;

    use crate::key::{DatanodeLeaseKey, LeaseValue};
    use crate::lease::{
        alive_datanodes, is_datanode_accept_ingest_workload, lookup_frontends, ClusterRole,
    };
    use crate::test_util::create_meta_peer_client;

    /// Stores `value` under the encoded `key` in `kv_backend`, panicking on any failure.
    async fn put_lease_value(
        kv_backend: &ResettableKvBackendRef,
        key: DatanodeLeaseKey,
        value: LeaseValue,
    ) {
        let request = PutRequest {
            key: key.try_into().unwrap(),
            value: value.try_into().unwrap(),
            prev_kv: false,
        };
        kv_backend.put(request).await.unwrap();
    }

    /// Builds a datanode lease with the given activity timestamp, address and raw workload
    /// type codes.
    fn datanode_lease(timestamp_millis: i64, node_addr: &str, types: Vec<i32>) -> LeaseValue {
        LeaseValue {
            timestamp_millis,
            node_addr: node_addr.to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads { types }),
        }
    }

    /// Builds a frontend node-info record for peer `id` last active at `last_activity_ts`.
    fn frontend_node(id: u64, last_activity_ts: i64) -> NodeInfo {
        NodeInfo {
            peer: Peer {
                id,
                addr: "127.0.0.1:20201".to_string(),
            },
            last_activity_ts,
            status: NodeStatus::Frontend(FrontendStatus {}),
            version: "1.0.0".to_string(),
            git_commit: "1234567890".to_string(),
            start_time_ms: current_time_millis() as u64,
        }
    }

    #[tokio::test]
    async fn test_alive_datanodes() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        // Last active twice the lease ago: must be filtered out.
        let expired = datanode_lease(
            current_time_millis() - lease_secs * 2 * 1000,
            "127.0.0.1:20201",
            vec![DatanodeWorkloadType::Hybrid as i32],
        );
        put_lease_value(&in_memory, DatanodeLeaseKey { node_id: 1 }, expired).await;

        let alive_key = DatanodeLeaseKey { node_id: 2 };
        let alive = datanode_lease(
            current_time_millis(),
            "127.0.0.1:20202",
            vec![DatanodeWorkloadType::Hybrid as i32],
        );
        put_lease_value(&in_memory, alive_key.clone(), alive.clone()).await;

        let leases = alive_datanodes(&client, lease_secs as u64).await.unwrap();
        assert_eq!(leases.len(), 1);
        assert_eq!(leases.get(&alive_key), Some(&alive));
    }

    #[tokio::test]
    async fn test_alive_datanodes_with_condition() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        let now = current_time_millis();
        // (node id, last activity, raw workload type):
        // 1 is expired, 2 is alive and ingest-capable, 3 and 4 are alive but carry an
        // unknown workload type code.
        let fixtures = [
            (1, now - 20 * 1000, DatanodeWorkloadType::Hybrid as i32),
            (2, now, DatanodeWorkloadType::Hybrid as i32),
            (3, now, i32::MAX),
            (4, now, i32::MAX),
        ];
        for (node_id, timestamp_millis, workload_type) in fixtures {
            let node_addr = format!("127.0.0.1:2020{node_id}");
            put_lease_value(
                &in_memory,
                DatanodeLeaseKey { node_id },
                datanode_lease(timestamp_millis, &node_addr, vec![workload_type]),
            )
            .await;
        }

        let leases = alive_datanodes(&client, lease_secs as u64)
            .with_condition(is_datanode_accept_ingest_workload)
            .await
            .unwrap();
        assert_eq!(leases.len(), 1);
        assert!(leases.contains_key(&DatanodeLeaseKey { node_id: 2 }));
    }

    #[tokio::test]
    async fn test_lookup_frontends() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend);
        let now = current_time_millis();
        // Peer 0 is fresh; peer 1 was last active 20s ago against a 10s lease.
        for (peer_id, last_activity_ts) in [(0, now), (1, now - 20 * 1000)] {
            let node = frontend_node(peer_id, last_activity_ts);
            in_memory
                .put(PutRequest {
                    key: format!("{key_prefix}{peer_id}").into(),
                    value: node.try_into().unwrap(),
                    prev_kv: false,
                })
                .await
                .unwrap();
        }

        let peers = lookup_frontends(&client, lease_secs as u64).await.unwrap();

        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].id, 0);
    }
}