common_meta/
cluster.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::hash::{DefaultHasher, Hash, Hasher};
16use std::str::FromStr;
17
18use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest};
19use common_error::ext::ErrorExt;
20use lazy_static::lazy_static;
21use regex::Regex;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt, ensure};
24
25use crate::datanode::RegionStat;
26use crate::error::{
27    DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu,
28    InvalidRoleSnafu, ParseNumSnafu, Result,
29};
30use crate::key::flow::flow_state::FlowStat;
31use crate::peer::Peer;
32
/// Leading segment shared by every node-info key in the storage; see [`NodeInfoKey`] for the
/// full key layout.
const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";
34
35lazy_static! {
36    static ref CLUSTER_NODE_INFO_PREFIX_PATTERN: Regex = Regex::new(&format!(
37        "^{CLUSTER_NODE_INFO_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
38    ))
39    .unwrap();
40}
41
/// [ClusterInfo] provides information about the cluster.
///
/// Implementors pick their own error type; it only has to implement [`ErrorExt`].
#[async_trait::async_trait]
pub trait ClusterInfo {
    /// The error returned by every method of this trait.
    type Error: ErrorExt;

    /// List all nodes by role in the cluster. If `role` is `None`, list all nodes.
    async fn list_nodes(
        &self,
        role: Option<Role>,
    ) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

    /// List all region stats in the cluster.
    async fn list_region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

    /// List all flow stats in the cluster.
    ///
    /// Returns a single optional [`FlowStat`] rather than a list. Presumably `None` means no
    /// flow stats are available — TODO confirm with the implementors.
    async fn list_flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;

