meta_srv/discovery/
lease.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::kv_backend::KvBackend;
16use common_meta::rpc::KeyValue;
17use common_meta::rpc::store::RangeRequest;
18
19use crate::cluster::MetaPeerClient;
20use crate::error::Result;
21use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
22
/// Selects which kind of node lease is being read.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LeaseValueType {
    /// Leases stored under `FlownodeLeaseKey`.
    Flownode,
    /// Leases stored under `DatanodeLeaseKey`.
    Datanode,
}
28
/// Read access to node lease values, selected by [`LeaseValueType`].
#[async_trait::async_trait]
pub trait LeaseValueAccessor: Send + Sync {
    /// Returns the peer id and lease value.
    ///
    /// Yields one `(node_id, LeaseValue)` pair for every lease of the requested
    /// `lease_value_type`.
    async fn lease_values(
        &self,
        lease_value_type: LeaseValueType,
    ) -> Result<Vec<(u64, LeaseValue)>>;

    /// Returns the peer id and lease value for the lease of kind `lease_value_type`
    /// belonging to `node_id`.
    ///
    /// `Ok(None)` presumably means no lease is stored for that node — confirm against
    /// implementors.
    async fn lease_value(
        &self,
        lease_value_type: LeaseValueType,
        node_id: u64,
    ) -> Result<Option<(u64, LeaseValue)>>;
}
43
44fn decoder(lease_value_type: LeaseValueType, kv: KeyValue) -> Result<(u64, LeaseValue)> {
45    match lease_value_type {
46        LeaseValueType::Flownode => {
47            let lease_key: FlownodeLeaseKey = kv.key.try_into()?;
48            let lease_value: LeaseValue = kv.value.try_into()?;
49            Ok((lease_key.node_id, lease_value))
50        }
51        LeaseValueType::Datanode => {
52            let lease_key: DatanodeLeaseKey = kv.key.try_into()?;
53            let lease_value: LeaseValue = kv.value.try_into()?;
54            Ok((lease_key.node_id, lease_value))
55        }
56    }
57}
58
59#[async_trait::async_trait]
60impl LeaseValueAccessor for MetaPeerClient {
61    async fn lease_values(
62        &self,
63        lease_value_type: LeaseValueType,
64    ) -> Result<Vec<(u64, LeaseValue)>> {
65        let prefix = match lease_value_type {
66            LeaseValueType::Flownode => FlownodeLeaseKey::prefix_key_by_cluster(),
67            LeaseValueType::Datanode => DatanodeLeaseKey::prefix_key(),
68        };
69        let range_request = RangeRequest::new().with_prefix(prefix);
70        let response = self.range(range_request).await?;
71        response
72            .kvs
73            .into_iter()
74            .map(|kv| {
75                let (lease_key, lease_value) = decoder(lease_value_type, kv)?;
76                Ok((lease_key, lease_value))
77            })
78            .collect::<Result<Vec<_>>>()
79    }
80
81    async fn lease_value(
82        &self,
83        lease_value_type: LeaseValueType,
84        node_id: u64,
85    ) -> Result<Option<(u64, LeaseValue)>> {
86        let key: Vec<u8> = match lease_value_type {
87            LeaseValueType::Flownode => FlownodeLeaseKey { node_id }.try_into()?,
88            LeaseValueType::Datanode => DatanodeLeaseKey { node_id }.try_into()?,
89        };
90
91        let response = self.get(&key).await?;
92        response.map(|kv| decoder(lease_value_type, kv)).transpose()
93    }
94}
95
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use api::v1::meta::DatanodeWorkloads;
    use api::v1::meta::heartbeat_request::NodeWorkloads;
    use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role};
    use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
    use common_meta::kv_backend::ResettableKvBackendRef;
    use common_meta::peer::{Peer, PeerDiscovery};
    use common_meta::rpc::store::PutRequest;
    use common_time::util::current_time_millis;
    use common_workload::DatanodeWorkloadType;

    use crate::discovery::utils::{self, accept_ingest_workload};
    use crate::key::{DatanodeLeaseKey, LeaseValue};
    use crate::test_util::create_meta_peer_client;

    /// Encodes `key` and `value` and writes them directly into `kv_backend`.
    async fn put_lease_value(
        kv_backend: &ResettableKvBackendRef,
        key: DatanodeLeaseKey,
        value: LeaseValue,
    ) {
        kv_backend
            .put(PutRequest {
                key: key.try_into().unwrap(),
                value: value.try_into().unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_alive_datanodes() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        // put a stale lease value for node 1
        let key = DatanodeLeaseKey { node_id: 1 };
        let value = LeaseValue {
            // 20s ago, i.e. twice the lease duration
            timestamp_millis: current_time_millis() - lease_secs * 2 * 1000,
            node_addr: "127.0.0.1:20201".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // put a fresh lease value for node 2
        let key = DatanodeLeaseKey { node_id: 2 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20202".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key, value).await;
        let peers = utils::alive_datanodes(
            client.as_ref(),
            Duration::from_secs(lease_secs as u64),
            None,
        )
        .await
        .unwrap();
        assert_eq!(peers.len(), 1);
        assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
    }

    #[tokio::test]
    async fn test_alive_datanodes_with_condition() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        // put a stale lease value for node 1 (older than the lease duration)
        let key = DatanodeLeaseKey { node_id: 1 };
        let value = LeaseValue {
            // 20s ago
            timestamp_millis: current_time_millis() - 20 * 1000,
            node_addr: "127.0.0.1:20201".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // put a fresh lease value for node 2 with a Hybrid workload
        let key = DatanodeLeaseKey { node_id: 2 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20202".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // put a fresh lease value for node 3 whose workload type (`i32::MAX`) is
        // expected to be rejected by `accept_ingest_workload`
        let key = DatanodeLeaseKey { node_id: 3 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20203".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![i32::MAX],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // put a fresh lease value for node 4, also with the `i32::MAX` workload type
        let key = DatanodeLeaseKey { node_id: 4 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20204".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![i32::MAX],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        let peers = utils::alive_datanodes(
            client.as_ref(),
            Duration::from_secs(lease_secs),
            Some(accept_ingest_workload),
        )
        .await
        .unwrap();
        assert_eq!(peers.len(), 1);
        assert!(peers.contains(&Peer::new(2, "127.0.0.1:20202".to_string())));
    }

    #[tokio::test]
    async fn test_lookup_frontends() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        // frontend 0 was active just now
        let active_frontend_node = NodeInfo {
            peer: Peer {
                id: 0,
                addr: "127.0.0.1:20201".to_string(),
            },
            last_activity_ts: current_time_millis(),
            status: NodeStatus::Frontend(FrontendStatus {}),
            version: "1.0.0".to_string(),
            git_commit: "1234567890".to_string(),
            start_time_ms: current_time_millis() as u64,
            cpus: 0,
            memory_bytes: 0,
        };

        let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);

        in_memory
            .put(PutRequest {
                key: format!("{}{}", key_prefix, "0").into(),
                value: active_frontend_node.try_into().unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();

        // frontend 1 was last active 20s ago, beyond the lease duration
        let inactive_frontend_node = NodeInfo {
            peer: Peer {
                id: 1,
                addr: "127.0.0.1:20201".to_string(),
            },
            last_activity_ts: current_time_millis() - 20 * 1000,
            status: NodeStatus::Frontend(FrontendStatus {}),
            version: "1.0.0".to_string(),
            git_commit: "1234567890".to_string(),
            start_time_ms: current_time_millis() as u64,
            cpus: 0,
            memory_bytes: 0,
        };

        in_memory
            .put(PutRequest {
                key: format!("{}{}", key_prefix, "1").into(),
                value: inactive_frontend_node.try_into().unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();

        let peers = utils::alive_frontends(client.as_ref(), Duration::from_secs(lease_secs))
            .await
            .unwrap();
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].id, 0);
    }

    #[tokio::test]
    async fn test_no_active_frontends() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();

        // Last activity is one heartbeat interval plus 1s ago, so the only frontend
        // must not be reported as active.
        let last_activity_ts =
            current_time_millis() - FRONTEND_HEARTBEAT_INTERVAL_MILLIS as i64 - 1000;
        let inactive_frontend_node = NodeInfo {
            peer: Peer {
                id: 0,
                addr: "127.0.0.1:20201".to_string(),
            },
            last_activity_ts,
            status: NodeStatus::Frontend(FrontendStatus {}),
            version: "1.0.0".to_string(),
            git_commit: "1234567890".to_string(),
            start_time_ms: last_activity_ts as u64,
            cpus: 0,
            memory_bytes: 0,
        };

        let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);

        in_memory
            .put(PutRequest {
                key: format!("{}{}", key_prefix, "0").into(),
                value: inactive_frontend_node.try_into().unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();

        let peers = client.active_frontends().await.unwrap();
        assert_eq!(peers.len(), 0);
    }
}