1use std::collections::HashMap;
16use std::future::Future;
17use std::hash::Hash;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20
21use api::v1::meta::heartbeat_request::NodeWorkloads;
22use common_error::ext::BoxedError;
23use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
24use common_meta::peer::{Peer, PeerLookupService};
25use common_meta::{util, DatanodeId, FlownodeId};
26use common_time::util as time_util;
27use common_workload::DatanodeWorkloadType;
28use snafu::ResultExt;
29
30use crate::cluster::MetaPeerClientRef;
31use crate::error::{Error, KvBackendSnafu, Result};
32use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
33
34fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
35 move |v: &LeaseValue| {
36 ((time_util::current_time_millis() - v.timestamp_millis) as u64)
37 < lease_secs.saturating_mul(1000)
38 }
39}
40
41pub fn is_datanode_accept_ingest_workload(lease_value: &LeaseValue) -> bool {
47 match &lease_value.workloads {
48 NodeWorkloads::Datanode(workloads) => workloads
49 .types
50 .iter()
51 .filter_map(|w| DatanodeWorkloadType::from_i32(*w))
52 .any(|w| w.accept_ingest()),
53 _ => false,
54 }
55}
56
57pub async fn find_datanode_lease_value(
59 datanode_id: DatanodeId,
60 in_memory_key: &ResettableKvBackendRef,
61) -> Result<Option<LeaseValue>> {
62 let lease_key = DatanodeLeaseKey {
63 node_id: datanode_id,
64 };
65 let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
66 let Some(kv) = in_memory_key
67 .get(&lease_key_bytes)
68 .await
69 .context(KvBackendSnafu)?
70 else {
71 return Ok(None);
72 };
73
74 let lease_value: LeaseValue = kv.value.try_into()?;
75
76 Ok(Some(lease_value))
77}
78
79pub async fn lookup_datanode_peer(
81 datanode_id: DatanodeId,
82 meta_peer_client: &MetaPeerClientRef,
83 lease_secs: u64,
84) -> Result<Option<Peer>> {
85 let lease_filter = build_lease_filter(lease_secs);
86 let lease_key = DatanodeLeaseKey {
87 node_id: datanode_id,
88 };
89 let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
90 let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
91 return Ok(None);
92 };
93 let lease_value: LeaseValue = kv.value.try_into()?;
94 let is_alive = lease_filter(&lease_value);
95 if is_alive {
96 Ok(Some(Peer {
97 id: lease_key.node_id,
98 addr: lease_value.node_addr,
99 }))
100 } else {
101 Ok(None)
102 }
103}
104
105type LeaseFilterFuture<'a, K> =
106 Pin<Box<dyn Future<Output = Result<HashMap<K, LeaseValue>>> + Send + 'a>>;
107
108pub struct LeaseFilter<'a, K>
109where
110 K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
111{
112 lease_secs: u64,
113 key_prefix: Vec<u8>,
114 meta_peer_client: &'a MetaPeerClientRef,
115 condition: Option<fn(&LeaseValue) -> bool>,
116 inner_future: Option<LeaseFilterFuture<'a, K>>,
117}
118
119impl<'a, K> LeaseFilter<'a, K>
120where
121 K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
122{
123 pub fn new(
124 lease_secs: u64,
125 key_prefix: Vec<u8>,
126 meta_peer_client: &'a MetaPeerClientRef,
127 ) -> Self {
128 Self {
129 lease_secs,
130 key_prefix,
131 meta_peer_client,
132 condition: None,
133 inner_future: None,
134 }
135 }
136
137 pub fn with_condition(mut self, condition: fn(&LeaseValue) -> bool) -> Self {
139 self.condition = Some(condition);
140 self
141 }
142}
143
144impl<'a, K> Future for LeaseFilter<'a, K>
145where
146 K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
147{
148 type Output = Result<HashMap<K, LeaseValue>>;
149
150 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151 let this = self.get_mut();
152
153 if this.inner_future.is_none() {
154 let lease_filter = build_lease_filter(this.lease_secs);
155 let condition = this.condition;
156 let key_prefix = std::mem::take(&mut this.key_prefix);
157 let fut = filter(key_prefix, this.meta_peer_client, move |v| {
158 lease_filter(v) && condition.unwrap_or(|_| true)(v)
159 });
160
161 this.inner_future = Some(Box::pin(fut));
162 }
163
164 let fut = this.inner_future.as_mut().unwrap();
165 let result = futures::ready!(fut.as_mut().poll(cx))?;
166
167 Poll::Ready(Ok(result))
168 }
169}
170
171pub fn alive_datanodes(
173 meta_peer_client: &MetaPeerClientRef,
174 lease_secs: u64,
175) -> LeaseFilter<'_, DatanodeLeaseKey> {
176 LeaseFilter::new(lease_secs, DatanodeLeaseKey::prefix_key(), meta_peer_client)
177}
178
179pub async fn lookup_flownode_peer(
181 flownode_id: FlownodeId,
182 meta_peer_client: &MetaPeerClientRef,
183 lease_secs: u64,
184) -> Result<Option<Peer>> {
185 let lease_filter = build_lease_filter(lease_secs);
186 let lease_key = FlownodeLeaseKey {
187 node_id: flownode_id,
188 };
189 let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
190 let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
191 return Ok(None);
192 };
193 let lease_value: LeaseValue = kv.value.try_into()?;
194
195 let is_alive = lease_filter(&lease_value);
196 if is_alive {
197 Ok(Some(Peer {
198 id: lease_key.node_id,
199 addr: lease_value.node_addr,
200 }))
201 } else {
202 Ok(None)
203 }
204}
205
206pub fn alive_flownodes(
208 meta_peer_client: &MetaPeerClientRef,
209 lease_secs: u64,
210) -> LeaseFilter<'_, FlownodeLeaseKey> {
211 LeaseFilter::new(
212 lease_secs,
213 FlownodeLeaseKey::prefix_key_by_cluster(),
214 meta_peer_client,
215 )
216}
217
218pub async fn filter<P, K>(
219 key: Vec<u8>,
220 meta_peer_client: &MetaPeerClientRef,
221 predicate: P,
222) -> Result<HashMap<K, LeaseValue>>
223where
224 P: Fn(&LeaseValue) -> bool,
225 K: Eq + Hash + TryFrom<Vec<u8>, Error = Error>,
226{
227 let range_end = util::get_prefix_end_key(&key);
228 let range_req = common_meta::rpc::store::RangeRequest {
229 key,
230 range_end,
231 keys_only: false,
232 ..Default::default()
233 };
234 let kvs = meta_peer_client.range(range_req).await?.kvs;
235 let mut lease_kvs = HashMap::new();
236 for kv in kvs {
237 let lease_key: K = kv.key.try_into()?;
238 let lease_value: LeaseValue = kv.value.try_into()?;
239 if !predicate(&lease_value) {
240 continue;
241 }
242 let _ = lease_kvs.insert(lease_key, lease_value);
243 }
244
245 Ok(lease_kvs)
246}
247
248#[derive(Clone)]
249pub struct MetaPeerLookupService {
250 pub meta_peer_client: MetaPeerClientRef,
251}
252
253impl MetaPeerLookupService {
254 pub fn new(meta_peer_client: MetaPeerClientRef) -> Self {
255 Self { meta_peer_client }
256 }
257}
258
259#[async_trait::async_trait]
260impl PeerLookupService for MetaPeerLookupService {
261 async fn datanode(&self, id: DatanodeId) -> common_meta::error::Result<Option<Peer>> {
262 lookup_datanode_peer(id, &self.meta_peer_client, u64::MAX)
263 .await
264 .map_err(BoxedError::new)
265 .context(common_meta::error::ExternalSnafu)
266 }
267 async fn flownode(&self, id: FlownodeId) -> common_meta::error::Result<Option<Peer>> {
268 lookup_flownode_peer(id, &self.meta_peer_client, u64::MAX)
269 .await
270 .map_err(BoxedError::new)
271 .context(common_meta::error::ExternalSnafu)
272 }
273}
274
#[cfg(test)]
mod tests {
    use api::v1::meta::heartbeat_request::NodeWorkloads;
    use api::v1::meta::DatanodeWorkloads;
    use common_meta::kv_backend::ResettableKvBackendRef;
    use common_meta::rpc::store::PutRequest;
    use common_time::util::current_time_millis;
    use common_workload::DatanodeWorkloadType;

