1mod datanode;
16mod flownode;
17
18use std::str::FromStr;
19
20use api::v1::meta::heartbeat_request::NodeWorkloads;
21pub use datanode::*;
22pub use flownode::*;
23use serde::{Deserialize, Serialize};
24use snafu::{ensure, OptionExt, ResultExt};
25
26use crate::error;
27
28macro_rules! impl_from_str_lease_key {
29 ($key_type:ty, $pattern:expr) => {
30 impl FromStr for $key_type {
31 type Err = error::Error;
32
33 fn from_str(key: &str) -> error::Result<Self> {
34 let caps = $pattern
35 .captures(key)
36 .context(error::InvalidLeaseKeySnafu { key })?;
37
38 ensure!(caps.len() == 3, error::InvalidLeaseKeySnafu { key });
39 let node_id = caps[2].to_string();
40 let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
41 err_msg: format!("invalid node_id: {node_id}"),
42 })?;
43
44 Ok(Self { node_id })
45 }
46 }
47 };
48}
49
50impl_from_str_lease_key!(FlownodeLeaseKey, FLOWNODE_LEASE_KEY_PATTERN);
51impl_from_str_lease_key!(DatanodeLeaseKey, DATANODE_LEASE_KEY_PATTERN);
52
53macro_rules! impl_try_from_lease_key {
54 ($key_type:ty, $prefix:expr) => {
55 impl TryFrom<Vec<u8>> for $key_type {
56 type Error = error::Error;
57
58 fn try_from(bytes: Vec<u8>) -> error::Result<Self> {
59 String::from_utf8(bytes)
60 .context(error::LeaseKeyFromUtf8Snafu {})
61 .map(|x| x.parse())?
62 }
63 }
64
65 impl TryFrom<$key_type> for Vec<u8> {
66 type Error = error::Error;
67
68 fn try_from(key: $key_type) -> error::Result<Self> {
69 Ok(format!("{}-0-{}", $prefix, key.node_id).into_bytes())
70 }
71 }
72 };
73}
74
75impl_try_from_lease_key!(FlownodeLeaseKey, FLOWNODE_LEASE_PREFIX);
76impl_try_from_lease_key!(DatanodeLeaseKey, DATANODE_LEASE_PREFIX);
77
78#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
79pub struct LeaseValue {
80 pub timestamp_millis: i64,
82 pub node_addr: String,
83 pub workloads: NodeWorkloads,
84}
85
86impl FromStr for LeaseValue {
87 type Err = error::Error;
88
89 fn from_str(value: &str) -> crate::Result<Self> {
90 serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
91 }
92}
93
94impl TryFrom<Vec<u8>> for LeaseValue {
95 type Error = error::Error;
96
97 fn try_from(bytes: Vec<u8>) -> crate::Result<Self> {
98 String::from_utf8(bytes)
99 .context(error::LeaseValueFromUtf8Snafu {})
100 .map(|x| x.parse())?
101 }
102}
103
104impl TryFrom<LeaseValue> for Vec<u8> {
105 type Error = error::Error;
106
107 fn try_from(value: LeaseValue) -> crate::Result<Self> {
108 Ok(serde_json::to_string(&value)
109 .context(error::SerializeToJsonSnafu {
110 input: format!("{value:?}"),
111 })?
112 .into_bytes())
113 }
114}
115
116#[cfg(test)]
117mod tests {
118 use api::v1::meta::DatanodeWorkloads;
119
120 use super::*;
121
122 #[test]
123 fn test_lease_value_round_trip() {
124 let value = LeaseValue {
125 timestamp_millis: 111,
126 node_addr: "127.0.0.1:3002".to_string(),
127 workloads: NodeWorkloads::Datanode(DatanodeWorkloads { types: vec![] }),
128 };
129
130 let value_bytes: Vec<u8> = value.clone().try_into().unwrap();
131 let new_value: LeaseValue = value_bytes.try_into().unwrap();
132
133 assert_eq!(new_value, value);
134 }
135}