    // TODO(jeremy): Other info, like region status, etc.
}
61
/// The key of [NodeInfo] in the storage.
///
/// The format is `__meta_cluster_node_info-0-{role}-{node_id}`, where `{role}` is the `i32`
/// code of the [`Role`] (see `From<Role> for i32`). Encode it with `Vec::<u8>::from(&key)` and
/// decode it with `NodeInfoKey::try_from(bytes)` or `str::parse`.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
    /// The role of the node. It can be [`Role::Datanode`] or [`Role::Frontend`].
    ///
    /// NOTE(review): the field's type admits every [`Role`] variant, and the key helpers accept
    /// any role; confirm whether other roles are stored under this key too.
    pub role: Role,
    /// The node id.
    ///
    /// When built via [`NodeInfoKey::new`], frontends get an id hashed from their address and
    /// every other role uses the peer id from the heartbeat request.
    pub node_id: u64,
}
70
71impl NodeInfoKey {
72    /// Try to create a `NodeInfoKey` from a "good" heartbeat request. "good" as in every needed
73    /// piece of information is provided and valid.  
74    pub fn new(request: &HeartbeatRequest) -> Option<Self> {
75        let HeartbeatRequest { header, peer, .. } = request;
76        let header = header.as_ref()?;
77        let peer = peer.as_ref()?;
78
79        let role = header.role.try_into().ok()?;
80        let node_id = match role {
81            // Because the Frontend is stateless, it's too easy to neglect choosing a unique id
82            // for it when setting up a cluster. So we calculate its id from its address.
83            Role::Frontend => calculate_node_id(&peer.addr),
84            _ => peer.id,
85        };
86
87        Some(NodeInfoKey { role, node_id })
88    }
89
90    pub fn key_prefix() -> String {
91        format!("{}-0-", CLUSTER_NODE_INFO_PREFIX)
92    }
93
94    pub fn key_prefix_with_role(role: Role) -> String {
95        format!("{}-0-{}-", CLUSTER_NODE_INFO_PREFIX, i32::from(role))
96    }
97}
98
/// Derives a node id from `addr` by hashing it with a freshly created [`DefaultHasher`].
///
/// Equal addresses always produce equal ids within one build, so a stateless frontend gets a
/// stable id without being configured with one.
///
/// NOTE(review): std documents `DefaultHasher`'s algorithm as unspecified across Rust releases,
/// so these ids may change after a toolchain upgrade. Confirm whether persisted ids need to be
/// stable across upgrades.
fn calculate_node_id(addr: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(addr, &mut state);
    state.finish()
}
105
/// The information of a node in the cluster.
///
/// Converted to and from bytes as JSON via `serde_json`. Field order is the JSON field order.
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeInfo {
    /// The peer information. [node_id, address]
    pub peer: Peer,
    /// Last activity time in milliseconds.
    pub last_activity_ts: i64,
    /// The status of the node. Different roles have different node status.
    pub status: NodeStatus,
    /// The node's build version.
    pub version: String,
    /// The git commit hash the node was built from.
    pub git_commit: String,
    /// The node's start timestamp, in milliseconds.
    pub start_time_ms: u64,
    /// Total CPU of the node, in millicores. Deserializes as `0` when the field is missing.
    #[serde(default)]
    pub total_cpu_millicores: i64,
    /// Total memory of the node, in bytes. Deserializes as `0` when the field is missing.
    #[serde(default)]
    pub total_memory_bytes: i64,
    /// CPU usage of the node, in millicores. Deserializes as `0` when the field is missing.
    #[serde(default)]
    pub cpu_usage_millicores: i64,
    /// Memory usage of the node, in bytes. Deserializes as `0` when the field is missing.
    #[serde(default)]
    pub memory_usage_bytes: i64,
    /// The node's hostname. Deserializes as an empty string when the field is missing.
    #[serde(default)]
    pub hostname: String,
}
137
/// The role a node plays in the cluster.
///
/// Stored in node-info keys as an `i32` code: `Datanode` = 0, `Frontend` = 1, `Flownode` = 2,
/// `Metasrv` = 99 (see `From<Role> for i32` and `TryFrom<i32> for Role`).
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub enum Role {
    Datanode,
    Frontend,
    Flownode,
    Metasrv,
}
145
/// The role-specific status carried by a [NodeInfo].
///
/// Each role variant wraps that role's status struct. `Standalone` has no payload; presumably
/// it describes a single-process deployment — TODO confirm.
#[derive(Debug, Serialize, Deserialize)]
pub enum NodeStatus {
    Datanode(DatanodeStatus),
    Frontend(FrontendStatus),
    Flownode(FlownodeStatus),
    Metasrv(MetasrvStatus),
    Standalone,
}
154
155impl NodeStatus {
156    // Get the role name of the node status
157    pub fn role_name(&self) -> &str {
158        match self {
159            NodeStatus::Datanode(_) => "DATANODE",
160            NodeStatus::Frontend(_) => "FRONTEND",
161            NodeStatus::Flownode(_) => "FLOWNODE",
162            NodeStatus::Metasrv(_) => "METASRV",
163            NodeStatus::Standalone => "STANDALONE",
164        }
165    }
166}
167
/// The status of a datanode.
///
/// NOTE(review): "this period" below is not defined in this file — presumably the reporting
/// interval of the node; confirm.
#[derive(Debug, Serialize, Deserialize)]
pub struct DatanodeStatus {
    /// The read capacity units during this period.
    pub rcus: i64,
    /// The write capacity units during this period.
    pub wcus: i64,
    /// How many leader regions are on this node.
    pub leader_regions: usize,
    /// How many follower regions are on this node.
    pub follower_regions: usize,
    /// The workloads of the datanode.
    pub workloads: DatanodeWorkloads,
}
182
/// The status of a frontend. Currently carries no fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct FrontendStatus {}

/// The status of a flownode. Currently carries no fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct FlownodeStatus {}

