meta_srv/discovery/
lease.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::kv_backend::KvBackend;
16use common_meta::rpc::KeyValue;
17use common_meta::rpc::store::RangeRequest;
18
19use crate::cluster::MetaPeerClient;
20use crate::error::Result;
21use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
22
/// Selects which kind of node a lease lookup targets.
///
/// The variant decides which lease key type is used to build and decode keys:
/// `FlownodeLeaseKey` for [`LeaseValueType::Flownode`] and `DatanodeLeaseKey`
/// for [`LeaseValueType::Datanode`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseValueType {
    /// Leases keyed by `FlownodeLeaseKey`.
    Flownode,
    /// Leases keyed by `DatanodeLeaseKey`.
    Datanode,
}
28
/// Read access to the lease values of flownodes or datanodes, selected by a
/// [`LeaseValueType`].
#[async_trait::async_trait]
pub trait LeaseValueAccessor: Send + Sync {
    /// Returns the peer id and lease value of every node of the given type.
    async fn lease_values(
        &self,
        lease_value_type: LeaseValueType,
    ) -> Result<Vec<(u64, LeaseValue)>>;

    /// Returns the peer id and lease value of the node `node_id` of the given
    /// type, or `None` when no lease is found for it.
    async fn lease_value(
        &self,
        lease_value_type: LeaseValueType,
        node_id: u64,
    ) -> Result<Option<(u64, LeaseValue)>>;
}
43
44fn decoder(lease_value_type: LeaseValueType, kv: KeyValue) -> Result<(u64, LeaseValue)> {
45    match lease_value_type {
46        LeaseValueType::Flownode => {
47            let lease_key: FlownodeLeaseKey = kv.key.try_into()?;
48            let lease_value: LeaseValue = kv.value.try_into()?;
49            Ok((lease_key.node_id, lease_value))
50        }
51        LeaseValueType::Datanode => {
52            let lease_key: DatanodeLeaseKey = kv.key.try_into()?;
53            let lease_value: LeaseValue = kv.value.try_into()?;
54            Ok((lease_key.node_id, lease_value))
55        }
56    }
57}
58
59#[async_trait::async_trait]
60impl LeaseValueAccessor for MetaPeerClient {
61    async fn lease_values(
62        &self,
63        lease_value_type: LeaseValueType,
64    ) -> Result<Vec<(u64, LeaseValue)>> {
65        let prefix = match lease_value_type {
66            LeaseValueType::Flownode => FlownodeLeaseKey::prefix_key_by_cluster(),
67            LeaseValueType::Datanode => DatanodeLeaseKey::prefix_key(),
68        };
69        let range_request = RangeRequest::new().with_prefix(prefix);
70        let response = self.range(range_request).await?;
71        response
72            .kvs
73            .into_iter()
74            .map(|kv| {
75                let (lease_key, lease_value) = decoder(lease_value_type, kv)?;
76                Ok((lease_key, lease_value))
77            })
78            .collect::<Result<Vec<_>>>()
79    }
80
81    async fn lease_value(
82        &self,
83        lease_value_type: LeaseValueType,
84        node_id: u64,
85    ) -> Result<Option<(u64, LeaseValue)>> {
86        let key: Vec<u8> = match lease_value_type {
87            LeaseValueType::Flownode => FlownodeLeaseKey { node_id }.try_into()?,
88            LeaseValueType::Datanode => DatanodeLeaseKey { node_id }.try_into()?,
89        };
90
91        let response = self.get(&key).await?;
92        response.map(|kv| decoder(lease_value_type, kv)).transpose()
93    }
94}
95
96#[cfg(test)]
97mod tests {
98    use std::sync::Arc;
99    use std::sync::atomic::{AtomicI64, Ordering};
100    use std::time::Duration;
101
102    use api::v1::meta::heartbeat_request::NodeWorkloads;
103    use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads};
104    use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role};
105    use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
106    use common_meta::kv_backend::ResettableKvBackendRef;
107    use common_meta::peer::{Peer, PeerDiscovery};
108    use common_meta::rpc::store::PutRequest;
109    use common_time::util::{DefaultSystemTimer, SystemTimer, current_time_millis};
110    use common_workload::DatanodeWorkloadType;
111
112    use crate::discovery::utils::{self, accept_ingest_workload};
113    use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
114    use crate::test_util::create_meta_peer_client;
115
116    async fn put_lease_value(
117        kv_backend: &ResettableKvBackendRef,
118        key: DatanodeLeaseKey,
119        value: LeaseValue,
120    ) {
121        kv_backend
122            .put(PutRequest {
123                key: key.try_into().unwrap(),
124                value: value.try_into().unwrap(),
125                prev_kv: false,
126            })
127            .await
128            .unwrap();
129    }
130
131    async fn put_flownode_lease_value(
132        kv_backend: &ResettableKvBackendRef,
133        key: FlownodeLeaseKey,
134        value: LeaseValue,
135    ) {
136        kv_backend
137            .put(PutRequest {
138                key: key.try_into().unwrap(),
139                value: value.try_into().unwrap(),
140                prev_kv: false,
141            })
142            .await
143            .unwrap();
144    }
145
    /// Test timer that hands out timestamps from a shared atomic counter.
    struct MockTimer {
        /// The value returned by the next `current_time_millis` call.
        current: Arc<AtomicI64>,
    }

    impl SystemTimer for MockTimer {
        /// Returns the stored value and then increments it by one, so each
        /// call yields a timestamp one millisecond later than the previous
        /// call.
        fn current_time_millis(&self) -> i64 {
            self.current.fetch_add(1, Ordering::Relaxed)
        }

        /// Left unimplemented; panics if called.
        fn current_time_rfc3339(&self) -> String {
            unimplemented!()
        }
    }
