1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::hash::{DefaultHasher, Hash, Hasher};
18use std::str::FromStr;
19
20use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, HeartbeatRequest};
21use common_error::ext::ErrorExt;
22use lazy_static::lazy_static;
23use regex::Regex;
24use serde::{Deserialize, Serialize};
25use snafu::{OptionExt, ResultExt, ensure};
26
27use crate::datanode::RegionStat;
28use crate::error::{
29 DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu,
30 InvalidRoleSnafu, ParseNumSnafu, Result,
31};
32use crate::key::flow::flow_state::FlowStat;
33use crate::peer::Peer;
34
35const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";
36
37lazy_static! {
38 static ref CLUSTER_NODE_INFO_PREFIX_PATTERN: Regex = Regex::new(&format!(
39 "^{CLUSTER_NODE_INFO_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
40 ))
41 .unwrap();
42}
43
/// Queries for the nodes of a cluster and the statistics they report.
///
/// The trait is annotated with `async_trait`, so implementors write plain
/// `async fn`s.
#[async_trait::async_trait]
pub trait ClusterInfo {
    /// Error type returned by every query of this trait.
    type Error: ErrorExt;

    /// Lists nodes as [`NodeInfo`] entries.
    ///
    /// NOTE(review): `role` looks like an optional filter (`None` meaning
    /// "every role") — confirm against the implementors.
    async fn list_nodes(
        &self,
        role: Option<Role>,
    ) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

    /// Lists [`RegionStat`] entries.
    ///
    /// NOTE(review): presumably covers every region known to the cluster —
    /// confirm against the implementors.
    async fn list_region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

