1use std::fmt::{Display, Formatter};
16use std::hash::{DefaultHasher, Hash, Hasher};
17use std::str::FromStr;
18
19use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest};
20use common_error::ext::ErrorExt;
21use lazy_static::lazy_static;
22use regex::Regex;
23use serde::{Deserialize, Serialize};
24use snafu::{OptionExt, ResultExt, ensure};
25
26use crate::datanode::RegionStat;
27use crate::error::{
28 DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu,
29 InvalidRoleSnafu, ParseNumSnafu, Result,
30};
31use crate::key::flow::flow_state::FlowStat;
32use crate::peer::Peer;
33
34const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";
35
36lazy_static! {
37 static ref CLUSTER_NODE_INFO_PREFIX_PATTERN: Regex = Regex::new(&format!(
38 "^{CLUSTER_NODE_INFO_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
39 ))
40 .unwrap();
41}
42
43#[async_trait::async_trait]
45pub trait ClusterInfo {
46 type Error: ErrorExt;
47
48 async fn list_nodes(
50 &self,
51 role: Option<Role>,
52 ) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
53
54 async fn list_region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
56
57 async fn list_flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
59
60 }
62
63#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
65pub struct NodeInfoKey {
66 pub role: Role,
68 pub node_id: u64,
70}
71
72impl NodeInfoKey {
73 pub fn new(request: &HeartbeatRequest) -> Option<Self> {
76 let HeartbeatRequest { header, peer, .. } = request;
77 let header = header.as_ref()?;
78 let peer = peer.as_ref()?;
79
80 let role = header.role.try_into().ok()?;
81 let node_id = match role {
82 Role::Frontend => calculate_node_id(&peer.addr),
85 _ => peer.id,
86 };
87
88 Some(NodeInfoKey { role, node_id })
89 }
90
91 pub fn key_prefix() -> String {
92 format!("{}-0-", CLUSTER_NODE_INFO_PREFIX)
93 }
94
95 pub fn key_prefix_with_role(role: Role) -> String {
96 format!("{}-0-{}-", CLUSTER_NODE_INFO_PREFIX, i32::from(role))
97 }
98}
99
/// Derives a node id from `addr` by hashing it with [`DefaultHasher`].
///
/// The same address always yields the same id within one build.
///
/// NOTE(review): the standard library documents `DefaultHasher`'s algorithm
/// as unspecified and free to change between Rust releases, so ids are not
/// guaranteed to stay stable across toolchain upgrades — confirm that this is
/// acceptable for persisted keys.
fn calculate_node_id(addr: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(addr, &mut state);
    Hasher::finish(&state)
}
106
107#[derive(Debug, Serialize, Deserialize)]
109pub struct NodeInfo {
110 pub peer: Peer,
112 pub last_activity_ts: i64,
114 pub status: NodeStatus,
116 pub version: String,
118 pub git_commit: String,
120 pub start_time_ms: u64,
122 #[serde(default)]
124 pub total_cpu_millicores: i64,
125 #[serde(default)]
127 pub total_memory_bytes: i64,
128 #[serde(default)]
130 pub cpu_usage_millicores: i64,
131 #[serde(default)]
133 pub memory_usage_bytes: i64,
134 #[serde(default)]
136 pub hostname: String,
137}
138
139#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
140pub enum Role {
141 Datanode,
142 Frontend,
143 Flownode,
144 Metasrv,
145}
146
147#[derive(Debug, Serialize, Deserialize)]
148pub enum NodeStatus {
149 Datanode(DatanodeStatus),
150 Frontend(FrontendStatus),
151 Flownode(FlownodeStatus),
152 Metasrv(MetasrvStatus),
153 Standalone,
154}
155
156impl NodeStatus {
157 pub fn role_name(&self) -> &str {
159 match self {
160 NodeStatus::Datanode(_) => "DATANODE",
161 NodeStatus::Frontend(_) => "FRONTEND",
162 NodeStatus::Flownode(_) => "FLOWNODE",
163 NodeStatus::Metasrv(_) => "METASRV",
164 NodeStatus::Standalone => "STANDALONE",
165 }
166 }
167}
168
169#[derive(Debug, Serialize, Deserialize)]
171pub struct DatanodeStatus {
172 pub rcus: i64,
174 pub wcus: i64,
176 pub leader_regions: usize,
178 pub follower_regions: usize,
180 pub workloads: DatanodeWorkloads,
182}
183
184#[derive(Debug, Serialize, Deserialize)]
186pub struct FrontendStatus {}
187
188#[derive(Debug, Serialize, Deserialize)]
190pub struct FlownodeStatus {}
191
192#[derive(Debug, Serialize, Deserialize)]
194pub struct MetasrvStatus {
195 pub is_leader: bool,
196}
197
198impl FromStr for NodeInfoKey {
199 type Err = Error;
200
201 fn from_str(key: &str) -> Result<Self> {
202 let caps = CLUSTER_NODE_INFO_PREFIX_PATTERN
203 .captures(key)
204 .context(InvalidNodeInfoKeySnafu { key })?;
205 ensure!(caps.len() == 4, InvalidNodeInfoKeySnafu { key });
206
207 let role = caps[2].to_string();
208 let node_id = caps[3].to_string();
209 let role: i32 = role.parse().context(ParseNumSnafu {
210 err_msg: format!("invalid role {role}"),
211 })?;
212 let role = Role::try_from(role)?;
213 let node_id: u64 = node_id.parse().context(ParseNumSnafu {
214 err_msg: format!("invalid node_id: {node_id}"),
215 })?;
216
217 Ok(Self { role, node_id })
218 }
219}
220
221impl TryFrom<Vec<u8>> for NodeInfoKey {
222 type Error = Error;
223
224 fn try_from(bytes: Vec<u8>) -> Result<Self> {
225 String::from_utf8(bytes)
226 .context(FromUtf8Snafu {
227 name: "NodeInfoKey",
228 })
229 .map(|x| x.parse())?
230 }
231}
232
233impl From<&NodeInfoKey> for Vec<u8> {
234 fn from(key: &NodeInfoKey) -> Self {
235 format!(
236 "{}-0-{}-{}",
237 CLUSTER_NODE_INFO_PREFIX,
238 i32::from(key.role),
239 key.node_id
240 )
241 .into_bytes()
242 }
243}
244
245impl Display for NodeInfoKey {
246 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
247 write!(f, "{:?}-{}", self.role, self.node_id)
248 }
249}
250
251impl FromStr for NodeInfo {
252 type Err = Error;
253
254 fn from_str(value: &str) -> Result<Self> {
255 serde_json::from_str(value).context(DecodeJsonSnafu)
256 }
257}
258
259impl TryFrom<Vec<u8>> for NodeInfo {
260 type Error = Error;
261
262 fn try_from(bytes: Vec<u8>) -> Result<Self> {
263 String::from_utf8(bytes)
264 .context(FromUtf8Snafu { name: "NodeInfo" })
265 .map(|x| x.parse())?
266 }
267}
268
269impl TryFrom<NodeInfo> for Vec<u8> {
270 type Error = Error;
271
272 fn try_from(info: NodeInfo) -> Result<Self> {
273 Ok(serde_json::to_string(&info)
274 .context(EncodeJsonSnafu)?
275 .into_bytes())
276 }
277}
278
279impl From<Role> for i32 {
280 fn from(role: Role) -> Self {
281 match role {
282 Role::Datanode => 0,
283 Role::Frontend => 1,
284 Role::Flownode => 2,
285 Role::Metasrv => 99,
286 }
287 }
288}
289
290impl TryFrom<i32> for Role {
291 type Error = Error;
292
293 fn try_from(role: i32) -> Result<Self> {
294 match role {
295 0 => Ok(Self::Datanode),
296 1 => Ok(Self::Frontend),
297 2 => Ok(Self::Flownode),
298 99 => Ok(Self::Metasrv),
299 _ => InvalidRoleSnafu { role }.fail(),
300 }
301 }
302}
303
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_workload::DatanodeWorkloadType;

    use super::*;
    use crate::cluster::Role::{Datanode, Frontend};
    use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
    use crate::peer::Peer;

    /// A key encoded to bytes decodes back to the same role and node id.
    #[test]
    fn test_node_info_key_round_trip() {
        let original = NodeInfoKey {
            role: Datanode,
            node_id: 2,
        };

        let encoded = Vec::<u8>::from(&original);
        let decoded = NodeInfoKey::try_from(encoded).unwrap();

        assert_eq!(Datanode, decoded.role);
        assert_eq!(2, decoded.node_id);
    }

    /// A `NodeInfo` encoded to JSON bytes decodes back with its key fields
    /// intact.
    #[test]
    fn test_node_info_round_trip() {
        let status = NodeStatus::Datanode(DatanodeStatus {
            rcus: 1,
            wcus: 2,
            leader_regions: 3,
            follower_regions: 4,
            workloads: DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
            },
        });
        let original = NodeInfo {
            peer: Peer {
                id: 1,
                addr: "127.0.0.1".to_string(),
            },
            last_activity_ts: 123,
            status,
            version: String::new(),
            git_commit: String::new(),
            start_time_ms: 1,
            total_cpu_millicores: 0,
            total_memory_bytes: 0,
            cpu_usage_millicores: 0,
            memory_usage_bytes: 0,
            hostname: "test_hostname".to_string(),
        };

        let encoded = Vec::<u8>::try_from(original).unwrap();
        let decoded = NodeInfo::try_from(encoded).unwrap();

        assert_matches!(
            decoded,
            NodeInfo {
                peer: Peer { id: 1, .. },
                last_activity_ts: 123,
                status: NodeStatus::Datanode(DatanodeStatus {
                    rcus: 1,
                    wcus: 2,
                    leader_regions: 3,
                    follower_regions: 4,
                    ..
                }),
                start_time_ms: 1,
                ..
            }
        );
    }

    /// Both prefix helpers produce the expected literal prefixes.
    #[test]
    fn test_node_info_key_prefix() {
        assert_eq!(NodeInfoKey::key_prefix(), "__meta_cluster_node_info-0-");
        assert_eq!(
            NodeInfoKey::key_prefix_with_role(Frontend),
            "__meta_cluster_node_info-0-1-"
        );
    }

    /// Hashing an address is deterministic and separates distinct addresses.
    #[test]
    fn test_calculate_node_id_from_addr() {
        // The empty address hashes consistently as well.
        assert_eq!(calculate_node_id(""), calculate_node_id(""));

        let first_addr = "127.0.0.1:8080";
        let first_id = calculate_node_id(first_addr);
        assert_eq!(first_id, calculate_node_id(first_addr));

        let second_addr = "127.0.0.1:8081";
        assert_ne!(first_id, calculate_node_id(second_addr));

        let long_addr = "very.long.domain.name.example.com:9999";
        assert!(calculate_node_id(long_addr) > 0);
    }
}