meta_srv/
key.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod datanode;
16mod flownode;
17
18use std::str::FromStr;
19
20use api::v1::meta::heartbeat_request::NodeWorkloads;
21pub use datanode::*;
22pub use flownode::*;
23use serde::{Deserialize, Serialize};
24use snafu::{ensure, OptionExt, ResultExt};
25
26use crate::error;
27
28macro_rules! impl_from_str_lease_key {
29    ($key_type:ty, $pattern:expr) => {
30        impl FromStr for $key_type {
31            type Err = error::Error;
32
33            fn from_str(key: &str) -> error::Result<Self> {
34                let caps = $pattern
35                    .captures(key)
36                    .context(error::InvalidLeaseKeySnafu { key })?;
37
38                ensure!(caps.len() == 3, error::InvalidLeaseKeySnafu { key });
39                let node_id = caps[2].to_string();
40                let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
41                    err_msg: format!("invalid node_id: {node_id}"),
42                })?;
43
44                Ok(Self { node_id })
45            }
46        }
47    };
48}
49
50impl_from_str_lease_key!(FlownodeLeaseKey, FLOWNODE_LEASE_KEY_PATTERN);
51impl_from_str_lease_key!(DatanodeLeaseKey, DATANODE_LEASE_KEY_PATTERN);
52
53macro_rules! impl_try_from_lease_key {
54    ($key_type:ty, $prefix:expr) => {
55        impl TryFrom<Vec<u8>> for $key_type {
56            type Error = error::Error;
57
58            fn try_from(bytes: Vec<u8>) -> error::Result<Self> {
59                String::from_utf8(bytes)
60                    .context(error::LeaseKeyFromUtf8Snafu {})
61                    .map(|x| x.parse())?
62            }
63        }
64
65        impl TryFrom<$key_type> for Vec<u8> {
66            type Error = error::Error;
67
68            fn try_from(key: $key_type) -> error::Result<Self> {
69                Ok(format!("{}-0-{}", $prefix, key.node_id).into_bytes())
70            }
71        }
72    };
73}
74
75impl_try_from_lease_key!(FlownodeLeaseKey, FLOWNODE_LEASE_PREFIX);
76impl_try_from_lease_key!(DatanodeLeaseKey, DATANODE_LEASE_PREFIX);
77
78#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
79pub struct LeaseValue {
80    // last activity
81    pub timestamp_millis: i64,
82    pub node_addr: String,
83    pub workloads: NodeWorkloads,
84}
85
86impl FromStr for LeaseValue {
87    type Err = error::Error;
88
89    fn from_str(value: &str) -> crate::Result<Self> {
90        serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
91    }
92}
93
94impl TryFrom<Vec<u8>> for LeaseValue {
95    type Error = error::Error;
96
97    fn try_from(bytes: Vec<u8>) -> crate::Result<Self> {
98        String::from_utf8(bytes)
99            .context(error::LeaseValueFromUtf8Snafu {})
100            .map(|x| x.parse())?
101    }
102}
103
104impl TryFrom<LeaseValue> for Vec<u8> {
105    type Error = error::Error;
106
107    fn try_from(value: LeaseValue) -> crate::Result<Self> {
108        Ok(serde_json::to_string(&value)
109            .context(error::SerializeToJsonSnafu {
110                input: format!("{value:?}"),
111            })?
112            .into_bytes())
113    }
114}
115
116#[cfg(test)]
117mod tests {
118    use api::v1::meta::DatanodeWorkloads;
119
120    use super::*;
121
122    #[test]
123    fn test_lease_value_round_trip() {
124        let value = LeaseValue {
125            timestamp_millis: 111,
126            node_addr: "127.0.0.1:3002".to_string(),
127            workloads: NodeWorkloads::Datanode(DatanodeWorkloads { types: vec![] }),
128        };
129
130        let value_bytes: Vec<u8> = value.clone().try_into().unwrap();
131        let new_value: LeaseValue = value_bytes.try_into().unwrap();
132
133        assert_eq!(new_value, value);
134    }
135}