meta_srv/handler/
keep_lease_handler.rs1use api::v1::meta::heartbeat_request::NodeWorkloads;
16use api::v1::meta::{HeartbeatRequest, Peer, Role};
17use common_meta::heartbeat::utils::{get_datanode_workloads, get_flownode_workloads};
18use common_meta::rpc::store::PutRequest;
19use common_telemetry::{trace, warn};
20use common_time::util as time_util;
21
22use crate::error::Result;
23use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
24use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
25use crate::metasrv::Context;
26
27pub struct DatanodeKeepLeaseHandler;
29
30#[async_trait::async_trait]
31impl HeartbeatHandler for DatanodeKeepLeaseHandler {
32 fn is_acceptable(&self, role: Role) -> bool {
33 role == Role::Datanode
34 }
35
36 async fn handle(
37 &self,
38 req: &HeartbeatRequest,
39 ctx: &mut Context,
40 _acc: &mut HeartbeatAccumulator,
41 ) -> Result<HandleControl> {
42 let HeartbeatRequest {
43 header,
44 peer,
45 node_workloads,
46 ..
47 } = req;
48 let Some(_header) = &header else {
49 return Ok(HandleControl::Continue);
50 };
51 let Some(peer) = &peer else {
52 return Ok(HandleControl::Continue);
53 };
54
55 let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
56 let key = DatanodeLeaseKey { node_id: peer.id };
57 let value = LeaseValue {
58 timestamp_millis: time_util::current_time_millis(),
59 node_addr: peer.addr.clone(),
60 workloads: NodeWorkloads::Datanode(datanode_workloads),
61 };
62
63 trace!("Receive a heartbeat from datanode: {key:?}, {value:?}");
64
65 let key = key.try_into()?;
66 let value = value.try_into()?;
67 put_into_memory_store(ctx, key, value, peer).await;
68
69 Ok(HandleControl::Continue)
70 }
71}
72
73pub struct FlownodeKeepLeaseHandler;
75
76#[async_trait::async_trait]
77impl HeartbeatHandler for FlownodeKeepLeaseHandler {
78 fn is_acceptable(&self, role: Role) -> bool {
79 role == Role::Flownode
80 }
81
82 async fn handle(
83 &self,
84 req: &HeartbeatRequest,
85 ctx: &mut Context,
86 _acc: &mut HeartbeatAccumulator,
87 ) -> Result<HandleControl> {
88 let HeartbeatRequest {
89 header,
90 peer,
91 node_workloads,
92 ..
93 } = req;
94 let Some(_header) = &header else {
95 return Ok(HandleControl::Continue);
96 };
97 let Some(peer) = &peer else {
98 return Ok(HandleControl::Continue);
99 };
100
101 let key = FlownodeLeaseKey { node_id: peer.id };
102 let value = LeaseValue {
103 timestamp_millis: time_util::current_time_millis(),
104 node_addr: peer.addr.clone(),
105 workloads: NodeWorkloads::Flownode(get_flownode_workloads(node_workloads.as_ref())),
106 };
107
108 trace!("Receive a heartbeat from flownode: {key:?}, {value:?}");
109
110 let key = key.try_into()?;
111 let value = value.try_into()?;
112 put_into_memory_store(ctx, key, value, peer).await;
113
114 Ok(HandleControl::Continue)
115 }
116}
117
118async fn put_into_memory_store(ctx: &mut Context, key: Vec<u8>, value: Vec<u8>, peer: &Peer) {
119 let put_req = PutRequest {
120 key,
121 value,
122 ..Default::default()
123 };
124
125 let res = ctx.in_memory.put(put_req).await;
126
127 if let Err(err) = res {
128 warn!(err; "Failed to update lease KV, peer: {peer:?}");
129 }
130}
131
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::meta::RequestHeader;
    use common_meta::datanode::Stat;

    use super::*;
    use crate::discovery::utils::find_datanode_lease_value;
    use crate::handler::test_utils::TestEnv;

    /// Sends the same datanode heartbeat (peer id 1, address `127.0.0.1:1`)
    /// through `handler` `loop_times` times, unwrapping every result.
    async fn handle_request_many_times(
        mut ctx: Context,
        handler: &DatanodeKeepLeaseHandler,
        loop_times: i32,
    ) {
        let header = RequestHeader::new(1, Role::Datanode, HashMap::new());
        let peer = Peer::new(1, "127.0.0.1:1");
        let req = HeartbeatRequest {
            header: Some(header),
            peer: Some(peer),
            ..Default::default()
        };

        for round in 1..=loop_times {
            // A fresh accumulator per round; only `region_num` varies between rounds.
            let stat = Stat {
                id: 101,
                region_num: round as _,
                ..Default::default()
            };
            let mut acc = HeartbeatAccumulator {
                stat: Some(stat),
                ..Default::default()
            };
            handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
        }
    }

    /// After one heartbeat, the datanode lease can be read back from the
    /// in-memory store with the peer address and a non-zero timestamp.
    #[tokio::test]
    async fn test_put_into_memory_store() {
        let env = TestEnv::new();
        let ctx = env.ctx();

        handle_request_many_times(ctx.clone(), &DatanodeKeepLeaseHandler, 1).await;

        let in_memory = ctx.in_memory.clone() as _;
        let lookup = find_datanode_lease_value(&in_memory, 1).await;
        let lease_value = lookup.unwrap().unwrap();

        assert_eq!(lease_value.node_addr, "127.0.0.1:1");
        assert_ne!(lease_value.timestamp_millis, 0);
    }
}