    use crate::key::{DatanodeLeaseKey, LeaseValue};
    use crate::lease::{alive_datanodes, is_datanode_accept_ingest_workload};
    use crate::test_util::create_meta_peer_client;

    /// Stores `value` under the encoded `key` in `kv_backend`, panicking on failure.
    async fn put_lease_value(
        kv_backend: &ResettableKvBackendRef,
        key: DatanodeLeaseKey,
        value: LeaseValue,
    ) {
        let request = PutRequest {
            key: key.try_into().unwrap(),
            value: value.try_into().unwrap(),
            prev_kv: false,
        };
        kv_backend.put(request).await.unwrap();
    }

    /// Builds a datanode lease value with the given timestamp, address and
    /// raw workload type codes.
    fn datanode_lease(timestamp_millis: i64, node_addr: &str, types: Vec<i32>) -> LeaseValue {
        LeaseValue {
            timestamp_millis,
            node_addr: node_addr.to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads { types }),
        }
    }

    #[tokio::test]
    async fn test_alive_datanodes() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        // Timestamp twice the lease in the past: must be filtered out.
        let expired = datanode_lease(
            current_time_millis() - lease_secs * 2 * 1000,
            "127.0.0.1:20201",
            vec![DatanodeWorkloadType::Hybrid as i32],
        );
        put_lease_value(&in_memory, DatanodeLeaseKey { node_id: 1 }, expired).await;

        // Fresh timestamp: the only lease expected to survive.
        let alive_key = DatanodeLeaseKey { node_id: 2 };
        let alive = datanode_lease(
            current_time_millis(),
            "127.0.0.1:20202",
            vec![DatanodeWorkloadType::Hybrid as i32],
        );
        put_lease_value(&in_memory, alive_key.clone(), alive.clone()).await;

        let leases = alive_datanodes(&client, lease_secs as u64).await.unwrap();
        assert_eq!(leases.len(), 1);
        assert_eq!(leases.get(&alive_key), Some(&alive));
    }

    #[tokio::test]
    async fn test_alive_datanodes_with_condition() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        let fixtures = [
            // Expired lease with an ingest-capable workload.
            (
                1,
                current_time_millis() - 20 * 1000,
                "127.0.0.1:20201",
                DatanodeWorkloadType::Hybrid as i32,
            ),
            // Alive lease with an ingest-capable workload: the expected match.
            (
                2,
                current_time_millis(),
                "127.0.0.1:20202",
                DatanodeWorkloadType::Hybrid as i32,
            ),
            // Alive leases whose workload code `i32::MAX` must not be accepted
            // for ingest.
            (3, current_time_millis(), "127.0.0.1:20203", i32::MAX),
            (4, current_time_millis(), "127.0.0.1:20204", i32::MAX),
        ];
        for (node_id, timestamp_millis, node_addr, workload) in fixtures {
            put_lease_value(
                &in_memory,
                DatanodeLeaseKey { node_id },
                datanode_lease(timestamp_millis, node_addr, vec![workload]),
            )
            .await;
        }

        let leases = alive_datanodes(&client, lease_secs as u64)
            .with_condition(is_datanode_accept_ingest_workload)
            .await
            .unwrap();
        assert_eq!(leases.len(), 1);
        assert!(leases.contains_key(&DatanodeLeaseKey { node_id: 2 }));
    }
}