1use std::hash::{DefaultHasher, Hash, Hasher};
16use std::str::FromStr;
17
18use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest};
19use common_error::ext::ErrorExt;
20use lazy_static::lazy_static;
21use regex::Regex;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt, ensure};
24
25use crate::datanode::RegionStat;
26use crate::error::{
27 DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu,
28 InvalidRoleSnafu, ParseNumSnafu, Result,
29};
30use crate::key::flow::flow_state::FlowStat;
31use crate::peer::Peer;
32
/// Common leading component of every serialized [`NodeInfoKey`]; also used to
/// build the key-matching pattern and the scan prefixes in this file.
const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";
34
lazy_static! {
    // Matches a complete serialized `NodeInfoKey` of the shape
    // `{CLUSTER_NODE_INFO_PREFIX}-<digits>-<digits>-<digits>`.
    // The parser in this file reads only the second and third groups (role
    // and node id); the encoder in this file always writes `0` for the first.
    //
    // The pattern is assembled from constants only, so a failure to compile
    // it would be a programming error; hence the `unwrap`.
    static ref CLUSTER_NODE_INFO_PREFIX_PATTERN: Regex = Regex::new(&format!(
        "^{CLUSTER_NODE_INFO_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
    ))
    .unwrap();
}
41
#[async_trait::async_trait]
/// Asynchronous queries for information about a cluster: its nodes, its
/// region statistics and its flow statistics.
pub trait ClusterInfo {
    /// Error type returned by every query of this trait.
    type Error: ErrorExt;

    /// Lists the nodes of the cluster.
    ///
    /// NOTE(review): `role` presumably restricts the result to nodes of that
    /// role, with `None` meaning "all roles" — confirm against implementors.
    async fn list_nodes(
        &self,
        role: Option<Role>,
    ) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

    /// Lists region statistics as a vector of [`RegionStat`].
    async fn list_region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

