Skip to main content

common_meta/
cluster.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::hash::{DefaultHasher, Hash, Hasher};
18use std::str::FromStr;
19
20use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, HeartbeatRequest};
21use common_error::ext::ErrorExt;
22use lazy_static::lazy_static;
23use regex::Regex;
24use serde::{Deserialize, Serialize};
25use snafu::{OptionExt, ResultExt, ensure};
26
27use crate::datanode::RegionStat;
28use crate::error::{
29    DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu,
30    InvalidRoleSnafu, ParseNumSnafu, Result,
31};
32use crate::key::flow::flow_state::FlowStat;
33use crate::peer::Peer;
34
35const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";
36
37lazy_static! {
38    static ref CLUSTER_NODE_INFO_PREFIX_PATTERN: Regex = Regex::new(&format!(
39        "^{CLUSTER_NODE_INFO_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
40    ))
41    .unwrap();
42}
43
/// [ClusterInfo] provides information about the cluster.
///
/// Implementors pick their own error type, which must implement [ErrorExt].
#[async_trait::async_trait]
pub trait ClusterInfo {
    /// The error type returned by every method of this trait.
    type Error: ErrorExt;

    /// List all nodes by role in the cluster. If `role` is `None`, list all nodes.
    async fn list_nodes(
        &self,
        role: Option<Role>,
    ) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

    /// List all region stats in the cluster.
    async fn list_region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

    /// List all flow stats in the cluster.
    ///
    /// NOTE(review): the result is an `Option`; presumably `None` means no flow stats are
    /// available — confirm against implementors.
    async fn list_flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;