/// The status of a metasrv.
#[derive(Debug, Serialize, Deserialize)]
pub struct MetasrvStatus {
    /// Whether this metasrv is the leader.
    pub is_leader: bool,
}
196
197impl FromStr for NodeInfoKey {
198    type Err = Error;
199
200    fn from_str(key: &str) -> Result<Self> {
201        let caps = CLUSTER_NODE_INFO_PREFIX_PATTERN
202            .captures(key)
203            .context(InvalidNodeInfoKeySnafu { key })?;
204        ensure!(caps.len() == 4, InvalidNodeInfoKeySnafu { key });
205
206        let role = caps[2].to_string();
207        let node_id = caps[3].to_string();
208        let role: i32 = role.parse().context(ParseNumSnafu {
209            err_msg: format!("invalid role {role}"),
210        })?;
211        let role = Role::try_from(role)?;
212        let node_id: u64 = node_id.parse().context(ParseNumSnafu {
213            err_msg: format!("invalid node_id: {node_id}"),
214        })?;
215
216        Ok(Self { role, node_id })
217    }
218}
219
220impl TryFrom<Vec<u8>> for NodeInfoKey {
221    type Error = Error;
222
223    fn try_from(bytes: Vec<u8>) -> Result<Self> {
224        String::from_utf8(bytes)
225            .context(FromUtf8Snafu {
226                name: "NodeInfoKey",
227            })
228            .map(|x| x.parse())?
229    }
230}
231
232impl From<&NodeInfoKey> for Vec<u8> {
233    fn from(key: &NodeInfoKey) -> Self {
234        format!(
235            "{}-0-{}-{}",
236            CLUSTER_NODE_INFO_PREFIX,
237            i32::from(key.role),
238            key.node_id
239        )
240        .into_bytes()
241    }
242}
243
244impl FromStr for NodeInfo {
245    type Err = Error;
246
247    fn from_str(value: &str) -> Result<Self> {
248        serde_json::from_str(value).context(DecodeJsonSnafu)
249    }
250}
251
252impl TryFrom<Vec<u8>> for NodeInfo {
253    type Error = Error;
254
255    fn try_from(bytes: Vec<u8>) -> Result<Self> {
256        String::from_utf8(bytes)
257            .context(FromUtf8Snafu { name: "NodeInfo" })
258            .map(|x| x.parse())?
259    }
260}
261
262impl TryFrom<NodeInfo> for Vec<u8> {
263    type Error = Error;
264
265    fn try_from(info: NodeInfo) -> Result<Self> {
266        Ok(serde_json::to_string(&info)
267            .context(EncodeJsonSnafu)?
268            .into_bytes())
269    }
270}
271
272impl From<Role> for i32 {
273    fn from(role: Role) -> Self {
274        match role {
275            Role::Datanode => 0,
276            Role::Frontend => 1,
277            Role::Flownode => 2,
278            Role::Metasrv => 99,
279        }
280    }
281}
282
283impl TryFrom<i32> for Role {
284    type Error = Error;
285
286    fn try_from(role: i32) -> Result<Self> {
287        match role {
288            0 => Ok(Self::Datanode),
289            1 => Ok(Self::Frontend),
290            2 => Ok(Self::Flownode),
291            99 => Ok(Self::Metasrv),
292            _ => InvalidRoleSnafu { role }.fail(),
293        }
294    }
295}
296
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_workload::DatanodeWorkloadType;

    use super::*;
    use crate::cluster::Role::{Datanode, Frontend};
    use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
    use crate::peer::Peer;

    #[test]
    fn test_node_info_key_round_trip() {
        let key = NodeInfoKey {
            role: Datanode,
            node_id: 2,
        };

        let key_bytes: Vec<u8> = (&key).into();
        let new_key: NodeInfoKey = key_bytes.try_into().unwrap();

        assert_eq!(Datanode, new_key.role);
        assert_eq!(2, new_key.node_id);
    }

    #[test]
    fn test_node_info_key_round_trip_max_node_id() {
        // Frontend ids are hashes of the address, so they can use the whole `u64` range.
        let key = NodeInfoKey {
            role: Frontend,
            node_id: u64::MAX,
        };

        let key_bytes: Vec<u8> = (&key).into();
        let new_key: NodeInfoKey = key_bytes.try_into().unwrap();

        assert_eq!(key, new_key);
    }

    #[test]
    fn test_node_info_key_from_str() {
        let key: NodeInfoKey = "__meta_cluster_node_info-0-1-42".parse().unwrap();
        assert_eq!(
            NodeInfoKey {
                role: Frontend,
                node_id: 42,
            },
            key
        );

        // Unknown role code.
        assert!("__meta_cluster_node_info-0-7-42".parse::<NodeInfoKey>().is_err());
        // Missing node id.
        assert!("__meta_cluster_node_info-0-1-".parse::<NodeInfoKey>().is_err());
        // Wrong prefix.
        assert!("__meta_cluster_node_info_x-0-1-42".parse::<NodeInfoKey>().is_err());
        // The pattern is anchored, so an extra trailing segment is rejected.
        assert!("__meta_cluster_node_info-0-1-42-7".parse::<NodeInfoKey>().is_err());
        // Node id one past `u64::MAX`.
        let overflow = "__meta_cluster_node_info-0-1-18446744073709551616";
        assert!(overflow.parse::<NodeInfoKey>().is_err());
        // Invalid UTF-8.
        assert!(NodeInfoKey::try_from(vec![0xffu8, 0xfe]).is_err());
    }

    #[test]
    fn test_role_i32_round_trip() {
        for role in [Datanode, Frontend, Role::Flownode, Role::Metasrv] {
            assert_eq!(role, Role::try_from(i32::from(role)).unwrap());
        }
        assert!(Role::try_from(3).is_err());
        assert!(Role::try_from(-1).is_err());
    }

    #[test]
    fn test_node_info_round_trip() {
        let node_info = NodeInfo {
            peer: Peer {
                id: 1,
                addr: "127.0.0.1".to_string(),
            },
            last_activity_ts: 123,
            status: NodeStatus::Datanode(DatanodeStatus {
                rcus: 1,
                wcus: 2,
                leader_regions: 3,
                follower_regions: 4,
                workloads: DatanodeWorkloads {
                    types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
                },
            }),
            version: "".to_string(),
            git_commit: "".to_string(),
            start_time_ms: 1,
            total_cpu_millicores: 0,
            total_memory_bytes: 0,
            cpu_usage_millicores: 0,
            memory_usage_bytes: 0,
            hostname: "test_hostname".to_string(),
        };

        let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
        let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();

        assert_matches!(
            new_node_info,
            NodeInfo {
                peer: Peer { id: 1, .. },
                last_activity_ts: 123,
                status: NodeStatus::Datanode(DatanodeStatus {
                    rcus: 1,
                    wcus: 2,
                    leader_regions: 3,
                    follower_regions: 4,
                    ..
                }),
                start_time_ms: 1,
                ..
            }
        );
    }

    #[test]
    fn test_node_info_key_prefix() {
        let prefix = NodeInfoKey::key_prefix();
        assert_eq!(prefix, "__meta_cluster_node_info-0-");

        let prefix = NodeInfoKey::key_prefix_with_role(Frontend);
        assert_eq!(prefix, "__meta_cluster_node_info-0-1-");
    }

    #[test]
    fn test_calculate_node_id_from_addr() {
        // Test empty string
        assert_eq!(calculate_node_id(""), calculate_node_id(""));

        // Test same addresses return same ids
        let addr1 = "127.0.0.1:8080";
        let id1 = calculate_node_id(addr1);
        let id2 = calculate_node_id(addr1);
        assert_eq!(id1, id2);

        // Test different addresses return different ids
        let addr2 = "127.0.0.1:8081";
        let id3 = calculate_node_id(addr2);
        assert_ne!(id1, id3);

        // Test long address: stable across calls and distinct from a short one.
        let long_addr = "very.long.domain.name.example.com:9999";
        let id4 = calculate_node_id(long_addr);
        assert_eq!(id4, calculate_node_id(long_addr));
        assert_ne!(id4, id1);
    }
}