1use common_meta::kv_backend::KvBackend;
16use common_meta::rpc::KeyValue;
17use common_meta::rpc::store::RangeRequest;
18
19use crate::cluster::MetaPeerClient;
20use crate::error::Result;
21use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
22
/// Selects which kind of node a lease lookup targets.
///
/// The variant decides which lease key type ([`FlownodeLeaseKey`] or
/// [`DatanodeLeaseKey`]) is used to build and decode keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseValueType {
    /// Leases stored under [`FlownodeLeaseKey`].
    Flownode,
    /// Leases stored under [`DatanodeLeaseKey`].
    Datanode,
}
28
/// Read access to the lease values of nodes, selected by [`LeaseValueType`].
#[async_trait::async_trait]
pub trait LeaseValueAccessor: Send + Sync {
    /// Returns the lease values of all nodes of the given type.
    ///
    /// Each element pairs a `u64` with its [`LeaseValue`]; in the
    /// implementation in this file the `u64` is the node id taken from the
    /// decoded lease key.
    async fn lease_values(
        &self,
        lease_value_type: LeaseValueType,
    ) -> Result<Vec<(u64, LeaseValue)>>;

    /// Returns the lease value of the node `node_id` of the given type.
    ///
    /// The result is `Ok(None)` when no lease is found for that node.
    async fn lease_value(
        &self,
        lease_value_type: LeaseValueType,
        node_id: u64,
    ) -> Result<Option<(u64, LeaseValue)>>;
}
43
44fn decoder(lease_value_type: LeaseValueType, kv: KeyValue) -> Result<(u64, LeaseValue)> {
45 match lease_value_type {
46 LeaseValueType::Flownode => {
47 let lease_key: FlownodeLeaseKey = kv.key.try_into()?;
48 let lease_value: LeaseValue = kv.value.try_into()?;
49 Ok((lease_key.node_id, lease_value))
50 }
51 LeaseValueType::Datanode => {
52 let lease_key: DatanodeLeaseKey = kv.key.try_into()?;
53 let lease_value: LeaseValue = kv.value.try_into()?;
54 Ok((lease_key.node_id, lease_value))
55 }
56 }
57}
58
59#[async_trait::async_trait]
60impl LeaseValueAccessor for MetaPeerClient {
61 async fn lease_values(
62 &self,
63 lease_value_type: LeaseValueType,
64 ) -> Result<Vec<(u64, LeaseValue)>> {
65 let prefix = match lease_value_type {
66 LeaseValueType::Flownode => FlownodeLeaseKey::prefix_key_by_cluster(),
67 LeaseValueType::Datanode => DatanodeLeaseKey::prefix_key(),
68 };
69 let range_request = RangeRequest::new().with_prefix(prefix);
70 let response = self.range(range_request).await?;
71 response
72 .kvs
73 .into_iter()
74 .map(|kv| {
75 let (lease_key, lease_value) = decoder(lease_value_type, kv)?;
76 Ok((lease_key, lease_value))
77 })
78 .collect::<Result<Vec<_>>>()
79 }
80
81 async fn lease_value(
82 &self,
83 lease_value_type: LeaseValueType,
84 node_id: u64,
85 ) -> Result<Option<(u64, LeaseValue)>> {
86 let key: Vec<u8> = match lease_value_type {
87 LeaseValueType::Flownode => FlownodeLeaseKey { node_id }.try_into()?,
88 LeaseValueType::Datanode => DatanodeLeaseKey { node_id }.try_into()?,
89 };
90
91 let response = self.get(&key).await?;
92 response.map(|kv| decoder(lease_value_type, kv)).transpose()
93 }
94}
95
96#[cfg(test)]
97mod tests {
98 use std::collections::HashMap;
99
100 use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, FrontendWorkloads};
101 use common_meta::cluster::{
102 DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role,
103 };
104 use common_meta::distributed_time_constants::default_distributed_time_constants;
105 use common_meta::kv_backend::ResettableKvBackendRef;
106 use common_meta::peer::{Peer, PeerDiscovery};
107 use common_meta::rpc::store::PutRequest;
108 use common_time::util::current_time_millis;
109 use common_workload::DatanodeWorkloadType;
110
111 use crate::discovery::utils::accept_ingest_workload;
112 use crate::test_util::create_meta_peer_client;
113
114 async fn put_node_info(kv_backend: &ResettableKvBackendRef, key: NodeInfoKey, value: NodeInfo) {
115 kv_backend
116 .put(PutRequest {
117 key: (&key).into(),
118 value: value.try_into().unwrap(),
119 prev_kv: false,
120 })
121 .await
122 .unwrap();
123 }
124
125 #[tokio::test]
126 async fn test_active_datanodes_returns_node_info_with_env_vars() {
127 let client = create_meta_peer_client();
128 let in_memory = client.memory_backend();
129
130 let mut env_vars = HashMap::new();
131 env_vars.insert("AZ".to_string(), "az-a".to_string());
132
133 put_node_info(
134 &in_memory,
135 NodeInfoKey {
136 role: Role::Datanode,
137 node_id: 1,
138 },
139 NodeInfo {
140 peer: Peer::new(1, "127.0.0.1:4001".to_string()),
141 last_activity_ts: current_time_millis(),
142 status: NodeStatus::Datanode(DatanodeStatus {
143 rcus: 0,
144 wcus: 0,
145 leader_regions: 0,
146 follower_regions: 0,
147 workloads: DatanodeWorkloads {
148 types: vec![DatanodeWorkloadType::Hybrid as i32],
149 },
150 }),
151 version: String::new(),
152 git_commit: String::new(),
153 start_time_ms: 0,
154 total_cpu_millicores: 0,
155 total_memory_bytes: 0,
156 cpu_usage_millicores: 0,
157 memory_usage_bytes: 0,
158 hostname: String::new(),
159 env_vars,
160 },
161 )
162 .await;
163 let nodes = client
164 .active_datanodes(Some(accept_ingest_workload))
165 .await
166 .unwrap();
167
168 assert_eq!(nodes.len(), 1);
169 assert_eq!(nodes[0].peer.id, 1);
170 assert_eq!(
171 nodes[0].env_vars.get("AZ").map(String::as_str),
172 Some("az-a")
173 );
174 }
175
176 #[tokio::test]
177 async fn test_active_flownodes_returns_node_info() {
178 let client = create_meta_peer_client();
179 let in_memory = client.memory_backend();
180
181 put_node_info(
182 &in_memory,
183 NodeInfoKey {
184 role: Role::Flownode,
185 node_id: 11,
186 },
187 NodeInfo {
188 peer: Peer::new(11, "127.0.0.1:5001".to_string()),
189 last_activity_ts: current_time_millis(),
190 status: NodeStatus::Flownode(FlownodeStatus {
191 workloads: FlownodeWorkloads { types: vec![7] },
192 }),
193 version: String::new(),
194 git_commit: String::new(),
195 start_time_ms: 0,
196 total_cpu_millicores: 0,
197 total_memory_bytes: 0,
198 cpu_usage_millicores: 0,
199 memory_usage_bytes: 0,
200 hostname: String::new(),
201 env_vars: Default::default(),
202 },
203 )
204 .await;
205
206 let nodes = client.active_flownodes(None).await.unwrap();
207
208 assert_eq!(nodes.len(), 1);
209 assert_eq!(nodes[0].peer.id, 11);
210 }
211
212 #[tokio::test]
213 async fn test_no_active_frontends() {
214 let client = create_meta_peer_client();
215 let in_memory = client.memory_backend();
216
217 let frontend_heartbeat_interval =
218 default_distributed_time_constants().frontend_heartbeat_interval;
219 let last_activity_ts =
220 current_time_millis() - frontend_heartbeat_interval.as_millis() as i64 - 1000;
221 let active_frontend_node = NodeInfo {
222 peer: Peer {
223 id: 0,
224 addr: "127.0.0.1:20201".to_string(),
225 },
226 last_activity_ts,
227 status: NodeStatus::Frontend(FrontendStatus {
228 workloads: FrontendWorkloads { types: vec![] },
229 }),
230 version: "1.0.0".to_string(),
231 git_commit: "1234567890".to_string(),
232 start_time_ms: last_activity_ts as u64,
233 total_cpu_millicores: 0,
234 total_memory_bytes: 0,
235 cpu_usage_millicores: 0,
236 memory_usage_bytes: 0,
237 hostname: "test_hostname".to_string(),
238 env_vars: Default::default(),
239 };
240
241 let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
242
243 in_memory
244 .put(PutRequest {
245 key: format!("{}{}", key_prefix, "0").into(),
246 value: active_frontend_node.try_into().unwrap(),
247 prev_kv: false,
248 })
249 .await
250 .unwrap();
251
252 let peers = client.active_frontends().await.unwrap();
253 assert_eq!(peers.len(), 0);
254 }
255}