    /// Returns the [`FlowStat`], or `None` when the implementor has none to
    /// report.
    async fn list_flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
}
63
/// Key identifying one node's info entry: the node's role plus its id.
///
/// The byte form is `__meta_cluster_node_info-0-{role}-{node_id}`, where
/// `{role}` is the role's `i32` code (see the `From<&NodeInfoKey> for Vec<u8>`
/// and `FromStr` impls in this file).
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
pub struct NodeInfoKey {
    /// Role of the node.
    pub role: Role,
    /// Id of the node. For frontends, `NodeInfoKey::new` derives it from a
    /// hash of the peer address instead of using the peer id.
    pub node_id: u64,
}
72
73impl NodeInfoKey {
74 pub fn new(request: &HeartbeatRequest) -> Option<Self> {
77 let HeartbeatRequest { header, peer, .. } = request;
78 let header = header.as_ref()?;
79 let peer = peer.as_ref()?;
80
81 let role = header.role.try_into().ok()?;
82 let node_id = match role {
83 Role::Frontend => calculate_node_id(&peer.addr),
86 _ => peer.id,
87 };
88
89 Some(NodeInfoKey { role, node_id })
90 }
91
92 pub fn key_prefix() -> String {
93 format!("{}-0-", CLUSTER_NODE_INFO_PREFIX)
94 }
95
96 pub fn key_prefix_with_role(role: Role) -> String {
97 format!("{}-0-{}-", CLUSTER_NODE_INFO_PREFIX, i32::from(role))
98 }
99}
100
/// Derives a node id from `addr` by hashing it with a freshly created
/// [`DefaultHasher`], so the same address always yields the same id within one
/// build of the program.
///
/// NOTE(review): the standard library documents `DefaultHasher`'s algorithm as
/// unspecified and subject to change between Rust releases, so ids may differ
/// between binaries built with different toolchains — confirm this is
/// acceptable for the callers.
fn calculate_node_id(addr: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(addr, &mut state);
    Hasher::finish(&state)
}
107
/// Information recorded about a single node.
///
/// Encoded to and decoded from JSON by the `FromStr` / `TryFrom` impls for
/// `NodeInfo` in this file. Fields marked `#[serde(default)]` may be missing
/// from the JSON and then take their type's default value.
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeInfo {
    /// The node's peer.
    pub peer: Peer,
    /// Timestamp of the node's last activity.
    /// NOTE(review): the unit is not visible here (presumably milliseconds) — confirm.
    pub last_activity_ts: i64,
    /// Role-specific status of the node.
    pub status: NodeStatus,
    /// Version string of the node.
    pub version: String,
    /// Git commit of the node.
    pub git_commit: String,
    /// Start time of the node, in milliseconds per the field name.
    /// NOTE(review): presumably since the Unix epoch — confirm.
    pub start_time_ms: u64,
    /// Total CPU in millicores; `0` when absent from the JSON.
    #[serde(default)]
    pub total_cpu_millicores: i64,
    /// Total memory in bytes; `0` when absent from the JSON.
    #[serde(default)]
    pub total_memory_bytes: i64,
    /// CPU usage in millicores; `0` when absent from the JSON.
    #[serde(default)]
    pub cpu_usage_millicores: i64,
    /// Memory usage in bytes; `0` when absent from the JSON.
    #[serde(default)]
    pub memory_usage_bytes: i64,
    /// Hostname of the node; empty when absent from the JSON.
    #[serde(default)]
    pub hostname: String,
    /// Environment variables recorded for the node; empty when absent from the JSON.
    /// NOTE(review): which variables get recorded is decided elsewhere — confirm.
    #[serde(default)]
    pub env_vars: HashMap<String, String>,
}
142
/// Role of a node in the cluster.
///
/// Each role has an `i32` code (see `From<Role> for i32` and
/// `TryFrom<i32> for Role` in this file): `0`, `1`, `2` and `99` in declaration
/// order.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
pub enum Role {
    /// Code `0`.
    Datanode,
    /// Code `1`.
    Frontend,
    /// Code `2`.
    Flownode,
    /// Code `99`.
    Metasrv,
}
150
/// Role-specific status carried by a [`NodeInfo`].
#[derive(Debug, Serialize, Deserialize)]
pub enum NodeStatus {
    /// Status of a datanode.
    Datanode(DatanodeStatus),
    /// Status of a frontend.
    Frontend(FrontendStatus),
    /// Status of a flownode.
    Flownode(FlownodeStatus),
    /// Status of a metasrv.
    Metasrv(MetasrvStatus),
    /// Status of a standalone node; carries no payload and has no
    /// corresponding [`Role`] variant.
    Standalone,
}
159
160impl NodeStatus {
161 pub fn role_name(&self) -> &str {
163 match self {
164 NodeStatus::Datanode(_) => "DATANODE",
165 NodeStatus::Frontend(_) => "FRONTEND",
166 NodeStatus::Flownode(_) => "FLOWNODE",
167 NodeStatus::Metasrv(_) => "METASRV",
168 NodeStatus::Standalone => "STANDALONE",
169 }
170 }
171}
172
/// Status payload of a datanode.
#[derive(Debug, Serialize, Deserialize)]
pub struct DatanodeStatus {
    /// NOTE(review): presumably read capacity units — confirm.
    pub rcus: i64,
    /// NOTE(review): presumably write capacity units — confirm.
    pub wcus: i64,
    /// NOTE(review): presumably the number of regions this datanode leads — confirm.
    pub leader_regions: usize,
    /// NOTE(review): presumably the number of regions this datanode follows — confirm.
    pub follower_regions: usize,
    /// Workloads of the datanode. Unlike `FlownodeStatus::workloads`, this
    /// field has no serde default and must be present in the JSON.
    pub workloads: DatanodeWorkloads,
}
187
/// Status payload of a frontend; currently has no fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct FrontendStatus {}
191
/// Status payload of a flownode.
#[derive(Debug, Serialize, Deserialize)]
pub struct FlownodeStatus {
    /// Workloads of the flownode. Takes the type's default value when the
    /// field is absent from the JSON, so payloads written without it still
    /// decode.
    #[serde(default)]
    pub workloads: FlownodeWorkloads,
}
199
/// Status payload of a metasrv.
#[derive(Debug, Serialize, Deserialize)]
pub struct MetasrvStatus {
    /// Whether this metasrv is the leader.
    pub is_leader: bool,
}
205
206impl FromStr for NodeInfoKey {
207 type Err = Error;
208
209 fn from_str(key: &str) -> Result<Self> {
210 let caps = CLUSTER_NODE_INFO_PREFIX_PATTERN
211 .captures(key)
212 .context(InvalidNodeInfoKeySnafu { key })?;
213 ensure!(caps.len() == 4, InvalidNodeInfoKeySnafu { key });
214
215 let role = caps[2].to_string();
216 let node_id = caps[3].to_string();
217 let role: i32 = role.parse().context(ParseNumSnafu {
218 err_msg: format!("invalid role {role}"),
219 })?;
220 let role = Role::try_from(role)?;
221 let node_id: u64 = node_id.parse().context(ParseNumSnafu {
222 err_msg: format!("invalid node_id: {node_id}"),
223 })?;
224
225 Ok(Self { role, node_id })
226 }
227}
228
229impl TryFrom<Vec<u8>> for NodeInfoKey {
230 type Error = Error;
231
232 fn try_from(bytes: Vec<u8>) -> Result<Self> {
233 String::from_utf8(bytes)
234 .context(FromUtf8Snafu {
235 name: "NodeInfoKey",
236 })
237 .map(|x| x.parse())?
238 }
239}
240
241impl From<&NodeInfoKey> for Vec<u8> {
242 fn from(key: &NodeInfoKey) -> Self {
243 format!(
244 "{}-0-{}-{}",
245 CLUSTER_NODE_INFO_PREFIX,
246 i32::from(key.role),
247 key.node_id
248 )
249 .into_bytes()
250 }
251}
252
253impl Display for NodeInfoKey {
254 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
255 write!(f, "{:?}-{}", self.role, self.node_id)
256 }
257}
258
259impl FromStr for NodeInfo {
260 type Err = Error;
261
262 fn from_str(value: &str) -> Result<Self> {
263 serde_json::from_str(value).context(DecodeJsonSnafu)
264 }
265}
266
267impl TryFrom<Vec<u8>> for NodeInfo {
268 type Error = Error;
269
270 fn try_from(bytes: Vec<u8>) -> Result<Self> {
271 String::from_utf8(bytes)
272 .context(FromUtf8Snafu { name: "NodeInfo" })
273 .map(|x| x.parse())?
274 }
275}
276
277impl TryFrom<NodeInfo> for Vec<u8> {
278 type Error = Error;
279
280 fn try_from(info: NodeInfo) -> Result<Self> {
281 Ok(serde_json::to_string(&info)
282 .context(EncodeJsonSnafu)?
283 .into_bytes())
284 }
285}
286
287impl From<Role> for i32 {
288 fn from(role: Role) -> Self {
289 match role {
290 Role::Datanode => 0,
291 Role::Frontend => 1,
292 Role::Flownode => 2,
293 Role::Metasrv => 99,
294 }
295 }
296}
297
298impl TryFrom<i32> for Role {
299 type Error = Error;
300
301 fn try_from(role: i32) -> Result<Self> {
302 match role {
303 0 => Ok(Self::Datanode),
304 1 => Ok(Self::Frontend),
305 2 => Ok(Self::Flownode),
306 99 => Ok(Self::Metasrv),
307 _ => InvalidRoleSnafu { role }.fail(),
308 }
309 }
310}
311
// Unit tests for the key and value encodings defined in this file.
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use common_workload::DatanodeWorkloadType;

    use super::*;
    use crate::cluster::Role::{Datanode, Frontend};
    use crate::cluster::{DatanodeStatus, FlownodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
    use crate::peer::Peer;

    // A key encoded to bytes decodes back to the same role and node id.
    #[test]
    fn test_node_info_key_round_trip() {
        let key = NodeInfoKey {
            role: Datanode,
            node_id: 2,
        };

        let key_bytes: Vec<u8> = (&key).into();
        let new_key: NodeInfoKey = key_bytes.try_into().unwrap();

        assert_eq!(Datanode, new_key.role);
        assert_eq!(2, new_key.node_id);
    }

    // A datanode's info survives a JSON encode/decode round trip.
    #[test]
    fn test_node_info_round_trip() {
        let node_info = NodeInfo {
            peer: Peer {
                id: 1,
                addr: "127.0.0.1".to_string(),
            },
            last_activity_ts: 123,
            status: NodeStatus::Datanode(DatanodeStatus {
                rcus: 1,
                wcus: 2,
                leader_regions: 3,
                follower_regions: 4,
                workloads: DatanodeWorkloads {
                    types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
                },
            }),
            version: "".to_string(),
            git_commit: "".to_string(),
            start_time_ms: 1,
            total_cpu_millicores: 0,
            total_memory_bytes: 0,
            cpu_usage_millicores: 0,
            memory_usage_bytes: 0,
            hostname: "test_hostname".to_string(),
            env_vars: Default::default(),
        };

        let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
        let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();

        // Only a subset of the fields is checked; the rest is ignored via `..`.
        assert_matches!(
            new_node_info,
            NodeInfo {
                peer: Peer { id: 1, .. },
                last_activity_ts: 123,
                status: NodeStatus::Datanode(DatanodeStatus {
                    rcus: 1,
                    wcus: 2,
                    leader_regions: 3,
                    follower_regions: 4,
                    ..
                }),
                start_time_ms: 1,
                ..
            }
        );
    }

    // The key prefixes have the expected literal form.
    #[test]
    fn test_node_info_key_prefix() {
        let prefix = NodeInfoKey::key_prefix();
        assert_eq!(prefix, "__meta_cluster_node_info-0-");

        let prefix = NodeInfoKey::key_prefix_with_role(Frontend);
        assert_eq!(prefix, "__meta_cluster_node_info-0-1-");
    }

    // Address-derived node ids are deterministic and differ between addresses.
    #[test]
    fn test_calculate_node_id_from_addr() {
        // The empty address hashes consistently as well.
        assert_eq!(calculate_node_id(""), calculate_node_id(""));

        // Same address, same id.
        let addr1 = "127.0.0.1:8080";
        let id1 = calculate_node_id(addr1);
        let id2 = calculate_node_id(addr1);
        assert_eq!(id1, id2);

        // Different address, different id.
        let addr2 = "127.0.0.1:8081";
        let id3 = calculate_node_id(addr2);
        assert_ne!(id1, id3);

        // A longer address still produces a non-zero id.
        let long_addr = "very.long.domain.name.example.com:9999";
        let id4 = calculate_node_id(long_addr);
        assert!(id4 > 0);
    }

    // A flownode payload without `workloads` (and without `env_vars`) still
    // decodes, with `workloads` falling back to its default.
    #[test]
    fn test_flownode_status_backward_compatible_without_workloads() {
        let raw = r#"{
            "peer":{"id":1,"addr":"127.0.0.1"},
            "last_activity_ts":123,
            "status":{"Flownode":{}},
            "version":"",
            "git_commit":"",
            "start_time_ms":1,
            "total_cpu_millicores":0,
            "total_memory_bytes":0,
            "cpu_usage_millicores":0,
            "memory_usage_bytes":0,
            "hostname":""
        }"#;

        let node_info: NodeInfo = raw.parse().unwrap();
        assert_matches!(
            node_info.status,
            NodeStatus::Flownode(FlownodeStatus { workloads }) if workloads.types.is_empty()
        );
    }

    // Flownode workloads survive a JSON encode/decode round trip.
    #[test]
    fn test_flownode_status_round_trip_with_workloads() {
        let node_info = NodeInfo {
            peer: Peer {
                id: 1,
                addr: "127.0.0.1".to_string(),
            },
            last_activity_ts: 123,
            status: NodeStatus::Flownode(FlownodeStatus {
                workloads: FlownodeWorkloads { types: vec![7] },
            }),
            version: "".to_string(),
            git_commit: "".to_string(),
            start_time_ms: 1,
            total_cpu_millicores: 0,
            total_memory_bytes: 0,
            cpu_usage_millicores: 0,
            memory_usage_bytes: 0,
            hostname: "test_hostname".to_string(),
            env_vars: Default::default(),
        };

        let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
        let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();

        assert_matches!(
            new_node_info,
            NodeInfo {
                status: NodeStatus::Flownode(FlownodeStatus { workloads }),
                ..
            } if workloads.types == vec![7]
        );
    }

    // A payload without `env_vars` still decodes, yielding an empty map.
    #[test]
    fn test_node_info_backward_compatible_without_env_vars() {
        let raw = r#"{
            "peer":{"id":1,"addr":"127.0.0.1"},
            "last_activity_ts":123,
            "status":{"Datanode":{"rcus":0,"wcus":0,"leader_regions":0,"follower_regions":0,"workloads":{"types":[0]}}},
            "version":"",
            "git_commit":"",
            "start_time_ms":1,
            "total_cpu_millicores":0,
            "total_memory_bytes":0,
            "cpu_usage_millicores":0,
            "memory_usage_bytes":0,
            "hostname":"test"
        }"#;

        let node_info: NodeInfo = raw.parse().unwrap();
        assert!(node_info.env_vars.is_empty());
    }

    // Environment variables survive a JSON encode/decode round trip.
    #[test]
    fn test_node_info_with_env_vars_round_trip() {
        let mut env_vars = HashMap::new();
        env_vars.insert("AZ".to_string(), "us-east-1a".to_string());

        let node_info = NodeInfo {
            peer: Peer {
                id: 1,
                addr: "127.0.0.1".to_string(),
            },
            last_activity_ts: 123,
            status: NodeStatus::Datanode(DatanodeStatus {
                rcus: 0,
                wcus: 0,
                leader_regions: 0,
                follower_regions: 0,
                workloads: DatanodeWorkloads { types: vec![] },
            }),
            version: "".to_string(),
            git_commit: "".to_string(),
            start_time_ms: 1,
            total_cpu_millicores: 0,
            total_memory_bytes: 0,
            cpu_usage_millicores: 0,
            memory_usage_bytes: 0,
            hostname: "test".to_string(),
            env_vars,
        };

        let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
        let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();
        assert_eq!(new_node_info.env_vars.get("AZ").unwrap(), "us-east-1a");
    }
}