1use std::hash::{DefaultHasher, Hash, Hasher};
16use std::str::FromStr;
17
18use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest};
19use common_error::ext::ErrorExt;
20use lazy_static::lazy_static;
21use regex::Regex;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt, ensure};
24
25use crate::datanode::RegionStat;
26use crate::error::{
27 DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu,
28 InvalidRoleSnafu, ParseNumSnafu, Result,
29};
30use crate::key::flow::flow_state::FlowStat;
31use crate::peer::Peer;
32
33const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";
34
35lazy_static! {
36 static ref CLUSTER_NODE_INFO_PREFIX_PATTERN: Regex = Regex::new(&format!(
37 "^{CLUSTER_NODE_INFO_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
38 ))
39 .unwrap();
40}
41
/// Queries cluster node membership and per-region / flow statistics.
///
/// Implementations choose their own error type via [`ClusterInfo::Error`].
#[async_trait::async_trait]
pub trait ClusterInfo {
    /// Error type returned by every method of this trait.
    type Error: ErrorExt;

    /// Lists cluster nodes, optionally restricted to one `role`.
    // NOTE(review): `None` presumably means "nodes of every role" — confirm with implementors.
    async fn list_nodes(
        &self,
        role: Option<Role>,
    ) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

    /// Lists region statistics.
    async fn list_region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