159
160    #[tokio::test]
161    async fn test_alive_datanodes() {
162        let client = create_meta_peer_client();
163        let in_memory = client.memory_backend();
164        let lease_secs = 10;
165        let timer = DefaultSystemTimer;
166
167        // put a stale lease value for node 1
168        let key = DatanodeLeaseKey { node_id: 1 };
169        let value = LeaseValue {
170            // 20s ago
171            timestamp_millis: timer.current_time_millis() - lease_secs * 2 * 1000,
172            node_addr: "127.0.0.1:20201".to_string(),
173            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
174                types: vec![DatanodeWorkloadType::Hybrid as i32],
175            }),
176        };
177        put_lease_value(&in_memory, key, value).await;
178
179        // put a fresh lease value for node 2
180        let key = DatanodeLeaseKey { node_id: 2 };
181        let value = LeaseValue {
182            timestamp_millis: timer.current_time_millis(),
183            node_addr: "127.0.0.1:20202".to_string(),
184            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
185                types: vec![DatanodeWorkloadType::Hybrid as i32],
186            }),
187        };
188        put_lease_value(&in_memory, key.clone(), value.clone()).await;
189        let peers = utils::alive_datanodes(
190            &timer,
191            client.as_ref(),
192            Duration::from_secs(lease_secs as u64),
193            None,
194        )
195        .await
196        .unwrap();
197        assert_eq!(peers.len(), 1);
198        assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
199    }
200
201    #[tokio::test]
202    async fn test_alive_datanodes_with_timer() {
203        let client = create_meta_peer_client();
204        let in_memory = client.memory_backend();
205        let lease_secs = 10;
206        let timer = MockTimer {
207            current: Arc::new(AtomicI64::new(current_time_millis())),
208        };
209
210        let key = DatanodeLeaseKey { node_id: 2 };
211        let value = LeaseValue {
212            timestamp_millis: timer.current_time_millis(),
213            node_addr: "127.0.0.1:20202".to_string(),
214            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
215                types: vec![DatanodeWorkloadType::Hybrid as i32],
216            }),
217        };
218        put_lease_value(&in_memory, key.clone(), value.clone()).await;
219        let peers = utils::alive_datanodes(
220            &timer,
221            client.as_ref(),
222            Duration::from_secs(lease_secs as u64),
223            None,
224        )
225        .await
226        .unwrap();
227        assert_eq!(peers.len(), 1);
228        assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
229    }
230
231    #[tokio::test]
232    async fn test_alive_datanodes_with_condition() {
233        let client = create_meta_peer_client();
234        let in_memory = client.memory_backend();
235        let lease_secs = 10;
236        let timer = DefaultSystemTimer;
237
238        // put a lease value for node 1 without mode info
239        let key = DatanodeLeaseKey { node_id: 1 };
240        let value = LeaseValue {
241            // 20s ago
242            timestamp_millis: timer.current_time_millis() - 20 * 1000,
243            node_addr: "127.0.0.1:20201".to_string(),
244            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
245                types: vec![DatanodeWorkloadType::Hybrid as i32],
246            }),
247        };
248        put_lease_value(&in_memory, key, value).await;
249
250        // put a lease value for node 2 with mode info
251        let key = DatanodeLeaseKey { node_id: 2 };
252        let value = LeaseValue {
253            timestamp_millis: timer.current_time_millis(),
254            node_addr: "127.0.0.1:20202".to_string(),
255            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
256                types: vec![DatanodeWorkloadType::Hybrid as i32],
257            }),
258        };
259        put_lease_value(&in_memory, key, value).await;
260
261        // put a lease value for node 3 with mode info
262        let key = DatanodeLeaseKey { node_id: 3 };
263        let value = LeaseValue {
264            timestamp_millis: timer.current_time_millis(),
265            node_addr: "127.0.0.1:20203".to_string(),
266            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
267                types: vec![i32::MAX],
268            }),
269        };
270        put_lease_value(&in_memory, key, value).await;
271
272        // put a lease value for node 3 with mode info
273        let key = DatanodeLeaseKey { node_id: 4 };
274        let value = LeaseValue {
275            timestamp_millis: timer.current_time_millis(),
276            node_addr: "127.0.0.1:20204".to_string(),
277            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
278                types: vec![i32::MAX],
279            }),
280        };
281        put_lease_value(&in_memory, key, value).await;
282
283        let peers = utils::alive_datanodes(
284            &timer,
285            client.as_ref(),
286            Duration::from_secs(lease_secs),
287            Some(accept_ingest_workload),
288        )
289        .await
290        .unwrap();
291        assert_eq!(peers.len(), 1);
292        assert!(peers.contains(&Peer::new(2, "127.0.0.1:20202".to_string())));
293    }
294
295    #[tokio::test]
296    async fn test_alive_flownodes() {
297        let client = create_meta_peer_client();
298        let in_memory = client.memory_backend();
299        let lease_secs = 10;
300        let timer = DefaultSystemTimer;
301
302        // put a stale lease value for node 1
303        let key = FlownodeLeaseKey { node_id: 1 };
304        let value = LeaseValue {
305            // 20s ago
306            timestamp_millis: timer.current_time_millis() - lease_secs * 2 * 1000,
307            node_addr: "127.0.0.1:20201".to_string(),
308            workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
309        };
310        put_flownode_lease_value(&in_memory, key, value).await;
311
312        // put a fresh lease value for node 2
313        let key = FlownodeLeaseKey { node_id: 2 };
314        let value = LeaseValue {
315            timestamp_millis: timer.current_time_millis(),
316            node_addr: "127.0.0.1:20202".to_string(),
317            workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
318        };
319        put_flownode_lease_value(&in_memory, key.clone(), value.clone()).await;
320        let peers = utils::alive_flownodes(
321            &timer,
322            client.as_ref(),
323            Duration::from_secs(lease_secs as u64),
324            None,
325        )
326        .await
327        .unwrap();
328        assert_eq!(peers.len(), 1);
329        assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
330    }
331
332    #[tokio::test]
333    async fn test_alive_flownodes_with_timer() {
334        let client = create_meta_peer_client();
335        let in_memory = client.memory_backend();
336        let lease_secs = 10;
337        let timer = MockTimer {
338            current: Arc::new(AtomicI64::new(current_time_millis())),
339        };
340
341        let key = FlownodeLeaseKey { node_id: 2 };
342        let value = LeaseValue {
343            timestamp_millis: timer.current_time_millis(),
344            node_addr: "127.0.0.1:20202".to_string(),
345            workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
346        };
347        put_flownode_lease_value(&in_memory, key.clone(), value.clone()).await;
348        let peers = utils::alive_flownodes(
349            &timer,
350            client.as_ref(),
351            Duration::from_secs(lease_secs as u64),
352            None,
353        )
354        .await
355        .unwrap();
356        assert_eq!(peers.len(), 1);
357        assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
358    }
359
360    #[tokio::test]
361    async fn test_lookup_frontends() {
362        let client = create_meta_peer_client();
363        let in_memory = client.memory_backend();
364        let lease_secs = 10;
365        let timer = DefaultSystemTimer;
366
367        let active_frontend_node = NodeInfo {
368            peer: Peer {
369                id: 0,
370                addr: "127.0.0.1:20201".to_string(),
371            },
372            last_activity_ts: timer.current_time_millis(),
373            status: NodeStatus::Frontend(FrontendStatus {}),
374            version: "1.0.0".to_string(),
375            git_commit: "1234567890".to_string(),
376            start_time_ms: current_time_millis() as u64,
377            total_cpu_millicores: 0,
378            total_memory_bytes: 0,
379            cpu_usage_millicores: 0,
380            memory_usage_bytes: 0,
381            hostname: "test_hostname".to_string(),
382        };
383
384        let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
385
386        in_memory
387            .put(PutRequest {
388                key: format!("{}{}", key_prefix, "0").into(),
389                value: active_frontend_node.try_into().unwrap(),
390                prev_kv: false,
391            })
392            .await
393            .unwrap();
394
395        let inactive_frontend_node = NodeInfo {
396            peer: Peer {
397                id: 1,
398                addr: "127.0.0.1:20201".to_string(),
399            },
400            last_activity_ts: timer.current_time_millis() - 20 * 1000,
401            status: NodeStatus::Frontend(FrontendStatus {}),
402            version: "1.0.0".to_string(),
403            git_commit: "1234567890".to_string(),
404            start_time_ms: current_time_millis() as u64,
405            total_cpu_millicores: 0,
406            total_memory_bytes: 0,
407            cpu_usage_millicores: 0,
408            memory_usage_bytes: 0,
409            hostname: "test_hostname".to_string(),
410        };
411
412        in_memory
413            .put(PutRequest {
414                key: format!("{}{}", key_prefix, "1").into(),
415                value: inactive_frontend_node.try_into().unwrap(),
416                prev_kv: false,
417            })
418            .await
419            .unwrap();
420
421        let peers =
422            utils::alive_frontends(&timer, client.as_ref(), Duration::from_secs(lease_secs))
423                .await
424                .unwrap();
425        assert_eq!(peers.len(), 1);
426        assert_eq!(peers[0].id, 0);
427    }
428
429    #[tokio::test]
430    async fn test_lookup_frontends_with_timer() {
431        let client = create_meta_peer_client();
432        let in_memory = client.memory_backend();
433        let lease_secs = 10;
434        let timer = MockTimer {
435            current: Arc::new(AtomicI64::new(current_time_millis())),
436        };
437
438        let active_frontend_node = NodeInfo {
439            peer: Peer {
440                id: 0,
441                addr: "127.0.0.1:20201".to_string(),
442            },
443            last_activity_ts: timer.current_time_millis(),
444            status: NodeStatus::Frontend(FrontendStatus {}),
445            version: "1.0.0".to_string(),
446            git_commit: "1234567890".to_string(),
447            start_time_ms: current_time_millis() as u64,
448            total_cpu_millicores: 0,
449            total_memory_bytes: 0,
450            cpu_usage_millicores: 0,
451            memory_usage_bytes: 0,
452            hostname: "test_hostname".to_string(),
453        };
454        let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
455        in_memory
456            .put(PutRequest {
457                key: format!("{}{}", key_prefix, "0").into(),
458                value: active_frontend_node.try_into().unwrap(),
459                prev_kv: false,
460            })
461            .await
462            .unwrap();
463        let peers =
464            utils::alive_frontends(&timer, client.as_ref(), Duration::from_secs(lease_secs))
465                .await
466                .unwrap();
467        assert_eq!(peers.len(), 1);
468        assert_eq!(peers[0].id, 0);
469    }
470
471    #[tokio::test]
472    async fn test_no_active_frontends() {
473        let client = create_meta_peer_client();
474        let in_memory = client.memory_backend();
475
476        let last_activity_ts =
477            current_time_millis() - FRONTEND_HEARTBEAT_INTERVAL_MILLIS as i64 - 1000;
478        let active_frontend_node = NodeInfo {
479            peer: Peer {
480                id: 0,
481                addr: "127.0.0.1:20201".to_string(),
482            },
483            last_activity_ts,
484            status: NodeStatus::Frontend(FrontendStatus {}),
485            version: "1.0.0".to_string(),
486            git_commit: "1234567890".to_string(),
487            start_time_ms: last_activity_ts as u64,
488            total_cpu_millicores: 0,
489            total_memory_bytes: 0,
490            cpu_usage_millicores: 0,
491            memory_usage_bytes: 0,
492            hostname: "test_hostname".to_string(),
493        };
494
495        let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
496
497        in_memory
498            .put(PutRequest {
499                key: format!("{}{}", key_prefix, "0").into(),
500                value: active_frontend_node.try_into().unwrap(),
501                prev_kv: false,
502            })
503            .await
504            .unwrap();
505
506        let peers = client.active_frontends().await.unwrap();
507        assert_eq!(peers.len(), 0);
508    }
509}