meta_srv/lease.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::future::Future;
use std::hash::Hash;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_error::ext::BoxedError;
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role as ClusterRole};
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
use common_meta::peer::{Peer, PeerLookupService};
use common_meta::rpc::store::RangeRequest;
use common_meta::{util, DatanodeId, FlownodeId};
use common_time::util as time_util;
use common_workload::DatanodeWorkloadType;
use snafu::ResultExt;

use crate::cluster::MetaPeerClientRef;
use crate::error::{Error, KvBackendSnafu, Result};
use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};

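/// A borrowed view over the two kinds of liveness records kept in the memory
/// backend: datanode/flownode lease values and cluster node info entries.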
enum Value<'a> {
    LeaseValue(&'a LeaseValue),
    NodeInfo(&'a NodeInfo),
}

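/// Builds a predicate that reports whether a node is still alive: its
/// last-reported activity timestamp must fall within the given `lease` window.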
fn build_lease_filter(lease: Duration) -> impl Fn(Value) -> bool {
    move |value: Value| {
        let active_time = match value {
            Value::LeaseValue(lease_value) => lease_value.timestamp_millis,
            Value::NodeInfo(node_info) => node_info.last_activity_ts,
        };

        ((time_util::current_time_millis() - active_time) as u64) < lease.as_millis() as u64
    }
}

/// Returns true if the datanode can accept ingest workload based on its workload types.
///
/// A datanode is considered to accept ingest workload if it supports either:
/// - Hybrid workload (both ingest and query workloads)
/// - Ingest workload (only ingest workload)
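///
/// # Example
///
/// A minimal sketch (not run as a doc test), mirroring the lease values built
/// in this module's tests:
///
/// ```ignore
/// let lease_value = LeaseValue {
///     timestamp_millis: common_time::util::current_time_millis(),
///     node_addr: "127.0.0.1:3001".to_string(),
///     workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
///         types: vec![DatanodeWorkloadType::Hybrid as i32],
///     }),
/// };
/// assert!(is_datanode_accept_ingest_workload(&lease_value));
/// ```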
pub fn is_datanode_accept_ingest_workload(lease_value: &LeaseValue) -> bool {
    match &lease_value.workloads {
        NodeWorkloads::Datanode(workloads) => workloads
            .types
            .iter()
            .filter_map(|w| DatanodeWorkloadType::from_i32(*w))
            .any(|w| w.accept_ingest()),
        _ => false,
    }
}

/// Returns the lease value of the given datanode, or `None` if the datanode is not found.
pub async fn find_datanode_lease_value(
    datanode_id: DatanodeId,
    in_memory_key: &ResettableKvBackendRef,
) -> Result<Option<LeaseValue>> {
    let lease_key = DatanodeLeaseKey {
        node_id: datanode_id,
    };
    let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
    let Some(kv) = in_memory_key
        .get(&lease_key_bytes)
        .await
        .context(KvBackendSnafu)?
    else {
        return Ok(None);
    };

    let lease_value: LeaseValue = kv.value.try_into()?;

    Ok(Some(lease_value))
}

/// Looks up the [`Peer`] of the given [`DatanodeId`]; returns `Some` only if the datanode is alive within the given `lease` duration.
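///
/// A hedged usage sketch (not run as a doc test), assuming a `meta_peer_client`
/// is already at hand:
///
/// ```ignore
/// let peer = lookup_datanode_peer(1, &meta_peer_client, Duration::from_secs(10)).await?;
/// ```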
pub async fn lookup_datanode_peer(
    datanode_id: DatanodeId,
    meta_peer_client: &MetaPeerClientRef,
    lease: Duration,
) -> Result<Option<Peer>> {
    let lease_filter = build_lease_filter(lease);
    let lease_key = DatanodeLeaseKey {
        node_id: datanode_id,
    };
    let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
    let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
        return Ok(None);
    };
    let lease_value: LeaseValue = kv.value.try_into()?;
    let is_alive = lease_filter(Value::LeaseValue(&lease_value));
    if is_alive {
        Ok(Some(Peer {
            id: lease_key.node_id,
            addr: lease_value.node_addr,
        }))
    } else {
        Ok(None)
    }
}

type LeaseFilterFuture<'a, K> =
    Pin<Box<dyn Future<Output = Result<HashMap<K, LeaseValue>>> + Send + 'a>>;

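/// A lazily-constructed future that scans all lease entries under a key prefix
/// and keeps only those whose leases are still alive, optionally narrowed by an
/// extra condition (see [`LeaseFilter::with_condition`]).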
pub struct LeaseFilter<'a, K>
where
    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
{
    lease: Duration,
    key_prefix: Vec<u8>,
    meta_peer_client: &'a MetaPeerClientRef,
    condition: Option<fn(&LeaseValue) -> bool>,
    inner_future: Option<LeaseFilterFuture<'a, K>>,
}

impl<'a, K> LeaseFilter<'a, K>
where
    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
{
    pub fn new(
        lease: Duration,
        key_prefix: Vec<u8>,
        meta_peer_client: &'a MetaPeerClientRef,
    ) -> Self {
        Self {
            lease,
            key_prefix,
            meta_peer_client,
            condition: None,
            inner_future: None,
        }
    }

    /// Sets an additional condition that lease values must satisfy, on top of the liveness check.
    pub fn with_condition(mut self, condition: fn(&LeaseValue) -> bool) -> Self {
        self.condition = Some(condition);
        self
    }
}

impl<'a, K> Future for LeaseFilter<'a, K>
where
    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
{
    type Output = Result<HashMap<K, LeaseValue>>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

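        // Construct the inner scan future on the first poll; later polls just
        // drive the stored future to completion.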
        if this.inner_future.is_none() {
            let lease_filter = build_lease_filter(this.lease);
            let condition = this.condition;
            let key_prefix = std::mem::take(&mut this.key_prefix);
            let fut = filter(key_prefix, this.meta_peer_client, move |v| {
                lease_filter(Value::LeaseValue(v)) && condition.unwrap_or(|_| true)(v)
            });

            this.inner_future = Some(Box::pin(fut));
        }

        let fut = this.inner_future.as_mut().unwrap();
        let result = futures::ready!(fut.as_mut().poll(cx))?;

        Poll::Ready(Ok(result))
    }
}

/// Finds all alive datanodes.
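///
/// The returned [`LeaseFilter`] is itself a future; a hedged usage sketch,
/// mirroring this module's tests:
///
/// ```ignore
/// let leases = alive_datanodes(&meta_peer_client, Duration::from_secs(10))
///     .with_condition(is_datanode_accept_ingest_workload)
///     .await?;
/// ```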
pub fn alive_datanodes(
    meta_peer_client: &MetaPeerClientRef,
    lease: Duration,
) -> LeaseFilter<'_, DatanodeLeaseKey> {
    LeaseFilter::new(lease, DatanodeLeaseKey::prefix_key(), meta_peer_client)
}

/// Looks up the [`Peer`] of the given [`FlownodeId`]; returns `Some` only if the flownode is alive within the given `lease` duration.
pub async fn lookup_flownode_peer(
    flownode_id: FlownodeId,
    meta_peer_client: &MetaPeerClientRef,
    lease: Duration,
) -> Result<Option<Peer>> {
    let lease_filter = build_lease_filter(lease);
    let lease_key = FlownodeLeaseKey {
        node_id: flownode_id,
    };
    let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
    let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
        return Ok(None);
    };
    let lease_value: LeaseValue = kv.value.try_into()?;

    let is_alive = lease_filter(Value::LeaseValue(&lease_value));
    if is_alive {
        Ok(Some(Peer {
            id: lease_key.node_id,
            addr: lease_value.node_addr,
        }))
    } else {
        Ok(None)
    }
}

/// Looks up all alive frontends from the in-memory backend; a frontend is included only if it has been active within the given `lease` duration.
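///
/// A hedged usage sketch, mirroring [`MetaPeerLookupService::active_frontends`]:
///
/// ```ignore
/// let frontends = lookup_frontends(
///     &meta_peer_client,
///     Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS),
/// )
/// .await?;
/// ```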
pub async fn lookup_frontends(
    meta_peer_client: &MetaPeerClientRef,
    lease: Duration,
) -> Result<Vec<Peer>> {
    let range_request =
        RangeRequest::new().with_prefix(NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend));

    let response = meta_peer_client.range(range_request).await?;
    let lease_filter = build_lease_filter(lease);

    let mut peers = Vec::with_capacity(response.kvs.len());
    for kv in response.kvs {
        let node_info = NodeInfo::try_from(kv.value).context(KvBackendSnafu)?;
        let is_alive = lease_filter(Value::NodeInfo(&node_info));
        if is_alive {
            peers.push(node_info.peer);
        }
    }

    Ok(peers)
}

/// Finds all alive flownodes.
pub fn alive_flownodes(
    meta_peer_client: &MetaPeerClientRef,
    lease: Duration,
) -> LeaseFilter<'_, FlownodeLeaseKey> {
    LeaseFilter::new(
        lease,
        FlownodeLeaseKey::prefix_key_by_cluster(),
        meta_peer_client,
    )
}

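/// Scans all key-value pairs under the given key prefix and collects the
/// entries whose lease values satisfy `predicate`.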
pub async fn filter<P, K>(
    key: Vec<u8>,
    meta_peer_client: &MetaPeerClientRef,
    predicate: P,
) -> Result<HashMap<K, LeaseValue>>
where
    P: Fn(&LeaseValue) -> bool,
    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error>,
{
    let range_end = util::get_prefix_end_key(&key);
    let range_req = RangeRequest {
        key,
        range_end,
        keys_only: false,
        ..Default::default()
    };
    let kvs = meta_peer_client.range(range_req).await?.kvs;
    let mut lease_kvs = HashMap::new();
    for kv in kvs {
        let lease_key: K = kv.key.try_into()?;
        let lease_value: LeaseValue = kv.value.try_into()?;
        if !predicate(&lease_value) {
            continue;
        }
        let _ = lease_kvs.insert(lease_key, lease_value);
    }

    Ok(lease_kvs)
}

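/// A [`PeerLookupService`] implementation backed by the metasrv's peer client.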
#[derive(Clone)]
pub struct MetaPeerLookupService {
    pub meta_peer_client: MetaPeerClientRef,
}

impl MetaPeerLookupService {
    pub fn new(meta_peer_client: MetaPeerClientRef) -> Self {
        Self { meta_peer_client }
    }
}

#[async_trait::async_trait]
impl PeerLookupService for MetaPeerLookupService {
    async fn datanode(&self, id: DatanodeId) -> common_meta::error::Result<Option<Peer>> {
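        // A lease of `u64::MAX` seconds effectively disables the liveness
        // check, so any datanode with a recorded lease is returned.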
        lookup_datanode_peer(id, &self.meta_peer_client, Duration::from_secs(u64::MAX))
            .await
            .map_err(BoxedError::new)
            .context(common_meta::error::ExternalSnafu)
    }

    async fn flownode(&self, id: FlownodeId) -> common_meta::error::Result<Option<Peer>> {
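        // Likewise, an effectively unbounded lease returns any recorded flownode.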
        lookup_flownode_peer(id, &self.meta_peer_client, Duration::from_secs(u64::MAX))
            .await
            .map_err(BoxedError::new)
            .context(common_meta::error::ExternalSnafu)
    }

    async fn active_frontends(&self) -> common_meta::error::Result<Vec<Peer>> {
        // Get the active frontends within the last heartbeat interval.
        lookup_frontends(
            &self.meta_peer_client,
            // TODO(zyy17): How to get the heartbeat interval of the frontend if it uses a custom heartbeat interval?
            Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS),
        )
        .await
        .map_err(BoxedError::new)
        .context(common_meta::error::ExternalSnafu)
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use api::v1::meta::heartbeat_request::NodeWorkloads;
    use api::v1::meta::DatanodeWorkloads;
    use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus};
    use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
    use common_meta::kv_backend::ResettableKvBackendRef;
    use common_meta::peer::{Peer, PeerLookupService};
    use common_meta::rpc::store::PutRequest;
    use common_time::util::current_time_millis;
    use common_workload::DatanodeWorkloadType;

    use crate::key::{DatanodeLeaseKey, LeaseValue};
    use crate::lease::{
        alive_datanodes, is_datanode_accept_ingest_workload, lookup_frontends, ClusterRole,
        MetaPeerLookupService,
    };
    use crate::test_util::create_meta_peer_client;

    async fn put_lease_value(
        kv_backend: &ResettableKvBackendRef,
        key: DatanodeLeaseKey,
        value: LeaseValue,
    ) {
        kv_backend
            .put(PutRequest {
                key: key.try_into().unwrap(),
                value: value.try_into().unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_alive_datanodes() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        // put a stale lease value for node 1
        let key = DatanodeLeaseKey { node_id: 1 };
        let value = LeaseValue {
            // 20s ago
            timestamp_millis: current_time_millis() - lease_secs * 2 * 1000,
            node_addr: "127.0.0.1:20201".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // put a fresh lease value for node 2
        let key = DatanodeLeaseKey { node_id: 2 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20202".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key.clone(), value.clone()).await;
        let leases = alive_datanodes(&client, Duration::from_secs(lease_secs as u64))
            .await
            .unwrap();
        assert_eq!(leases.len(), 1);
        assert_eq!(leases.get(&key), Some(&value));
    }

    #[tokio::test]
    async fn test_alive_datanodes_with_condition() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        // put a stale lease value for node 1 (20s ago, beyond the lease)
        let key = DatanodeLeaseKey { node_id: 1 };
        let value = LeaseValue {
            // 20s ago
            timestamp_millis: current_time_millis() - 20 * 1000,
            node_addr: "127.0.0.1:20201".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // put a fresh lease value for node 2 that accepts ingest workload
        let key = DatanodeLeaseKey { node_id: 2 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20202".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // put a fresh lease value for node 3 with an unknown workload type
        let key = DatanodeLeaseKey { node_id: 3 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20203".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![i32::MAX],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // put a fresh lease value for node 4 with an unknown workload type
        let key = DatanodeLeaseKey { node_id: 4 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20204".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![i32::MAX],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        let leases = alive_datanodes(&client, Duration::from_secs(lease_secs))
            .with_condition(is_datanode_accept_ingest_workload)
            .await
            .unwrap();
        assert_eq!(leases.len(), 1);
        assert!(leases.contains_key(&DatanodeLeaseKey { node_id: 2 }));
    }

    #[tokio::test]
    async fn test_lookup_frontends() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        let active_frontend_node = NodeInfo {
            peer: Peer {
                id: 0,
                addr: "127.0.0.1:20201".to_string(),
            },
            last_activity_ts: current_time_millis(),
            status: NodeStatus::Frontend(FrontendStatus {}),
            version: "1.0.0".to_string(),
            git_commit: "1234567890".to_string(),
            start_time_ms: current_time_millis() as u64,
        };

        let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend);

        in_memory
            .put(PutRequest {
                key: format!("{}{}", key_prefix, "0").into(),
                value: active_frontend_node.try_into().unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();

        let inactive_frontend_node = NodeInfo {
            peer: Peer {
                id: 1,
                addr: "127.0.0.1:20201".to_string(),
            },
            last_activity_ts: current_time_millis() - 20 * 1000,
            status: NodeStatus::Frontend(FrontendStatus {}),
            version: "1.0.0".to_string(),
            git_commit: "1234567890".to_string(),
            start_time_ms: current_time_millis() as u64,
        };

        in_memory
            .put(PutRequest {
                key: format!("{}{}", key_prefix, "1").into(),
                value: inactive_frontend_node.try_into().unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();

        let peers = lookup_frontends(&client, Duration::from_secs(lease_secs))
            .await
            .unwrap();

        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].id, 0);
    }

    #[tokio::test]
    async fn test_no_active_frontends() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();

        let last_activity_ts =
            current_time_millis() - FRONTEND_HEARTBEAT_INTERVAL_MILLIS as i64 - 1000;
        // This frontend's last activity is older than the heartbeat interval,
        // so it must not be reported as active.
        let inactive_frontend_node = NodeInfo {
            peer: Peer {
                id: 0,
                addr: "127.0.0.1:20201".to_string(),
            },
            last_activity_ts,
            status: NodeStatus::Frontend(FrontendStatus {}),
            version: "1.0.0".to_string(),
            git_commit: "1234567890".to_string(),
            start_time_ms: last_activity_ts as u64,
        };

        let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend);

        in_memory
            .put(PutRequest {
                key: format!("{}{}", key_prefix, "0").into(),
                value: inactive_frontend_node.try_into().unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();

        let service = MetaPeerLookupService::new(client);
        let peers = service.active_frontends().await.unwrap();
        assert_eq!(peers.len(), 0);
    }
}