Skip to main content

meta_srv/discovery/
lease.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::kv_backend::KvBackend;
16use common_meta::rpc::KeyValue;
17use common_meta::rpc::store::RangeRequest;
18
19use crate::cluster::MetaPeerClient;
20use crate::error::Result;
21use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
22
/// Selects which family of node leases an accessor call operates on.
///
/// The type is a plain, field-less tag, so it is cheap to copy, can be compared directly
/// and can be printed in logs and assertion messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseValueType {
    /// Leases whose keys decode as `FlownodeLeaseKey`.
    Flownode,
    /// Leases whose keys decode as `DatanodeLeaseKey`.
    Datanode,
}
28
29#[async_trait::async_trait]
30pub trait LeaseValueAccessor: Send + Sync {
31    /// Returns the peer id and lease value.
32    async fn lease_values(
33        &self,
34        lease_value_type: LeaseValueType,
35    ) -> Result<Vec<(u64, LeaseValue)>>;
36
37    async fn lease_value(
38        &self,
39        lease_value_type: LeaseValueType,
40        node_id: u64,
41    ) -> Result<Option<(u64, LeaseValue)>>;
42}
43
44fn decoder(lease_value_type: LeaseValueType, kv: KeyValue) -> Result<(u64, LeaseValue)> {
45    match lease_value_type {
46        LeaseValueType::Flownode => {
47            let lease_key: FlownodeLeaseKey = kv.key.try_into()?;
48            let lease_value: LeaseValue = kv.value.try_into()?;
49            Ok((lease_key.node_id, lease_value))
50        }
51        LeaseValueType::Datanode => {
52            let lease_key: DatanodeLeaseKey = kv.key.try_into()?;
53            let lease_value: LeaseValue = kv.value.try_into()?;
54            Ok((lease_key.node_id, lease_value))
55        }
56    }
57}
58
59#[async_trait::async_trait]
60impl LeaseValueAccessor for MetaPeerClient {
61    async fn lease_values(
62        &self,
63        lease_value_type: LeaseValueType,
64    ) -> Result<Vec<(u64, LeaseValue)>> {
65        let prefix = match lease_value_type {
66            LeaseValueType::Flownode => FlownodeLeaseKey::prefix_key_by_cluster(),
67            LeaseValueType::Datanode => DatanodeLeaseKey::prefix_key(),
68        };
69        let range_request = RangeRequest::new().with_prefix(prefix);
70        let response = self.range(range_request).await?;
71        response
72            .kvs
73            .into_iter()
74            .map(|kv| {
75                let (lease_key, lease_value) = decoder(lease_value_type, kv)?;
76                Ok((lease_key, lease_value))
77            })
78            .collect::<Result<Vec<_>>>()
79    }
80
81    async fn lease_value(
82        &self,
83        lease_value_type: LeaseValueType,
84        node_id: u64,
85    ) -> Result<Option<(u64, LeaseValue)>> {
86        let key: Vec<u8> = match lease_value_type {
87            LeaseValueType::Flownode => FlownodeLeaseKey { node_id }.try_into()?,
88            LeaseValueType::Datanode => DatanodeLeaseKey { node_id }.try_into()?,
89        };
90
91        let response = self.get(&key).await?;
92        response.map(|kv| decoder(lease_value_type, kv)).transpose()
93    }
94}
95
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, FrontendWorkloads};
    use common_meta::cluster::{
        DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role,
    };
    use common_meta::distributed_time_constants::default_distributed_time_constants;
    use common_meta::kv_backend::ResettableKvBackendRef;
    use common_meta::peer::{Peer, PeerDiscovery};
    use common_meta::rpc::store::PutRequest;
    use common_time::util::current_time_millis;
    use common_workload::DatanodeWorkloadType;

    use crate::discovery::utils::accept_ingest_workload;
    use crate::test_util::create_meta_peer_client;

    /// Stores `value` under `key` in the given backend, panicking if encoding or the
    /// write fails.
    async fn put_node_info(kv_backend: &ResettableKvBackendRef, key: NodeInfoKey, value: NodeInfo) {
        let request = PutRequest {
            key: (&key).into(),
            value: value.try_into().unwrap(),
            prev_kv: false,
        };
        kv_backend.put(request).await.unwrap();
    }

    /// Builds a `NodeInfo` whose last activity is "now" and whose remaining bookkeeping
    /// fields are zeroed or empty, so each test only spells out what it cares about.
    fn fresh_node_info(
        peer: Peer,
        status: NodeStatus,
        env_vars: HashMap<String, String>,
    ) -> NodeInfo {
        NodeInfo {
            peer,
            last_activity_ts: current_time_millis(),
            status,
            version: String::new(),
            git_commit: String::new(),
            start_time_ms: 0,
            total_cpu_millicores: 0,
            total_memory_bytes: 0,
            cpu_usage_millicores: 0,
            memory_usage_bytes: 0,
            hostname: String::new(),
            env_vars,
        }
    }

    /// A recently active datanode must be discovered together with its env vars.
    #[tokio::test]
    async fn test_active_datanodes_returns_node_info_with_env_vars() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();

        let mut env_vars = HashMap::new();
        env_vars.insert("AZ".to_string(), "az-a".to_string());

        let datanode_status = NodeStatus::Datanode(DatanodeStatus {
            rcus: 0,
            wcus: 0,
            leader_regions: 0,
            follower_regions: 0,
            workloads: DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            },
        });
        let key = NodeInfoKey {
            role: Role::Datanode,
            node_id: 1,
        };
        let info = fresh_node_info(
            Peer::new(1, "127.0.0.1:4001".to_string()),
            datanode_status,
            env_vars,
        );
        put_node_info(&in_memory, key, info).await;

        let nodes = client
            .active_datanodes(Some(accept_ingest_workload))
            .await
            .unwrap();

        assert_eq!(nodes.len(), 1);
        assert_eq!(nodes[0].peer.id, 1);
        assert_eq!(
            nodes[0].env_vars.get("AZ").map(String::as_str),
            Some("az-a")
        );
    }

    /// A recently active flownode must be discovered when no filter is supplied.
    #[tokio::test]
    async fn test_active_flownodes_returns_node_info() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();

        let flownode_status = NodeStatus::Flownode(FlownodeStatus {
            workloads: FlownodeWorkloads { types: vec![7] },
        });
        let key = NodeInfoKey {
            role: Role::Flownode,
            node_id: 11,
        };
        let info = fresh_node_info(
            Peer::new(11, "127.0.0.1:5001".to_string()),
            flownode_status,
            HashMap::new(),
        );
        put_node_info(&in_memory, key, info).await;

        let nodes = client.active_flownodes(None).await.unwrap();

        assert_eq!(nodes.len(), 1);
        assert_eq!(nodes[0].peer.id, 11);
    }

    /// A frontend whose last activity is older than the heartbeat interval must not be
    /// reported as active.
    #[tokio::test]
    async fn test_no_active_frontends() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();

        // Back-date the last activity to one second beyond the heartbeat interval.
        let heartbeat_interval_ms =
            default_distributed_time_constants().frontend_heartbeat_interval.as_millis() as i64;
        let last_activity_ts = current_time_millis() - heartbeat_interval_ms - 1000;

        let stale_frontend_node = NodeInfo {
            peer: Peer {
                id: 0,
                addr: "127.0.0.1:20201".to_string(),
            },
            last_activity_ts,
            status: NodeStatus::Frontend(FrontendStatus {
                workloads: FrontendWorkloads { types: vec![] },
            }),
            version: "1.0.0".to_string(),
            git_commit: "1234567890".to_string(),
            start_time_ms: last_activity_ts as u64,
            total_cpu_millicores: 0,
            total_memory_bytes: 0,
            cpu_usage_millicores: 0,
            memory_usage_bytes: 0,
            hostname: "test_hostname".to_string(),
            env_vars: Default::default(),
        };

        let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
        let request = PutRequest {
            key: format!("{key_prefix}0").into(),
            value: stale_frontend_node.try_into().unwrap(),
            prev_kv: false,
        };
        in_memory.put(request).await.unwrap();

        let peers = client.active_frontends().await.unwrap();
        assert_eq!(peers.len(), 0);
    }
}