meta_srv/lease.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::future::Future;
use std::hash::Hash;
use std::pin::Pin;
use std::task::{Context, Poll};

use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_error::ext::BoxedError;
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role as ClusterRole};
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
use common_meta::peer::{Peer, PeerLookupService};
use common_meta::rpc::store::RangeRequest;
use common_meta::{util, DatanodeId, FlownodeId};
use common_time::util as time_util;
use common_workload::DatanodeWorkloadType;
use snafu::ResultExt;

use crate::cluster::MetaPeerClientRef;
use crate::error::{Error, KvBackendSnafu, Result};
use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};

enum Value<'a> {
    LeaseValue(&'a LeaseValue),
    NodeInfo(&'a NodeInfo),
}

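/// Builds a predicate over a [`Value`] that reports whether the record is
/// still "alive", i.e. whether its last activity timestamp falls within
/// `lease_secs` seconds of the current time.
///
/// Illustrative sketch (hypothetical `lease_value`):
///
/// ```ignore
/// let is_alive = build_lease_filter(10); // 10-second lease window
/// if is_alive(Value::LeaseValue(&lease_value)) {
///     // the node heartbeated less than 10s ago
/// }
/// ```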
fn build_lease_filter(lease_secs: u64) -> impl Fn(Value) -> bool {
    move |value: Value| {
        let active_time = match value {
            Value::LeaseValue(lease_value) => lease_value.timestamp_millis,
            Value::NodeInfo(node_info) => node_info.last_activity_ts,
        };

        ((time_util::current_time_millis() - active_time) as u64) < lease_secs.saturating_mul(1000)
    }
}

/// Returns true if the datanode can accept ingest workload based on its workload types.
///
/// A datanode is considered to accept ingest workload if it supports either:
/// - Hybrid workload (both ingest and query workloads)
/// - Ingest workload (only ingest workload)
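///
/// Illustrative sketch (hypothetical lease value with a `Hybrid` workload):
///
/// ```ignore
/// let lease_value = LeaseValue {
///     timestamp_millis: current_time_millis(),
///     node_addr: "127.0.0.1:20201".to_string(),
///     workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
///         types: vec![DatanodeWorkloadType::Hybrid as i32],
///     }),
/// };
/// assert!(is_datanode_accept_ingest_workload(&lease_value));
/// ```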
pub fn is_datanode_accept_ingest_workload(lease_value: &LeaseValue) -> bool {
    match &lease_value.workloads {
        NodeWorkloads::Datanode(workloads) => workloads
            .types
            .iter()
            .filter_map(|w| DatanodeWorkloadType::from_i32(*w))
            .any(|w| w.accept_ingest()),
        _ => false,
    }
}

/// Returns the lease value of the given datanode id, or `None` if the datanode is not found.
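///
/// Illustrative sketch (hypothetical `in_memory` backend handle):
///
/// ```ignore
/// if let Some(lease) = find_datanode_lease_value(1, &in_memory).await? {
///     println!("datanode 1 heartbeated at {}", lease.timestamp_millis);
/// }
/// ```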
pub async fn find_datanode_lease_value(
    datanode_id: DatanodeId,
    in_memory_key: &ResettableKvBackendRef,
) -> Result<Option<LeaseValue>> {
    let lease_key = DatanodeLeaseKey {
        node_id: datanode_id,
    };
    let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
    let Some(kv) = in_memory_key
        .get(&lease_key_bytes)
        .await
        .context(KvBackendSnafu)?
    else {
        return Ok(None);
    };

    let lease_value: LeaseValue = kv.value.try_into()?;

    Ok(Some(lease_value))
}

/// Looks up the [`Peer`] of the given [`DatanodeId`]; returns `Some` only if the datanode's lease is still alive under the given `lease_secs`.
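///
/// Illustrative sketch (hypothetical client and lease window):
///
/// ```ignore
/// match lookup_datanode_peer(42, &meta_peer_client, 10).await? {
///     Some(peer) => println!("datanode 42 is alive at {}", peer.addr),
///     None => println!("datanode 42 is unknown or its lease expired"),
/// }
/// ```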
pub async fn lookup_datanode_peer(
    datanode_id: DatanodeId,
    meta_peer_client: &MetaPeerClientRef,
    lease_secs: u64,
) -> Result<Option<Peer>> {
    let lease_filter = build_lease_filter(lease_secs);
    let lease_key = DatanodeLeaseKey {
        node_id: datanode_id,
    };
    let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
    let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
        return Ok(None);
    };
    let lease_value: LeaseValue = kv.value.try_into()?;
    let is_alive = lease_filter(Value::LeaseValue(&lease_value));
    if is_alive {
        Ok(Some(Peer {
            id: lease_key.node_id,
            addr: lease_value.node_addr,
        }))
    } else {
        Ok(None)
    }
}

type LeaseFilterFuture<'a, K> =
    Pin<Box<dyn Future<Output = Result<HashMap<K, LeaseValue>>> + Send + 'a>>;

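/// A lazily-evaluated future that scans all leases under `key_prefix` and
/// keeps only those that pass the liveness check (plus the optional extra
/// `condition`). Constructed via [`alive_datanodes`] or [`alive_flownodes`].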
pub struct LeaseFilter<'a, K>
where
    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
{
    lease_secs: u64,
    key_prefix: Vec<u8>,
    meta_peer_client: &'a MetaPeerClientRef,
    condition: Option<fn(&LeaseValue) -> bool>,
    inner_future: Option<LeaseFilterFuture<'a, K>>,
}

impl<'a, K> LeaseFilter<'a, K>
where
    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
{
    pub fn new(
        lease_secs: u64,
        key_prefix: Vec<u8>,
        meta_peer_client: &'a MetaPeerClientRef,
    ) -> Self {
        Self {
            lease_secs,
            key_prefix,
            meta_peer_client,
            condition: None,
            inner_future: None,
        }
    }

    /// Sets an additional condition that alive leases must also satisfy.
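    ///
    /// Illustrative sketch (hypothetical `meta_peer_client`):
    ///
    /// ```ignore
    /// let leases = alive_datanodes(&meta_peer_client, 10)
    ///     .with_condition(is_datanode_accept_ingest_workload)
    ///     .await?;
    /// ```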
    pub fn with_condition(mut self, condition: fn(&LeaseValue) -> bool) -> Self {
        self.condition = Some(condition);
        self
    }
}

impl<'a, K> Future for LeaseFilter<'a, K>
where
    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
{
    type Output = Result<HashMap<K, LeaseValue>>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

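        // Lazily build the underlying range-scan future on the first poll;
        // later polls simply drive it to completion.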
        if this.inner_future.is_none() {
            let lease_filter = build_lease_filter(this.lease_secs);
            let condition = this.condition;
            let key_prefix = std::mem::take(&mut this.key_prefix);
            let fut = filter(key_prefix, this.meta_peer_client, move |v| {
                lease_filter(Value::LeaseValue(v)) && condition.unwrap_or(|_| true)(v)
            });

            this.inner_future = Some(Box::pin(fut));
        }

        let fut = this.inner_future.as_mut().unwrap();
        let result = futures::ready!(fut.as_mut().poll(cx))?;

        Poll::Ready(Ok(result))
    }
}

/// Finds all alive datanodes, i.e. datanodes whose leases are still valid under `lease_secs`.
pub fn alive_datanodes(
    meta_peer_client: &MetaPeerClientRef,
    lease_secs: u64,
) -> LeaseFilter<'_, DatanodeLeaseKey> {
    LeaseFilter::new(lease_secs, DatanodeLeaseKey::prefix_key(), meta_peer_client)
}

/// Looks up the [`Peer`] of the given [`FlownodeId`]; returns `Some` only if the flownode's lease is still alive under the given `lease_secs`.
pub async fn lookup_flownode_peer(
    flownode_id: FlownodeId,
    meta_peer_client: &MetaPeerClientRef,
    lease_secs: u64,
) -> Result<Option<Peer>> {
    let lease_filter = build_lease_filter(lease_secs);
    let lease_key = FlownodeLeaseKey {
        node_id: flownode_id,
    };
    let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
    let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
        return Ok(None);
    };
    let lease_value: LeaseValue = kv.value.try_into()?;

    let is_alive = lease_filter(Value::LeaseValue(&lease_value));
    if is_alive {
        Ok(Some(Peer {
            id: lease_key.node_id,
            addr: lease_value.node_addr,
        }))
    } else {
        Ok(None)
    }
}

/// Looks up all alive frontends from the in-memory backend; a frontend is returned only if it is alive under the given `lease_secs`.
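///
/// Illustrative sketch (hypothetical client):
///
/// ```ignore
/// let frontends = lookup_frontends(&meta_peer_client, 10).await?;
/// for peer in frontends {
///     println!("frontend {} at {}", peer.id, peer.addr);
/// }
/// ```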
pub async fn lookup_frontends(
    meta_peer_client: &MetaPeerClientRef,
    lease_secs: u64,
) -> Result<Vec<Peer>> {
    let range_request =
        RangeRequest::new().with_prefix(NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend));

    let response = meta_peer_client.range(range_request).await?;
    let lease_filter = build_lease_filter(lease_secs);

    let mut peers = Vec::with_capacity(response.kvs.len());
    for kv in response.kvs {
        let node_info = NodeInfo::try_from(kv.value).context(KvBackendSnafu)?;
        let is_alive = lease_filter(Value::NodeInfo(&node_info));
        if is_alive {
            peers.push(node_info.peer);
        }
    }

    Ok(peers)
}

/// Finds all alive flownodes, i.e. flownodes whose leases are still valid under `lease_secs`.
pub fn alive_flownodes(
    meta_peer_client: &MetaPeerClientRef,
    lease_secs: u64,
) -> LeaseFilter<'_, FlownodeLeaseKey> {
    LeaseFilter::new(
        lease_secs,
        FlownodeLeaseKey::prefix_key_by_cluster(),
        meta_peer_client,
    )
}

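/// Scans all key-value pairs under the given key prefix and returns the
/// lease entries whose values satisfy `predicate`, keyed by their decoded
/// lease keys.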
pub async fn filter<P, K>(
    key: Vec<u8>,
    meta_peer_client: &MetaPeerClientRef,
    predicate: P,
) -> Result<HashMap<K, LeaseValue>>
where
    P: Fn(&LeaseValue) -> bool,
    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error>,
{
    let range_end = util::get_prefix_end_key(&key);
    let range_req = common_meta::rpc::store::RangeRequest {
        key,
        range_end,
        keys_only: false,
        ..Default::default()
    };
    let kvs = meta_peer_client.range(range_req).await?.kvs;
    let mut lease_kvs = HashMap::new();
    for kv in kvs {
        let lease_key: K = kv.key.try_into()?;
        let lease_value: LeaseValue = kv.value.try_into()?;
        if !predicate(&lease_value) {
            continue;
        }
        let _ = lease_kvs.insert(lease_key, lease_value);
    }

    Ok(lease_kvs)
}

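/// A [`PeerLookupService`] implementation backed by the meta peer client.
/// Note that `datanode`/`flownode` pass `u64::MAX` as the lease window, so
/// they resolve peers regardless of how stale their leases are.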
#[derive(Clone)]
pub struct MetaPeerLookupService {
    pub meta_peer_client: MetaPeerClientRef,
}

impl MetaPeerLookupService {
    pub fn new(meta_peer_client: MetaPeerClientRef) -> Self {
        Self { meta_peer_client }
    }
}

#[async_trait::async_trait]
impl PeerLookupService for MetaPeerLookupService {
    async fn datanode(&self, id: DatanodeId) -> common_meta::error::Result<Option<Peer>> {
        lookup_datanode_peer(id, &self.meta_peer_client, u64::MAX)
            .await
            .map_err(BoxedError::new)
            .context(common_meta::error::ExternalSnafu)
    }

    async fn flownode(&self, id: FlownodeId) -> common_meta::error::Result<Option<Peer>> {
        lookup_flownode_peer(id, &self.meta_peer_client, u64::MAX)
            .await
            .map_err(BoxedError::new)
            .context(common_meta::error::ExternalSnafu)
    }

    async fn active_frontends(&self) -> common_meta::error::Result<Vec<Peer>> {
        // Get the active frontends within the last heartbeat interval.
        lookup_frontends(
            &self.meta_peer_client,
            // TODO(zyy17): How to get the heartbeat interval of the frontend if it uses a custom heartbeat interval?
            FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
        )
        .await
        .map_err(BoxedError::new)
        .context(common_meta::error::ExternalSnafu)
    }
}

#[cfg(test)]
mod tests {
    use api::v1::meta::heartbeat_request::NodeWorkloads;
    use api::v1::meta::DatanodeWorkloads;
    use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus};
    use common_meta::kv_backend::ResettableKvBackendRef;
    use common_meta::peer::Peer;
    use common_meta::rpc::store::PutRequest;
    use common_time::util::current_time_millis;
    use common_workload::DatanodeWorkloadType;

    use crate::key::{DatanodeLeaseKey, LeaseValue};
    use crate::lease::{
        alive_datanodes, is_datanode_accept_ingest_workload, lookup_frontends, ClusterRole,
    };
    use crate::test_util::create_meta_peer_client;

    async fn put_lease_value(
        kv_backend: &ResettableKvBackendRef,
        key: DatanodeLeaseKey,
        value: LeaseValue,
    ) {
        kv_backend
            .put(PutRequest {
                key: key.try_into().unwrap(),
                value: value.try_into().unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_alive_datanodes() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        // put a stale lease value for node 1
        let key = DatanodeLeaseKey { node_id: 1 };
        let value = LeaseValue {
            // 20s ago
            timestamp_millis: current_time_millis() - lease_secs * 2 * 1000,
            node_addr: "127.0.0.1:20201".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // put a fresh lease value for node 2
        let key = DatanodeLeaseKey { node_id: 2 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20202".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key.clone(), value.clone()).await;
        let leases = alive_datanodes(&client, lease_secs as u64).await.unwrap();
        assert_eq!(leases.len(), 1);
        assert_eq!(leases.get(&key), Some(&value));
    }

    #[tokio::test]
    async fn test_alive_datanodes_with_condition() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        // put a stale lease value for node 1 (ingest-capable, but expired)
        let key = DatanodeLeaseKey { node_id: 1 };
        let value = LeaseValue {
            // 20s ago
            timestamp_millis: current_time_millis() - 20 * 1000,
            node_addr: "127.0.0.1:20201".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // put a fresh lease value for node 2 with an ingest-capable (Hybrid) workload
        let key = DatanodeLeaseKey { node_id: 2 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20202".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // put a fresh lease value for node 3 with an unknown workload type
        let key = DatanodeLeaseKey { node_id: 3 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20203".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![i32::MAX],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // put a fresh lease value for node 4 with an unknown workload type
        let key = DatanodeLeaseKey { node_id: 4 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20204".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![i32::MAX],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // only node 2 is both alive and ingest-capable
        let leases = alive_datanodes(&client, lease_secs as u64)
            .with_condition(is_datanode_accept_ingest_workload)
            .await
            .unwrap();
        assert_eq!(leases.len(), 1);
        assert!(leases.contains_key(&DatanodeLeaseKey { node_id: 2 }));
    }

    #[tokio::test]
    async fn test_lookup_frontends() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        let active_frontend_node = NodeInfo {
            peer: Peer {
                id: 0,
                addr: "127.0.0.1:20201".to_string(),
            },
            last_activity_ts: current_time_millis(),
            status: NodeStatus::Frontend(FrontendStatus {}),
            version: "1.0.0".to_string(),
            git_commit: "1234567890".to_string(),
            start_time_ms: current_time_millis() as u64,
        };

        let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend);

        in_memory
            .put(PutRequest {
                key: format!("{}{}", key_prefix, "0").into(),
                value: active_frontend_node.try_into().unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();

        let inactive_frontend_node = NodeInfo {
            peer: Peer {
                id: 1,
                addr: "127.0.0.1:20201".to_string(),
            },
            last_activity_ts: current_time_millis() - 20 * 1000,
            status: NodeStatus::Frontend(FrontendStatus {}),
            version: "1.0.0".to_string(),
            git_commit: "1234567890".to_string(),
            start_time_ms: current_time_millis() as u64,
        };

        in_memory
            .put(PutRequest {
                key: format!("{}{}", key_prefix, "1").into(),
                value: inactive_frontend_node.try_into().unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();

        let peers = lookup_frontends(&client, lease_secs as u64).await.unwrap();

        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].id, 0);
    }
}