    /// Returns the flow statistics, if any.
    ///
    /// NOTE(review): the meaning of `None` (e.g. "no flow stats reported yet")
    /// is not visible from this file — confirm against implementors.
    async fn list_flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
}
61
/// Key identifying one node's info entry.
///
/// Its byte form is `{CLUSTER_NODE_INFO_PREFIX}-0-{role as i32}-{node_id}`
/// (see the `From<&NodeInfoKey> for Vec<u8>` impl in this file).
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
    /// Role of the node.
    pub role: Role,
    /// Identifier of the node. When built via `NodeInfoKey::new`, this is a
    /// hash of the peer address for frontends and the peer id for every other
    /// role.
    pub node_id: u64,
}
70
71impl NodeInfoKey {
72 pub fn new(request: &HeartbeatRequest) -> Option<Self> {
75 let HeartbeatRequest { header, peer, .. } = request;
76 let header = header.as_ref()?;
77 let peer = peer.as_ref()?;
78
79 let role = header.role.try_into().ok()?;
80 let node_id = match role {
81 Role::Frontend => calculate_node_id(&peer.addr),
84 _ => peer.id,
85 };
86
87 Some(NodeInfoKey { role, node_id })
88 }
89
90 pub fn key_prefix() -> String {
91 format!("{}-0-", CLUSTER_NODE_INFO_PREFIX)
92 }
93
94 pub fn key_prefix_with_role(role: Role) -> String {
95 format!("{}-0-{}-", CLUSTER_NODE_INFO_PREFIX, i32::from(role))
96 }
97}
98
/// Derives a node id from an address by hashing it with a freshly created
/// [`DefaultHasher`].
///
/// The result is deterministic for a given build, but the standard library
/// does not specify the hasher's algorithm across Rust releases, so ids
/// produced by binaries built with different toolchains may differ.
fn calculate_node_id(addr: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(addr, &mut state);
    Hasher::finish(&state)
}
105
/// Information recorded about a single node of the cluster.
///
/// Converted to and from JSON bytes by the `TryFrom` impls in this file.
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeInfo {
    /// The peer this entry describes.
    pub peer: Peer,
    /// NOTE(review): presumably the timestamp of the node's last activity;
    /// the unit is not visible from this file — confirm.
    pub last_activity_ts: i64,
    /// Role-specific status of the node.
    pub status: NodeStatus,
    /// Version string reported for the node.
    pub version: String,
    /// Git commit string reported for the node.
    pub git_commit: String,
    /// Start time of the node; milliseconds going by the name — TODO confirm
    /// the epoch.
    pub start_time_ms: u64,
    /// CPU count of the node (unit/semantics to be confirmed). Falls back to
    /// `0` when the field is absent from the serialized form.
    #[serde(default)]
    pub cpus: u32,
    /// Memory of the node in bytes, going by the name. Falls back to `0` when
    /// the field is absent from the serialized form.
    #[serde(default)]
    pub memory_bytes: u64,
}
128
/// Role of a node in the cluster.
///
/// Each variant has an `i32` code (see the `From<Role> for i32` and
/// `TryFrom<i32> for Role` impls in this file); that code is what appears in
/// serialized node info keys.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub enum Role {
    /// Code `0`.
    Datanode,
    /// Code `1`.
    Frontend,
    /// Code `2`.
    Flownode,
    /// Code `99`.
    Metasrv,
}
136
/// Status of a node, with a payload that depends on the kind of node.
#[derive(Debug, Serialize, Deserialize)]
pub enum NodeStatus {
    /// Status of a datanode.
    Datanode(DatanodeStatus),
    /// Status of a frontend.
    Frontend(FrontendStatus),
    /// Status of a flownode.
    Flownode(FlownodeStatus),
    /// Status of a metasrv.
    Metasrv(MetasrvStatus),
    /// Standalone node; carries no payload and has no counterpart in [`Role`].
    Standalone,
}
145
146impl NodeStatus {
147 pub fn role_name(&self) -> &str {
149 match self {
150 NodeStatus::Datanode(_) => "DATANODE",
151 NodeStatus::Frontend(_) => "FRONTEND",
152 NodeStatus::Flownode(_) => "FLOWNODE",
153 NodeStatus::Metasrv(_) => "METASRV",
154 NodeStatus::Standalone => "STANDALONE",
155 }
156 }
157}
158
/// Status payload of a datanode.
#[derive(Debug, Serialize, Deserialize)]
pub struct DatanodeStatus {
    /// NOTE(review): presumably "read capacity units" — confirm.
    pub rcus: i64,
    /// NOTE(review): presumably "write capacity units" — confirm.
    pub wcus: i64,
    /// NOTE(review): presumably the number of leader regions on this
    /// datanode — confirm.
    pub leader_regions: usize,
    /// NOTE(review): presumably the number of follower regions on this
    /// datanode — confirm.
    pub follower_regions: usize,
    /// Workloads of the datanode, as a [`DatanodeWorkloads`] message.
    pub workloads: DatanodeWorkloads,
}
173
/// Status payload of a frontend; currently carries no fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct FrontendStatus {}
177
/// Status payload of a flownode; currently carries no fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct FlownodeStatus {}
181
/// Status payload of a metasrv.
#[derive(Debug, Serialize, Deserialize)]
pub struct MetasrvStatus {
    /// Whether this metasrv is the leader.
    pub is_leader: bool,
}
187
188impl FromStr for NodeInfoKey {
189 type Err = Error;
190
191 fn from_str(key: &str) -> Result<Self> {
192 let caps = CLUSTER_NODE_INFO_PREFIX_PATTERN
193 .captures(key)
194 .context(InvalidNodeInfoKeySnafu { key })?;
195 ensure!(caps.len() == 4, InvalidNodeInfoKeySnafu { key });
196
197 let role = caps[2].to_string();
198 let node_id = caps[3].to_string();
199 let role: i32 = role.parse().context(ParseNumSnafu {
200 err_msg: format!("invalid role {role}"),
201 })?;
202 let role = Role::try_from(role)?;
203 let node_id: u64 = node_id.parse().context(ParseNumSnafu {
204 err_msg: format!("invalid node_id: {node_id}"),
205 })?;
206
207 Ok(Self { role, node_id })
208 }
209}
210
211impl TryFrom<Vec<u8>> for NodeInfoKey {
212 type Error = Error;
213
214 fn try_from(bytes: Vec<u8>) -> Result<Self> {
215 String::from_utf8(bytes)
216 .context(FromUtf8Snafu {
217 name: "NodeInfoKey",
218 })
219 .map(|x| x.parse())?
220 }
221}
222
223impl From<&NodeInfoKey> for Vec<u8> {
224 fn from(key: &NodeInfoKey) -> Self {
225 format!(
226 "{}-0-{}-{}",
227 CLUSTER_NODE_INFO_PREFIX,
228 i32::from(key.role),
229 key.node_id
230 )
231 .into_bytes()
232 }
233}
234
235impl FromStr for NodeInfo {
236 type Err = Error;
237
238 fn from_str(value: &str) -> Result<Self> {
239 serde_json::from_str(value).context(DecodeJsonSnafu)
240 }
241}
242
243impl TryFrom<Vec<u8>> for NodeInfo {
244 type Error = Error;
245
246 fn try_from(bytes: Vec<u8>) -> Result<Self> {
247 String::from_utf8(bytes)
248 .context(FromUtf8Snafu { name: "NodeInfo" })
249 .map(|x| x.parse())?
250 }
251}
252
253impl TryFrom<NodeInfo> for Vec<u8> {
254 type Error = Error;
255
256 fn try_from(info: NodeInfo) -> Result<Self> {
257 Ok(serde_json::to_string(&info)
258 .context(EncodeJsonSnafu)?
259 .into_bytes())
260 }
261}
262
263impl From<Role> for i32 {
264 fn from(role: Role) -> Self {
265 match role {
266 Role::Datanode => 0,
267 Role::Frontend => 1,
268 Role::Flownode => 2,
269 Role::Metasrv => 99,
270 }
271 }
272}
273
274impl TryFrom<i32> for Role {
275 type Error = Error;
276
277 fn try_from(role: i32) -> Result<Self> {
278 match role {
279 0 => Ok(Self::Datanode),
280 1 => Ok(Self::Frontend),
281 2 => Ok(Self::Flownode),
282 99 => Ok(Self::Metasrv),
283 _ => InvalidRoleSnafu { role }.fail(),
284 }
285 }
286}
287
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_workload::DatanodeWorkloadType;

    use super::*;
    use crate::cluster::Role::{Datanode, Frontend};
    use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
    use crate::peer::Peer;

    /// A key survives encoding to bytes and decoding back.
    #[test]
    fn test_node_info_key_round_trip() {
        let key = NodeInfoKey {
            role: Datanode,
            node_id: 2,
        };

        let key_bytes: Vec<u8> = (&key).into();
        let new_key: NodeInfoKey = key_bytes.try_into().unwrap();

        assert_eq!(Datanode, new_key.role);
        assert_eq!(2, new_key.node_id);
    }

    /// Malformed or out-of-range keys are rejected instead of panicking.
    #[test]
    fn test_node_info_key_rejects_invalid_input() {
        let invalid_keys = [
            "",
            "__meta_cluster_node_info",
            // Too few components.
            "__meta_cluster_node_info-0-1",
            // Non-numeric node id.
            "__meta_cluster_node_info-0-1-abc",
            // Leading or trailing garbage must not be tolerated.
            "prefix__meta_cluster_node_info-0-1-2",
            "__meta_cluster_node_info-0-1-2-",
            // Role code with no matching `Role` variant.
            "__meta_cluster_node_info-0-7-2",
            // Role code that overflows `i32`.
            "__meta_cluster_node_info-0-99999999999-2",
            // Node id equal to `u64::MAX + 1`.
            "__meta_cluster_node_info-0-1-18446744073709551616",
        ];

        for key in invalid_keys {
            assert!(
                key.parse::<NodeInfoKey>().is_err(),
                "key {key:?} should be rejected"
            );
        }
    }

    /// Every role maps to a code that maps back to the same role, and unknown
    /// codes are rejected.
    #[test]
    fn test_role_i32_round_trip() {
        for role in [Datanode, Frontend, Role::Flownode, Role::Metasrv] {
            assert_eq!(role, Role::try_from(i32::from(role)).unwrap());
        }

        assert!(Role::try_from(3).is_err());
        assert!(Role::try_from(-1).is_err());
    }

    /// A node info value survives encoding to JSON bytes and decoding back.
    #[test]
    fn test_node_info_round_trip() {
        let node_info = NodeInfo {
            peer: Peer {
                id: 1,
                addr: "127.0.0.1".to_string(),
            },
            last_activity_ts: 123,
            status: NodeStatus::Datanode(DatanodeStatus {
                rcus: 1,
                wcus: 2,
                leader_regions: 3,
                follower_regions: 4,
                workloads: DatanodeWorkloads {
                    types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
                },
            }),
            version: "".to_string(),
            git_commit: "".to_string(),
            start_time_ms: 1,
            cpus: 0,
            memory_bytes: 0,
        };

        let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
        let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();

        assert_matches!(
            new_node_info,
            NodeInfo {
                peer: Peer { id: 1, .. },
                last_activity_ts: 123,
                status: NodeStatus::Datanode(DatanodeStatus {
                    rcus: 1,
                    wcus: 2,
                    leader_regions: 3,
                    follower_regions: 4,
                    ..
                }),
                start_time_ms: 1,
                ..
            }
        );
    }

    /// The scan prefixes have the documented textual form.
    #[test]
    fn test_node_info_key_prefix() {
        let prefix = NodeInfoKey::key_prefix();
        assert_eq!(prefix, "__meta_cluster_node_info-0-");

        let prefix = NodeInfoKey::key_prefix_with_role(Frontend);
        assert_eq!(prefix, "__meta_cluster_node_info-0-1-");
    }

    /// The address hash is deterministic and distinguishes nearby addresses.
    #[test]
    fn test_calculate_node_id_from_addr() {
        assert_eq!(calculate_node_id(""), calculate_node_id(""));

        let addr1 = "127.0.0.1:8080";
        let id1 = calculate_node_id(addr1);
        let id2 = calculate_node_id(addr1);
        assert_eq!(id1, id2);

        let addr2 = "127.0.0.1:8081";
        let id3 = calculate_node_id(addr2);
        assert_ne!(id1, id3);

        // Zero is a legitimate hash value, so only determinism is asserted
        // for the long address rather than any property of the number.
        let long_addr = "very.long.domain.name.example.com:9999";
        let id4 = calculate_node_id(long_addr);
        assert_eq!(id4, calculate_node_id(long_addr));
    }
}