1use std::hash::{DefaultHasher, Hash, Hasher};
16use std::str::FromStr;
17
18use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest};
19use common_error::ext::ErrorExt;
20use lazy_static::lazy_static;
21use regex::Regex;
22use serde::{Deserialize, Serialize};
23use snafu::{ensure, OptionExt, ResultExt};
24
25use crate::datanode::RegionStat;
26use crate::error::{
27 DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu,
28 InvalidRoleSnafu, ParseNumSnafu, Result,
29};
30use crate::key::flow::flow_state::FlowStat;
31use crate::peer::Peer;
32
33const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";
34
35lazy_static! {
36 static ref CLUSTER_NODE_INFO_PREFIX_PATTERN: Regex = Regex::new(&format!(
37 "^{CLUSTER_NODE_INFO_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
38 ))
39 .unwrap();
40}
41
42#[async_trait::async_trait]
44pub trait ClusterInfo {
45 type Error: ErrorExt;
46
47 async fn list_nodes(
49 &self,
50 role: Option<Role>,
51 ) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
52
53 async fn list_region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
55
56 async fn list_flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
58
59 }
61
62#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
64pub struct NodeInfoKey {
65 pub role: Role,
67 pub node_id: u64,
69}
70
71impl NodeInfoKey {
72 pub fn new(request: &HeartbeatRequest) -> Option<Self> {
75 let HeartbeatRequest { header, peer, .. } = request;
76 let header = header.as_ref()?;
77 let peer = peer.as_ref()?;
78
79 let role = header.role.try_into().ok()?;
80 let node_id = match role {
81 Role::Frontend => calculate_node_id(&peer.addr),
84 _ => peer.id,
85 };
86
87 Some(NodeInfoKey { role, node_id })
88 }
89
90 pub fn key_prefix() -> String {
91 format!("{}-0-", CLUSTER_NODE_INFO_PREFIX)
92 }
93
94 pub fn key_prefix_with_role(role: Role) -> String {
95 format!("{}-0-{}-", CLUSTER_NODE_INFO_PREFIX, i32::from(role))
96 }
97}
98
/// Derives a node id from an address by hashing it with [`DefaultHasher`].
///
/// The result is deterministic for a given build: `DefaultHasher::new()` always
/// starts from the same state.
///
/// NOTE(review): the standard library does not promise that `DefaultHasher`
/// keeps the same algorithm across Rust releases, so ids may differ between
/// binaries built with different toolchains — confirm this is acceptable.
fn calculate_node_id(addr: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(addr, &mut state);
    state.finish()
}
105
106#[derive(Debug, Serialize, Deserialize)]
108pub struct NodeInfo {
109 pub peer: Peer,
111 pub last_activity_ts: i64,
113 pub status: NodeStatus,
115 pub version: String,
117 pub git_commit: String,
119 pub start_time_ms: u64,
121}
122
123#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
124pub enum Role {
125 Datanode,
126 Frontend,
127 Flownode,
128 Metasrv,
129}
130
131#[derive(Debug, Serialize, Deserialize)]
132pub enum NodeStatus {
133 Datanode(DatanodeStatus),
134 Frontend(FrontendStatus),
135 Flownode(FlownodeStatus),
136 Metasrv(MetasrvStatus),
137 Standalone,
138}
139
140impl NodeStatus {
141 pub fn role_name(&self) -> &str {
143 match self {
144 NodeStatus::Datanode(_) => "DATANODE",
145 NodeStatus::Frontend(_) => "FRONTEND",
146 NodeStatus::Flownode(_) => "FLOWNODE",
147 NodeStatus::Metasrv(_) => "METASRV",
148 NodeStatus::Standalone => "STANDALONE",
149 }
150 }
151}
152
153#[derive(Debug, Serialize, Deserialize)]
155pub struct DatanodeStatus {
156 pub rcus: i64,
158 pub wcus: i64,
160 pub leader_regions: usize,
162 pub follower_regions: usize,
164 pub workloads: DatanodeWorkloads,
166}
167
168#[derive(Debug, Serialize, Deserialize)]
170pub struct FrontendStatus {}
171
172#[derive(Debug, Serialize, Deserialize)]
174pub struct FlownodeStatus {}
175
176#[derive(Debug, Serialize, Deserialize)]
178pub struct MetasrvStatus {
179 pub is_leader: bool,
180}
181
182impl FromStr for NodeInfoKey {
183 type Err = Error;
184
185 fn from_str(key: &str) -> Result<Self> {
186 let caps = CLUSTER_NODE_INFO_PREFIX_PATTERN
187 .captures(key)
188 .context(InvalidNodeInfoKeySnafu { key })?;
189 ensure!(caps.len() == 4, InvalidNodeInfoKeySnafu { key });
190
191 let role = caps[2].to_string();
192 let node_id = caps[3].to_string();
193 let role: i32 = role.parse().context(ParseNumSnafu {
194 err_msg: format!("invalid role {role}"),
195 })?;
196 let role = Role::try_from(role)?;
197 let node_id: u64 = node_id.parse().context(ParseNumSnafu {
198 err_msg: format!("invalid node_id: {node_id}"),
199 })?;
200
201 Ok(Self { role, node_id })
202 }
203}
204
205impl TryFrom<Vec<u8>> for NodeInfoKey {
206 type Error = Error;
207
208 fn try_from(bytes: Vec<u8>) -> Result<Self> {
209 String::from_utf8(bytes)
210 .context(FromUtf8Snafu {
211 name: "NodeInfoKey",
212 })
213 .map(|x| x.parse())?
214 }
215}
216
217impl From<&NodeInfoKey> for Vec<u8> {
218 fn from(key: &NodeInfoKey) -> Self {
219 format!(
220 "{}-0-{}-{}",
221 CLUSTER_NODE_INFO_PREFIX,
222 i32::from(key.role),
223 key.node_id
224 )
225 .into_bytes()
226 }
227}
228
229impl FromStr for NodeInfo {
230 type Err = Error;
231
232 fn from_str(value: &str) -> Result<Self> {
233 serde_json::from_str(value).context(DecodeJsonSnafu)
234 }
235}
236
237impl TryFrom<Vec<u8>> for NodeInfo {
238 type Error = Error;
239
240 fn try_from(bytes: Vec<u8>) -> Result<Self> {
241 String::from_utf8(bytes)
242 .context(FromUtf8Snafu { name: "NodeInfo" })
243 .map(|x| x.parse())?
244 }
245}
246
247impl TryFrom<NodeInfo> for Vec<u8> {
248 type Error = Error;
249
250 fn try_from(info: NodeInfo) -> Result<Self> {
251 Ok(serde_json::to_string(&info)
252 .context(EncodeJsonSnafu)?
253 .into_bytes())
254 }
255}
256
257impl From<Role> for i32 {
258 fn from(role: Role) -> Self {
259 match role {
260 Role::Datanode => 0,
261 Role::Frontend => 1,
262 Role::Flownode => 2,
263 Role::Metasrv => 99,
264 }
265 }
266}
267
268impl TryFrom<i32> for Role {
269 type Error = Error;
270
271 fn try_from(role: i32) -> Result<Self> {
272 match role {
273 0 => Ok(Self::Datanode),
274 1 => Ok(Self::Frontend),
275 2 => Ok(Self::Flownode),
276 99 => Ok(Self::Metasrv),
277 _ => InvalidRoleSnafu { role }.fail(),
278 }
279 }
280}
281
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_workload::DatanodeWorkloadType;

    use super::*;
    use crate::cluster::Role::{Datanode, Frontend};
    use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
    use crate::peer::Peer;

    /// A key survives encoding to bytes and decoding back.
    #[test]
    fn test_node_info_key_round_trip() {
        let original = NodeInfoKey {
            role: Datanode,
            node_id: 2,
        };

        let encoded = Vec::<u8>::from(&original);
        let decoded = NodeInfoKey::try_from(encoded).unwrap();

        assert_eq!(Datanode, decoded.role);
        assert_eq!(2, decoded.node_id);
    }

    /// A `NodeInfo` survives JSON encoding and decoding.
    #[test]
    fn test_node_info_round_trip() {
        let status = NodeStatus::Datanode(DatanodeStatus {
            rcus: 1,
            wcus: 2,
            leader_regions: 3,
            follower_regions: 4,
            workloads: DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
            },
        });
        let peer = Peer {
            id: 1,
            addr: "127.0.0.1".to_string(),
        };
        let node_info = NodeInfo {
            peer,
            last_activity_ts: 123,
            status,
            version: "".to_string(),
            git_commit: "".to_string(),
            start_time_ms: 1,
        };

        let encoded = Vec::<u8>::try_from(node_info).unwrap();
        let decoded = NodeInfo::try_from(encoded).unwrap();

        assert_matches!(
            decoded,
            NodeInfo {
                peer: Peer { id: 1, .. },
                last_activity_ts: 123,
                status: NodeStatus::Datanode(DatanodeStatus {
                    rcus: 1,
                    wcus: 2,
                    leader_regions: 3,
                    follower_regions: 4,
                    ..
                }),
                start_time_ms: 1,
                ..
            }
        );
    }

    /// Both prefix helpers produce the expected literal strings.
    #[test]
    fn test_node_info_key_prefix() {
        assert_eq!(NodeInfoKey::key_prefix(), "__meta_cluster_node_info-0-");
        assert_eq!(
            NodeInfoKey::key_prefix_with_role(Frontend),
            "__meta_cluster_node_info-0-1-"
        );
    }

    /// Address hashing is deterministic and distinguishes different addresses.
    #[test]
    fn test_calculate_node_id_from_addr() {
        // The empty address hashes consistently.
        assert_eq!(calculate_node_id(""), calculate_node_id(""));

        // The same address always yields the same id.
        let first_addr = "127.0.0.1:8080";
        let first_id = calculate_node_id(first_addr);
        assert_eq!(first_id, calculate_node_id(first_addr));

        // A different address yields a different id.
        let second_addr = "127.0.0.1:8081";
        assert_ne!(first_id, calculate_node_id(second_addr));

        // A long address yields a non-zero id.
        let long_addr = "very.long.domain.name.example.com:9999";
        assert!(calculate_node_id(long_addr) > 0);
    }
}