    /// Returns flow statistics, or `None` when the implementation has none to report.
    async fn list_flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
}
61
/// Identifies a single node by its role and id.
///
/// Serialized to bytes as `__meta_cluster_node_info-0-{role_code}-{node_id}`,
/// where `role_code` is `i32::from(role)` (see the `From<&NodeInfoKey> for Vec<u8>` impl).
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
    /// Role of the node.
    pub role: Role,
    /// Node id. When built by `NodeInfoKey::new`, this is the peer id for every
    /// role except frontends, whose id is a hash of the peer address.
    pub node_id: u64,
}
70
71impl NodeInfoKey {
72 pub fn new(request: &HeartbeatRequest) -> Option<Self> {
75 let HeartbeatRequest { header, peer, .. } = request;
76 let header = header.as_ref()?;
77 let peer = peer.as_ref()?;
78
79 let role = header.role.try_into().ok()?;
80 let node_id = match role {
81 Role::Frontend => calculate_node_id(&peer.addr),
84 _ => peer.id,
85 };
86
87 Some(NodeInfoKey { role, node_id })
88 }
89
90 pub fn key_prefix() -> String {
91 format!("{}-0-", CLUSTER_NODE_INFO_PREFIX)
92 }
93
94 pub fn key_prefix_with_role(role: Role) -> String {
95 format!("{}-0-{}-", CLUSTER_NODE_INFO_PREFIX, i32::from(role))
96 }
97}
98
/// Derives a node id by hashing `addr` with the standard library's `DefaultHasher`.
///
/// The result is deterministic for a given toolchain. The standard library leaves
/// `DefaultHasher`'s algorithm unspecified and may change it between Rust releases,
/// so ids computed here may differ after a compiler upgrade.
fn calculate_node_id(addr: &str) -> u64 {
    let mut state = DefaultHasher::default();
    Hash::hash(addr, &mut state);
    state.finish()
}
105
/// Information about a single cluster node, stored as JSON (see the `FromStr`
/// and `TryFrom` impls for `NodeInfo` in this module).
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeInfo {
    /// The node's peer (id and address).
    pub peer: Peer,
    /// Timestamp of the node's last activity.
    // NOTE(review): unit and epoch are not visible here — presumably milliseconds; confirm.
    pub last_activity_ts: i64,
    /// Role-specific status of the node.
    pub status: NodeStatus,
    /// Version string reported by the node.
    pub version: String,
    /// Git commit string reported by the node.
    pub git_commit: String,
    /// Node start time; the `_ms` suffix indicates milliseconds.
    pub start_time_ms: u64,
    /// Total CPU, in millicores per the field name.
    ///
    /// This field and the ones below are `#[serde(default)]`, so they decode as
    /// `0` / empty when absent (presumably for records written before they existed).
    #[serde(default)]
    pub total_cpu_millicores: i64,
    /// Total memory, in bytes per the field name.
    #[serde(default)]
    pub total_memory_bytes: i64,
    /// CPU currently in use, in millicores per the field name.
    #[serde(default)]
    pub cpu_usage_millicores: i64,
    /// Memory currently in use, in bytes per the field name.
    #[serde(default)]
    pub memory_usage_bytes: i64,
    /// Hostname reported by the node.
    #[serde(default)]
    pub hostname: String,
}
137
/// Role a node plays in the cluster.
///
/// Encoded as an `i32` code by the `From<Role> for i32` / `TryFrom<i32> for Role`
/// impls: `Datanode = 0`, `Frontend = 1`, `Flownode = 2`, `Metasrv = 99`.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub enum Role {
    Datanode,
    Frontend,
    Flownode,
    Metasrv,
}
145
/// Role-specific status of a node.
///
/// There is one variant per [`Role`], plus `Standalone`, which carries no extra data.
#[derive(Debug, Serialize, Deserialize)]
pub enum NodeStatus {
    Datanode(DatanodeStatus),
    Frontend(FrontendStatus),
    Flownode(FlownodeStatus),
    Metasrv(MetasrvStatus),
    Standalone,
}
154
155impl NodeStatus {
156 pub fn role_name(&self) -> &str {
158 match self {
159 NodeStatus::Datanode(_) => "DATANODE",
160 NodeStatus::Frontend(_) => "FRONTEND",
161 NodeStatus::Flownode(_) => "FLOWNODE",
162 NodeStatus::Metasrv(_) => "METASRV",
163 NodeStatus::Standalone => "STANDALONE",
164 }
165 }
166}
167
/// Status details reported by a datanode.
#[derive(Debug, Serialize, Deserialize)]
pub struct DatanodeStatus {
    // NOTE(review): `rcus` / `wcus` are presumably read / write capacity units — confirm.
    pub rcus: i64,
    pub wcus: i64,
    /// Number of regions this datanode leads.
    pub leader_regions: usize,
    /// Number of regions this datanode follows.
    pub follower_regions: usize,
    /// Workload types served by this datanode.
    pub workloads: DatanodeWorkloads,
}
182
/// Status details reported by a frontend; currently carries no fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct FrontendStatus {}
186
/// Status details reported by a flownode; currently carries no fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct FlownodeStatus {}
190
/// Status details reported by a metasrv.
#[derive(Debug, Serialize, Deserialize)]
pub struct MetasrvStatus {
    /// Whether this metasrv reports itself as the leader.
    pub is_leader: bool,
}
196
197impl FromStr for NodeInfoKey {
198 type Err = Error;
199
200 fn from_str(key: &str) -> Result<Self> {
201 let caps = CLUSTER_NODE_INFO_PREFIX_PATTERN
202 .captures(key)
203 .context(InvalidNodeInfoKeySnafu { key })?;
204 ensure!(caps.len() == 4, InvalidNodeInfoKeySnafu { key });
205
206 let role = caps[2].to_string();
207 let node_id = caps[3].to_string();
208 let role: i32 = role.parse().context(ParseNumSnafu {
209 err_msg: format!("invalid role {role}"),
210 })?;
211 let role = Role::try_from(role)?;
212 let node_id: u64 = node_id.parse().context(ParseNumSnafu {
213 err_msg: format!("invalid node_id: {node_id}"),
214 })?;
215
216 Ok(Self { role, node_id })
217 }
218}
219
220impl TryFrom<Vec<u8>> for NodeInfoKey {
221 type Error = Error;
222
223 fn try_from(bytes: Vec<u8>) -> Result<Self> {
224 String::from_utf8(bytes)
225 .context(FromUtf8Snafu {
226 name: "NodeInfoKey",
227 })
228 .map(|x| x.parse())?
229 }
230}
231
232impl From<&NodeInfoKey> for Vec<u8> {
233 fn from(key: &NodeInfoKey) -> Self {
234 format!(
235 "{}-0-{}-{}",
236 CLUSTER_NODE_INFO_PREFIX,
237 i32::from(key.role),
238 key.node_id
239 )
240 .into_bytes()
241 }
242}
243
244impl FromStr for NodeInfo {
245 type Err = Error;
246
247 fn from_str(value: &str) -> Result<Self> {
248 serde_json::from_str(value).context(DecodeJsonSnafu)
249 }
250}
251
252impl TryFrom<Vec<u8>> for NodeInfo {
253 type Error = Error;
254
255 fn try_from(bytes: Vec<u8>) -> Result<Self> {
256 String::from_utf8(bytes)
257 .context(FromUtf8Snafu { name: "NodeInfo" })
258 .map(|x| x.parse())?
259 }
260}
261
262impl TryFrom<NodeInfo> for Vec<u8> {
263 type Error = Error;
264
265 fn try_from(info: NodeInfo) -> Result<Self> {
266 Ok(serde_json::to_string(&info)
267 .context(EncodeJsonSnafu)?
268 .into_bytes())
269 }
270}
271
272impl From<Role> for i32 {
273 fn from(role: Role) -> Self {
274 match role {
275 Role::Datanode => 0,
276 Role::Frontend => 1,
277 Role::Flownode => 2,
278 Role::Metasrv => 99,
279 }
280 }
281}
282
283impl TryFrom<i32> for Role {
284 type Error = Error;
285
286 fn try_from(role: i32) -> Result<Self> {
287 match role {
288 0 => Ok(Self::Datanode),
289 1 => Ok(Self::Frontend),
290 2 => Ok(Self::Flownode),
291 99 => Ok(Self::Metasrv),
292 _ => InvalidRoleSnafu { role }.fail(),
293 }
294 }
295}
296
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_workload::DatanodeWorkloadType;

    use super::*;
    use crate::cluster::Role::{Datanode, Frontend};
    use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
    use crate::peer::Peer;

    #[test]
    fn test_node_info_key_round_trip() {
        let key = NodeInfoKey {
            role: Datanode,
            node_id: 2,
        };

        let key_bytes: Vec<u8> = (&key).into();
        let new_key: NodeInfoKey = key_bytes.try_into().unwrap();

        assert_eq!(Datanode, new_key.role);
        assert_eq!(2, new_key.node_id);
    }

    /// Every role survives both the `i32` code round trip and the full key
    /// encode/decode round trip.
    #[test]
    fn test_node_info_key_round_trip_all_roles() {
        for role in [Datanode, Frontend, Role::Flownode, Role::Metasrv] {
            assert_eq!(role, Role::try_from(i32::from(role)).unwrap());

            let key = NodeInfoKey { role, node_id: 42 };
            let key_bytes: Vec<u8> = (&key).into();
            let new_key: NodeInfoKey = key_bytes.try_into().unwrap();
            assert_eq!(key, new_key);
        }
    }

    #[test]
    fn test_role_from_unknown_code() {
        assert!(Role::try_from(3_i32).is_err());
        assert!(Role::try_from(-1_i32).is_err());
    }

    /// The first numeric segment is accepted but not stored in the key.
    #[test]
    fn test_node_info_key_parse_ignores_first_segment() {
        let key: NodeInfoKey = "__meta_cluster_node_info-7-99-5".parse().unwrap();
        assert_eq!(Role::Metasrv, key.role);
        assert_eq!(5, key.node_id);
    }

    #[test]
    fn test_node_info_key_parse_invalid() {
        // Wrong prefix.
        assert!("__meta_other-0-0-1".parse::<NodeInfoKey>().is_err());
        // Missing node id.
        assert!("__meta_cluster_node_info-0-0-".parse::<NodeInfoKey>().is_err());
        // Trailing garbage after the node id.
        assert!("__meta_cluster_node_info-0-0-1x".parse::<NodeInfoKey>().is_err());
        // Unknown role code.
        assert!("__meta_cluster_node_info-0-3-1".parse::<NodeInfoKey>().is_err());
        // Node id overflows u64.
        assert!(
            "__meta_cluster_node_info-0-0-18446744073709551616"
                .parse::<NodeInfoKey>()
                .is_err()
        );
        // Not valid UTF-8.
        assert!(NodeInfoKey::try_from(vec![0xff_u8, 0xfe]).is_err());
    }

    #[test]
    fn test_node_info_round_trip() {
        let node_info = NodeInfo {
            peer: Peer {
                id: 1,
                addr: "127.0.0.1".to_string(),
            },
            last_activity_ts: 123,
            status: NodeStatus::Datanode(DatanodeStatus {
                rcus: 1,
                wcus: 2,
                leader_regions: 3,
                follower_regions: 4,
                workloads: DatanodeWorkloads {
                    types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
                },
            }),
            version: "".to_string(),
            git_commit: "".to_string(),
            start_time_ms: 1,
            total_cpu_millicores: 0,
            total_memory_bytes: 0,
            cpu_usage_millicores: 0,
            memory_usage_bytes: 0,
            hostname: "test_hostname".to_string(),
        };

        let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
        let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();

        assert_matches!(
            new_node_info,
            NodeInfo {
                peer: Peer { id: 1, .. },
                last_activity_ts: 123,
                status: NodeStatus::Datanode(DatanodeStatus {
                    rcus: 1,
                    wcus: 2,
                    leader_regions: 3,
                    follower_regions: 4,
                    ..
                }),
                start_time_ms: 1,
                ..
            }
        );
    }

    #[test]
    fn test_node_info_key_prefix() {
        let prefix = NodeInfoKey::key_prefix();
        assert_eq!(prefix, "__meta_cluster_node_info-0-");

        let prefix = NodeInfoKey::key_prefix_with_role(Frontend);
        assert_eq!(prefix, "__meta_cluster_node_info-0-1-");
    }

    #[test]
    fn test_calculate_node_id_from_addr() {
        assert_eq!(calculate_node_id(""), calculate_node_id(""));

        let addr1 = "127.0.0.1:8080";
        let id1 = calculate_node_id(addr1);
        let id2 = calculate_node_id(addr1);
        assert_eq!(id1, id2);

        let addr2 = "127.0.0.1:8081";
        let id3 = calculate_node_id(addr2);
        assert_ne!(id1, id3);

        let long_addr = "very.long.domain.name.example.com:9999";
        let id4 = calculate_node_id(long_addr);
        assert!(id4 > 0);
    }
}