    // TODO(jeremy): Other info, like region status, etc.
}
63
64/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-0-{role}-{node_id}`.
65#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
66pub struct NodeInfoKey {
67    /// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
68    pub role: Role,
69    /// The node id.
70    pub node_id: u64,
71}
72
73impl NodeInfoKey {
74    /// Try to create a `NodeInfoKey` from a "good" heartbeat request. "good" as in every needed
75    /// piece of information is provided and valid.  
76    pub fn new(request: &HeartbeatRequest) -> Option<Self> {
77        let HeartbeatRequest { header, peer, .. } = request;
78        let header = header.as_ref()?;
79        let peer = peer.as_ref()?;
80
81        let role = header.role.try_into().ok()?;
82        let node_id = match role {
83            // Because the Frontend is stateless, it's too easy to neglect choosing a unique id
84            // for it when setting up a cluster. So we calculate its id from its address.
85            Role::Frontend => calculate_node_id(&peer.addr),
86            _ => peer.id,
87        };
88
89        Some(NodeInfoKey { role, node_id })
90    }
91
92    pub fn key_prefix() -> String {
93        format!("{}-0-", CLUSTER_NODE_INFO_PREFIX)
94    }
95
96    pub fn key_prefix_with_role(role: Role) -> String {
97        format!("{}-0-{}-", CLUSTER_NODE_INFO_PREFIX, i32::from(role))
98    }
99}
100
/// Derives a node id by hashing `addr` with the standard library's [`DefaultHasher`].
///
/// Every `DefaultHasher::new()` instance starts from the same state, so within one build the
/// same address always yields the same id. The algorithm behind `DefaultHasher` is
/// unspecified and may change between Rust releases, so ids are not guaranteed to stay the
/// same across toolchain upgrades.
fn calculate_node_id(addr: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(addr, &mut state);
    state.finish()
}
107
108/// The information of a node in the cluster.
109#[derive(Debug, Serialize, Deserialize)]
110pub struct NodeInfo {
111    /// The peer information. [node_id, address]
112    pub peer: Peer,
113    /// Last activity time in milliseconds.
114    pub last_activity_ts: i64,
115    /// The status of the node. Different roles have different node status.
116    pub status: NodeStatus,
117    // The node build version
118    pub version: String,
119    // The node build git commit hash
120    pub git_commit: String,
121    // The node star timestamp
122    pub start_time_ms: u64,
123    // The node build cpus
124    #[serde(default)]
125    pub total_cpu_millicores: i64,
126    // The node build memory bytes
127    #[serde(default)]
128    pub total_memory_bytes: i64,
129    // The node build cpu usage millicores
130    #[serde(default)]
131    pub cpu_usage_millicores: i64,
132    // The node build memory usage bytes
133    #[serde(default)]
134    pub memory_usage_bytes: i64,
135    // The node build hostname
136    #[serde(default)]
137    pub hostname: String,
138    /// Environment variables reported by the node.
139    #[serde(default)]
140    pub env_vars: HashMap<String, String>,
141}
142
/// The role a node plays in the cluster.
///
/// In storage keys each role is encoded as a numeric code via `i32::from(Role)`:
/// `Datanode` = 0, `Frontend` = 1, `Flownode` = 2, `Metasrv` = 99.
///
/// The derived `PartialOrd`/`Ord` follow the declaration order of the variants below.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
pub enum Role {
    Datanode,
    Frontend,
    Flownode,
    Metasrv,
}
150
/// Role-specific status reported for a node.
///
/// Each variant carries the status payload for one role; `Standalone` carries no payload.
#[derive(Debug, Serialize, Deserialize)]
pub enum NodeStatus {
    Datanode(DatanodeStatus),
    Frontend(FrontendStatus),
    Flownode(FlownodeStatus),
    Metasrv(MetasrvStatus),
    Standalone,
}
159
160impl NodeStatus {
161    // Get the role name of the node status
162    pub fn role_name(&self) -> &str {
163        match self {
164            NodeStatus::Datanode(_) => "DATANODE",
165            NodeStatus::Frontend(_) => "FRONTEND",
166            NodeStatus::Flownode(_) => "FLOWNODE",
167            NodeStatus::Metasrv(_) => "METASRV",
168            NodeStatus::Standalone => "STANDALONE",
169        }
170    }
171}
172
/// The status of a datanode.
///
/// Unlike the optional fields elsewhere in this module, `workloads` has no `#[serde(default)]`,
/// so it must be present when deserializing.
#[derive(Debug, Serialize, Deserialize)]
pub struct DatanodeStatus {
    /// The read capacity units during this period.
    pub rcus: i64,
    /// The write capacity units during this period.
    pub wcus: i64,
    /// How many leader regions on this node.
    pub leader_regions: usize,
    /// How many follower regions on this node.
    pub follower_regions: usize,
    /// The workloads of the datanode.
    pub workloads: DatanodeWorkloads,
}
187
/// The status of a frontend. It currently carries no fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct FrontendStatus {}
191
/// The status of a flownode.
#[derive(Debug, Serialize, Deserialize)]
pub struct FlownodeStatus {
    /// The workloads of the flownode.
    ///
    /// Falls back to the default (empty) value when the field is absent, so statuses
    /// serialized without it still deserialize.
    #[serde(default)]
    pub workloads: FlownodeWorkloads,
}
199
/// The status of a metasrv.
#[derive(Debug, Serialize, Deserialize)]
pub struct MetasrvStatus {
    /// Whether this metasrv is the leader.
    pub is_leader: bool,
}
205
206impl FromStr for NodeInfoKey {
207    type Err = Error;
208
209    fn from_str(key: &str) -> Result<Self> {
210        let caps = CLUSTER_NODE_INFO_PREFIX_PATTERN
211            .captures(key)
212            .context(InvalidNodeInfoKeySnafu { key })?;
213        ensure!(caps.len() == 4, InvalidNodeInfoKeySnafu { key });
214
215        let role = caps[2].to_string();
216        let node_id = caps[3].to_string();
217        let role: i32 = role.parse().context(ParseNumSnafu {
218            err_msg: format!("invalid role {role}"),
219        })?;
220        let role = Role::try_from(role)?;
221        let node_id: u64 = node_id.parse().context(ParseNumSnafu {
222            err_msg: format!("invalid node_id: {node_id}"),
223        })?;
224
225        Ok(Self { role, node_id })
226    }
227}
228
229impl TryFrom<Vec<u8>> for NodeInfoKey {
230    type Error = Error;
231
232    fn try_from(bytes: Vec<u8>) -> Result<Self> {
233        String::from_utf8(bytes)
234            .context(FromUtf8Snafu {
235                name: "NodeInfoKey",
236            })
237            .map(|x| x.parse())?
238    }
239}
240
241impl From<&NodeInfoKey> for Vec<u8> {
242    fn from(key: &NodeInfoKey) -> Self {
243        format!(
244            "{}-0-{}-{}",
245            CLUSTER_NODE_INFO_PREFIX,
246            i32::from(key.role),
247            key.node_id
248        )
249        .into_bytes()
250    }
251}
252
253impl Display for NodeInfoKey {
254    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
255        write!(f, "{:?}-{}", self.role, self.node_id)
256    }
257}
258
259impl FromStr for NodeInfo {
260    type Err = Error;
261
262    fn from_str(value: &str) -> Result<Self> {
263        serde_json::from_str(value).context(DecodeJsonSnafu)
264    }
265}
266
267impl TryFrom<Vec<u8>> for NodeInfo {
268    type Error = Error;
269
270    fn try_from(bytes: Vec<u8>) -> Result<Self> {
271        String::from_utf8(bytes)
272            .context(FromUtf8Snafu { name: "NodeInfo" })
273            .map(|x| x.parse())?
274    }
275}
276
277impl TryFrom<NodeInfo> for Vec<u8> {
278    type Error = Error;
279
280    fn try_from(info: NodeInfo) -> Result<Self> {
281        Ok(serde_json::to_string(&info)
282            .context(EncodeJsonSnafu)?
283            .into_bytes())
284    }
285}
286
287impl From<Role> for i32 {
288    fn from(role: Role) -> Self {
289        match role {
290            Role::Datanode => 0,
291            Role::Frontend => 1,
292            Role::Flownode => 2,
293            Role::Metasrv => 99,
294        }
295    }
296}
297
298impl TryFrom<i32> for Role {
299    type Error = Error;
300
301    fn try_from(role: i32) -> Result<Self> {
302        match role {
303            0 => Ok(Self::Datanode),
304            1 => Ok(Self::Frontend),
305            2 => Ok(Self::Flownode),
306            99 => Ok(Self::Metasrv),
307            _ => InvalidRoleSnafu { role }.fail(),
308        }
309    }
310}
311
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use common_workload::DatanodeWorkloadType;

    use super::*;
    use crate::cluster::Role::{Datanode, Frontend};
    use crate::cluster::{DatanodeStatus, FlownodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
    use crate::peer::Peer;

    #[test]
    fn test_node_info_key_round_trip() {
        let key = NodeInfoKey {
            role: Datanode,
            node_id: 2,
        };

        let key_bytes: Vec<u8> = (&key).into();
        let new_key: NodeInfoKey = key_bytes.try_into().unwrap();

        assert_eq!(Datanode, new_key.role);
        assert_eq!(2, new_key.node_id);
    }

    #[test]
    fn test_node_info_round_trip() {
        let node_info = NodeInfo {
            peer: Peer {
                id: 1,
                addr: "127.0.0.1".to_string(),
            },
            last_activity_ts: 123,
            status: NodeStatus::Datanode(DatanodeStatus {
                rcus: 1,
                wcus: 2,
                leader_regions: 3,
                follower_regions: 4,
                workloads: DatanodeWorkloads {
                    types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
                },
            }),
            version: "".to_string(),
            git_commit: "".to_string(),
            start_time_ms: 1,
            total_cpu_millicores: 0,
            total_memory_bytes: 0,
            cpu_usage_millicores: 0,
            memory_usage_bytes: 0,
            hostname: "test_hostname".to_string(),
            env_vars: Default::default(),
        };

        let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
        let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();

        assert_matches!(
            new_node_info,
            NodeInfo {
                peer: Peer { id: 1, .. },
                last_activity_ts: 123,
                status: NodeStatus::Datanode(DatanodeStatus {
                    rcus: 1,
                    wcus: 2,
                    leader_regions: 3,
                    follower_regions: 4,
                    ..
                }),
                start_time_ms: 1,
                ..
            }
        );
    }

    #[test]
    fn test_node_info_key_prefix() {
        let prefix = NodeInfoKey::key_prefix();
        assert_eq!(prefix, "__meta_cluster_node_info-0-");

        let prefix = NodeInfoKey::key_prefix_with_role(Frontend);
        assert_eq!(prefix, "__meta_cluster_node_info-0-1-");
    }

    #[test]
    fn test_node_info_key_encoding_uses_role_prefix() {
        let key = NodeInfoKey {
            role: Role::Metasrv,
            node_id: 7,
        };

        let key_bytes: Vec<u8> = (&key).into();
        assert_eq!(key_bytes, b"__meta_cluster_node_info-0-99-7".to_vec());

        let prefix = NodeInfoKey::key_prefix_with_role(Role::Metasrv);
        assert!(key_bytes.starts_with(prefix.as_bytes()));
    }

    #[test]
    fn test_node_info_key_parse_rejects_invalid_keys() {
        assert!("".parse::<NodeInfoKey>().is_err());
        // Missing the node id segment.
        assert!("__meta_cluster_node_info-0-0".parse::<NodeInfoKey>().is_err());
        // Non-numeric role.
        assert!("__meta_cluster_node_info-0-a-1".parse::<NodeInfoKey>().is_err());
        // Unknown role code.
        assert!("__meta_cluster_node_info-0-3-1".parse::<NodeInfoKey>().is_err());
        // Node id overflowing u64.
        let overflow = "__meta_cluster_node_info-0-0-18446744073709551616";
        assert!(overflow.parse::<NodeInfoKey>().is_err());
    }

    #[test]
    fn test_role_code_round_trip() {
        for role in [Role::Datanode, Role::Frontend, Role::Flownode, Role::Metasrv] {
            assert_eq!(role, Role::try_from(i32::from(role)).unwrap());
        }
        assert!(Role::try_from(3i32).is_err());
    }

    #[test]
    fn test_calculate_node_id_from_addr() {
        // Test empty string
        assert_eq!(calculate_node_id(""), calculate_node_id(""));

        // Test same addresses return same ids
        let addr1 = "127.0.0.1:8080";
        let id1 = calculate_node_id(addr1);
        let id2 = calculate_node_id(addr1);
        assert_eq!(id1, id2);

        // Test different addresses return different ids
        let addr2 = "127.0.0.1:8081";
        let id3 = calculate_node_id(addr2);
        assert_ne!(id1, id3);

        // Test long address: deterministic, and distinct from a different address.
        // (The previous `id4 > 0` check asserted nothing meaningful about the hash.)
        let long_addr = "very.long.domain.name.example.com:9999";
        let id4 = calculate_node_id(long_addr);
        assert_eq!(id4, calculate_node_id(long_addr));
        assert_ne!(id4, id1);
    }

    #[test]
    fn test_flownode_status_backward_compatible_without_workloads() {
        let raw = r#"{
            "peer":{"id":1,"addr":"127.0.0.1"},
            "last_activity_ts":123,
            "status":{"Flownode":{}},
            "version":"",
            "git_commit":"",
            "start_time_ms":1,
            "total_cpu_millicores":0,
            "total_memory_bytes":0,
            "cpu_usage_millicores":0,
            "memory_usage_bytes":0,
            "hostname":""
        }"#;

        let node_info: NodeInfo = raw.parse().unwrap();
        assert_matches!(
            node_info.status,
            NodeStatus::Flownode(FlownodeStatus { workloads }) if workloads.types.is_empty()
        );
    }

    #[test]
    fn test_flownode_status_round_trip_with_workloads() {
        let node_info = NodeInfo {
            peer: Peer {
                id: 1,
                addr: "127.0.0.1".to_string(),
            },
            last_activity_ts: 123,
            status: NodeStatus::Flownode(FlownodeStatus {
                workloads: FlownodeWorkloads { types: vec![7] },
            }),
            version: "".to_string(),
            git_commit: "".to_string(),
            start_time_ms: 1,
            total_cpu_millicores: 0,
            total_memory_bytes: 0,
            cpu_usage_millicores: 0,
            memory_usage_bytes: 0,
            hostname: "test_hostname".to_string(),
            env_vars: Default::default(),
        };

        let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
        let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();

        assert_matches!(
            new_node_info,
            NodeInfo {
                status: NodeStatus::Flownode(FlownodeStatus { workloads }),
                ..
            } if workloads.types == vec![7]
        );
    }

    #[test]
    fn test_node_info_backward_compatible_without_env_vars() {
        // Simulate a NodeInfo serialized before env_vars was added
        let raw = r#"{
            "peer":{"id":1,"addr":"127.0.0.1"},
            "last_activity_ts":123,
            "status":{"Datanode":{"rcus":0,"wcus":0,"leader_regions":0,"follower_regions":0,"workloads":{"types":[0]}}},
            "version":"",
            "git_commit":"",
            "start_time_ms":1,
            "total_cpu_millicores":0,
            "total_memory_bytes":0,
            "cpu_usage_millicores":0,
            "memory_usage_bytes":0,
            "hostname":"test"
        }"#;

        let node_info: NodeInfo = raw.parse().unwrap();
        assert!(node_info.env_vars.is_empty());
    }

    #[test]
    fn test_node_info_with_env_vars_round_trip() {
        let mut env_vars = HashMap::new();
        env_vars.insert("AZ".to_string(), "us-east-1a".to_string());

        let node_info = NodeInfo {
            peer: Peer {
                id: 1,
                addr: "127.0.0.1".to_string(),
            },
            last_activity_ts: 123,
            status: NodeStatus::Datanode(DatanodeStatus {
                rcus: 0,
                wcus: 0,
                leader_regions: 0,
                follower_regions: 0,
                workloads: DatanodeWorkloads { types: vec![] },
            }),
            version: "".to_string(),
            git_commit: "".to_string(),
            start_time_ms: 1,
            total_cpu_millicores: 0,
            total_memory_bytes: 0,
            cpu_usage_millicores: 0,
            memory_usage_bytes: 0,
            hostname: "test".to_string(),
            env_vars,
        };

        let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
        let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();
        assert_eq!(new_node_info.env_vars.get("AZ").unwrap(), "us-east-1a");
    }
}