meta_srv/
lease.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::future::Future;
17use std::hash::Hash;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20
21use api::v1::meta::heartbeat_request::NodeWorkloads;
22use common_error::ext::BoxedError;
23use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
24use common_meta::peer::{Peer, PeerLookupService};
25use common_meta::{util, DatanodeId, FlownodeId};
26use common_time::util as time_util;
27use common_workload::DatanodeWorkloadType;
28use snafu::ResultExt;
29
30use crate::cluster::MetaPeerClientRef;
31use crate::error::{Error, KvBackendSnafu, Result};
32use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
33
34fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
35    move |v: &LeaseValue| {
36        ((time_util::current_time_millis() - v.timestamp_millis) as u64)
37            < lease_secs.saturating_mul(1000)
38    }
39}
40
41/// Returns true if the datanode can accept ingest workload based on its workload types.
42///
43/// A datanode is considered to accept ingest workload if it supports either:
44/// - Hybrid workload (both ingest and query workloads)
45/// - Ingest workload (only ingest workload)
46pub fn is_datanode_accept_ingest_workload(lease_value: &LeaseValue) -> bool {
47    match &lease_value.workloads {
48        NodeWorkloads::Datanode(workloads) => workloads
49            .types
50            .iter()
51            .filter_map(|w| DatanodeWorkloadType::from_i32(*w))
52            .any(|w| w.accept_ingest()),
53        _ => false,
54    }
55}
56
57/// Returns the lease value of the given datanode id, if the datanode is not found, returns None.
58pub async fn find_datanode_lease_value(
59    datanode_id: DatanodeId,
60    in_memory_key: &ResettableKvBackendRef,
61) -> Result<Option<LeaseValue>> {
62    let lease_key = DatanodeLeaseKey {
63        node_id: datanode_id,
64    };
65    let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
66    let Some(kv) = in_memory_key
67        .get(&lease_key_bytes)
68        .await
69        .context(KvBackendSnafu)?
70    else {
71        return Ok(None);
72    };
73
74    let lease_value: LeaseValue = kv.value.try_into()?;
75
76    Ok(Some(lease_value))
77}
78
79/// look up [`Peer`] given [`ClusterId`] and [`DatanodeId`], will only return if it's alive under given `lease_secs`
80pub async fn lookup_datanode_peer(
81    datanode_id: DatanodeId,
82    meta_peer_client: &MetaPeerClientRef,
83    lease_secs: u64,
84) -> Result<Option<Peer>> {
85    let lease_filter = build_lease_filter(lease_secs);
86    let lease_key = DatanodeLeaseKey {
87        node_id: datanode_id,
88    };
89    let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
90    let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
91        return Ok(None);
92    };
93    let lease_value: LeaseValue = kv.value.try_into()?;
94    let is_alive = lease_filter(&lease_value);
95    if is_alive {
96        Ok(Some(Peer {
97            id: lease_key.node_id,
98            addr: lease_value.node_addr,
99        }))
100    } else {
101        Ok(None)
102    }
103}
104
105type LeaseFilterFuture<'a, K> =
106    Pin<Box<dyn Future<Output = Result<HashMap<K, LeaseValue>>> + Send + 'a>>;
107
108pub struct LeaseFilter<'a, K>
109where
110    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
111{
112    lease_secs: u64,
113    key_prefix: Vec<u8>,
114    meta_peer_client: &'a MetaPeerClientRef,
115    condition: Option<fn(&LeaseValue) -> bool>,
116    inner_future: Option<LeaseFilterFuture<'a, K>>,
117}
118
119impl<'a, K> LeaseFilter<'a, K>
120where
121    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
122{
123    pub fn new(
124        lease_secs: u64,
125        key_prefix: Vec<u8>,
126        meta_peer_client: &'a MetaPeerClientRef,
127    ) -> Self {
128        Self {
129            lease_secs,
130            key_prefix,
131            meta_peer_client,
132            condition: None,
133            inner_future: None,
134        }
135    }
136
137    /// Set the condition for the lease filter.
138    pub fn with_condition(mut self, condition: fn(&LeaseValue) -> bool) -> Self {
139        self.condition = Some(condition);
140        self
141    }
142}
143
144impl<'a, K> Future for LeaseFilter<'a, K>
145where
146    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
147{
148    type Output = Result<HashMap<K, LeaseValue>>;
149
150    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151        let this = self.get_mut();
152
153        if this.inner_future.is_none() {
154            let lease_filter = build_lease_filter(this.lease_secs);
155            let condition = this.condition;
156            let key_prefix = std::mem::take(&mut this.key_prefix);
157            let fut = filter(key_prefix, this.meta_peer_client, move |v| {
158                lease_filter(v) && condition.unwrap_or(|_| true)(v)
159            });
160
161            this.inner_future = Some(Box::pin(fut));
162        }
163
164        let fut = this.inner_future.as_mut().unwrap();
165        let result = futures::ready!(fut.as_mut().poll(cx))?;
166
167        Poll::Ready(Ok(result))
168    }
169}
170
171/// Find all alive datanodes
172pub fn alive_datanodes(
173    meta_peer_client: &MetaPeerClientRef,
174    lease_secs: u64,
175) -> LeaseFilter<'_, DatanodeLeaseKey> {
176    LeaseFilter::new(lease_secs, DatanodeLeaseKey::prefix_key(), meta_peer_client)
177}
178
179/// look up [`Peer`] given [`ClusterId`] and [`DatanodeId`], only return if it's alive under given `lease_secs`
180pub async fn lookup_flownode_peer(
181    flownode_id: FlownodeId,
182    meta_peer_client: &MetaPeerClientRef,
183    lease_secs: u64,
184) -> Result<Option<Peer>> {
185    let lease_filter = build_lease_filter(lease_secs);
186    let lease_key = FlownodeLeaseKey {
187        node_id: flownode_id,
188    };
189    let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
190    let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
191        return Ok(None);
192    };
193    let lease_value: LeaseValue = kv.value.try_into()?;
194
195    let is_alive = lease_filter(&lease_value);
196    if is_alive {
197        Ok(Some(Peer {
198            id: lease_key.node_id,
199            addr: lease_value.node_addr,
200        }))
201    } else {
202        Ok(None)
203    }
204}
205
206/// Find all alive flownodes
207pub fn alive_flownodes(
208    meta_peer_client: &MetaPeerClientRef,
209    lease_secs: u64,
210) -> LeaseFilter<'_, FlownodeLeaseKey> {
211    LeaseFilter::new(
212        lease_secs,
213        FlownodeLeaseKey::prefix_key_by_cluster(),
214        meta_peer_client,
215    )
216}
217
218pub async fn filter<P, K>(
219    key: Vec<u8>,
220    meta_peer_client: &MetaPeerClientRef,
221    predicate: P,
222) -> Result<HashMap<K, LeaseValue>>
223where
224    P: Fn(&LeaseValue) -> bool,
225    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error>,
226{
227    let range_end = util::get_prefix_end_key(&key);
228    let range_req = common_meta::rpc::store::RangeRequest {
229        key,
230        range_end,
231        keys_only: false,
232        ..Default::default()
233    };
234    let kvs = meta_peer_client.range(range_req).await?.kvs;
235    let mut lease_kvs = HashMap::new();
236    for kv in kvs {
237        let lease_key: K = kv.key.try_into()?;
238        let lease_value: LeaseValue = kv.value.try_into()?;
239        if !predicate(&lease_value) {
240            continue;
241        }
242        let _ = lease_kvs.insert(lease_key, lease_value);
243    }
244
245    Ok(lease_kvs)
246}
247
248#[derive(Clone)]
249pub struct MetaPeerLookupService {
250    pub meta_peer_client: MetaPeerClientRef,
251}
252
253impl MetaPeerLookupService {
254    pub fn new(meta_peer_client: MetaPeerClientRef) -> Self {
255        Self { meta_peer_client }
256    }
257}
258
259#[async_trait::async_trait]
260impl PeerLookupService for MetaPeerLookupService {
261    async fn datanode(&self, id: DatanodeId) -> common_meta::error::Result<Option<Peer>> {
262        lookup_datanode_peer(id, &self.meta_peer_client, u64::MAX)
263            .await
264            .map_err(BoxedError::new)
265            .context(common_meta::error::ExternalSnafu)
266    }
267    async fn flownode(&self, id: FlownodeId) -> common_meta::error::Result<Option<Peer>> {
268        lookup_flownode_peer(id, &self.meta_peer_client, u64::MAX)
269            .await
270            .map_err(BoxedError::new)
271            .context(common_meta::error::ExternalSnafu)
272    }
273}
274
275#[cfg(test)]
276mod tests {
277    use api::v1::meta::heartbeat_request::NodeWorkloads;
278    use api::v1::meta::DatanodeWorkloads;
279    use common_meta::kv_backend::ResettableKvBackendRef;
280    use common_meta::rpc::store::PutRequest;
281    use common_time::util::current_time_millis;
282    use common_workload::DatanodeWorkloadType;
283
284    use crate::key::{DatanodeLeaseKey, LeaseValue};
285    use crate::lease::{alive_datanodes, is_datanode_accept_ingest_workload};
286    use crate::test_util::create_meta_peer_client;
287
288    async fn put_lease_value(
289        kv_backend: &ResettableKvBackendRef,
290        key: DatanodeLeaseKey,
291        value: LeaseValue,
292    ) {
293        kv_backend
294            .put(PutRequest {
295                key: key.try_into().unwrap(),
296                value: value.try_into().unwrap(),
297                prev_kv: false,
298            })
299            .await
300            .unwrap();
301    }
302
303    #[tokio::test]
304    async fn test_alive_datanodes() {
305        let client = create_meta_peer_client();
306        let in_memory = client.memory_backend();
307        let lease_secs = 10;
308
309        // put a stale lease value for node 1
310        let key = DatanodeLeaseKey { node_id: 1 };
311        let value = LeaseValue {
312            // 20s ago
313            timestamp_millis: current_time_millis() - lease_secs * 2 * 1000,
314            node_addr: "127.0.0.1:20201".to_string(),
315            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
316                types: vec![DatanodeWorkloadType::Hybrid as i32],
317            }),
318        };
319        put_lease_value(&in_memory, key, value).await;
320
321        // put a fresh lease value for node 2
322        let key = DatanodeLeaseKey { node_id: 2 };
323        let value = LeaseValue {
324            timestamp_millis: current_time_millis(),
325            node_addr: "127.0.0.1:20202".to_string(),
326            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
327                types: vec![DatanodeWorkloadType::Hybrid as i32],
328            }),
329        };
330        put_lease_value(&in_memory, key.clone(), value.clone()).await;
331        let leases = alive_datanodes(&client, lease_secs as u64).await.unwrap();
332        assert_eq!(leases.len(), 1);
333        assert_eq!(leases.get(&key), Some(&value));
334    }
335
336    #[tokio::test]
337    async fn test_alive_datanodes_with_condition() {
338        let client = create_meta_peer_client();
339        let in_memory = client.memory_backend();
340        let lease_secs = 10;
341
342        // put a lease value for node 1 without mode info
343        let key = DatanodeLeaseKey { node_id: 1 };
344        let value = LeaseValue {
345            // 20s ago
346            timestamp_millis: current_time_millis() - 20 * 1000,
347            node_addr: "127.0.0.1:20201".to_string(),
348            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
349                types: vec![DatanodeWorkloadType::Hybrid as i32],
350            }),
351        };
352        put_lease_value(&in_memory, key, value).await;
353
354        // put a lease value for node 2 with mode info
355        let key = DatanodeLeaseKey { node_id: 2 };
356        let value = LeaseValue {
357            timestamp_millis: current_time_millis(),
358            node_addr: "127.0.0.1:20202".to_string(),
359            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
360                types: vec![DatanodeWorkloadType::Hybrid as i32],
361            }),
362        };
363        put_lease_value(&in_memory, key, value).await;
364
365        // put a lease value for node 3 with mode info
366        let key = DatanodeLeaseKey { node_id: 3 };
367        let value = LeaseValue {
368            timestamp_millis: current_time_millis(),
369            node_addr: "127.0.0.1:20203".to_string(),
370            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
371                types: vec![i32::MAX],
372            }),
373        };
374        put_lease_value(&in_memory, key, value).await;
375
376        // put a lease value for node 3 with mode info
377        let key = DatanodeLeaseKey { node_id: 4 };
378        let value = LeaseValue {
379            timestamp_millis: current_time_millis(),
380            node_addr: "127.0.0.1:20204".to_string(),
381            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
382                types: vec![i32::MAX],
383            }),
384        };
385        put_lease_value(&in_memory, key, value).await;
386
387        let leases = alive_datanodes(&client, lease_secs as u64)
388            .with_condition(is_datanode_accept_ingest_workload)
389            .await
390            .unwrap();
391        assert_eq!(leases.len(), 1);
392        assert!(leases.contains_key(&DatanodeLeaseKey { node_id: 2 }));
393    }
394}