meta_srv/handler/
keep_lease_handler.rs1use api::v1::meta::heartbeat_request::NodeWorkloads;
16use api::v1::meta::{FlownodeWorkloads, HeartbeatRequest, Peer, Role};
17use common_meta::heartbeat::utils::get_datanode_workloads;
18use common_meta::rpc::store::PutRequest;
19use common_telemetry::{trace, warn};
20use common_time::util as time_util;
21
22use crate::error::Result;
23use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
24use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
25use crate::metasrv::Context;
26
27pub struct DatanodeKeepLeaseHandler;
29
30#[async_trait::async_trait]
31impl HeartbeatHandler for DatanodeKeepLeaseHandler {
32 fn is_acceptable(&self, role: Role) -> bool {
33 role == Role::Datanode
34 }
35
36 async fn handle(
37 &self,
38 req: &HeartbeatRequest,
39 ctx: &mut Context,
40 _acc: &mut HeartbeatAccumulator,
41 ) -> Result<HandleControl> {
42 let HeartbeatRequest {
43 header,
44 peer,
45 node_workloads,
46 ..
47 } = req;
48 let Some(_header) = &header else {
49 return Ok(HandleControl::Continue);
50 };
51 let Some(peer) = &peer else {
52 return Ok(HandleControl::Continue);
53 };
54
55 let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
56 let key = DatanodeLeaseKey { node_id: peer.id };
57 let value = LeaseValue {
58 timestamp_millis: time_util::current_time_millis(),
59 node_addr: peer.addr.clone(),
60 workloads: NodeWorkloads::Datanode(datanode_workloads),
61 };
62
63 trace!("Receive a heartbeat from datanode: {key:?}, {value:?}");
64
65 let key = key.try_into()?;
66 let value = value.try_into()?;
67 put_into_memory_store(ctx, key, value, peer).await;
68
69 Ok(HandleControl::Continue)
70 }
71}
72
73pub struct FlownodeKeepLeaseHandler;
75
76#[async_trait::async_trait]
77impl HeartbeatHandler for FlownodeKeepLeaseHandler {
78 fn is_acceptable(&self, role: Role) -> bool {
79 role == Role::Flownode
80 }
81
82 async fn handle(
83 &self,
84 req: &HeartbeatRequest,
85 ctx: &mut Context,
86 _acc: &mut HeartbeatAccumulator,
87 ) -> Result<HandleControl> {
88 let HeartbeatRequest { header, peer, .. } = req;
89 let Some(_header) = &header else {
90 return Ok(HandleControl::Continue);
91 };
92 let Some(peer) = &peer else {
93 return Ok(HandleControl::Continue);
94 };
95
96 let key = FlownodeLeaseKey { node_id: peer.id };
97 let value = LeaseValue {
98 timestamp_millis: time_util::current_time_millis(),
99 node_addr: peer.addr.clone(),
100 workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
101 };
102
103 trace!("Receive a heartbeat from flownode: {key:?}, {value:?}");
104
105 let key = key.try_into()?;
106 let value = value.try_into()?;
107 put_into_memory_store(ctx, key, value, peer).await;
108
109 Ok(HandleControl::Continue)
110 }
111}
112
113async fn put_into_memory_store(ctx: &mut Context, key: Vec<u8>, value: Vec<u8>, peer: &Peer) {
114 let put_req = PutRequest {
115 key,
116 value,
117 ..Default::default()
118 };
119
120 let res = ctx.in_memory.put(put_req).await;
121
122 if let Err(err) = res {
123 warn!(err; "Failed to update lease KV, peer: {peer:?}");
124 }
125}
126
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::meta::RequestHeader;
    use common_meta::datanode::Stat;

    use super::*;
    use crate::handler::test_utils::TestEnv;
    use crate::lease::find_datanode_lease_value;

    /// A single datanode heartbeat must leave a lease value for node 1 in the
    /// in-memory store, carrying the peer address and a non-zero timestamp.
    #[tokio::test]
    async fn test_put_into_memory_store() {
        let env = TestEnv::new();
        let ctx = env.ctx();

        handle_request_many_times(ctx.clone(), &DatanodeKeepLeaseHandler, 1).await;

        let stored = find_datanode_lease_value(1, &ctx.in_memory)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(stored.node_addr, "127.0.0.1:1");
        assert_ne!(stored.timestamp_millis, 0);
    }

    /// Sends the same datanode heartbeat (peer id 1, address `127.0.0.1:1`)
    /// through `handler` `loop_times` times, using a fresh accumulator per round.
    async fn handle_request_many_times(
        mut ctx: Context,
        handler: &DatanodeKeepLeaseHandler,
        loop_times: i32,
    ) {
        let header = RequestHeader::new(1, Role::Datanode, HashMap::new());
        let peer = Peer::new(1, "127.0.0.1:1");
        let req = HeartbeatRequest {
            header: Some(header),
            peer: Some(peer),
            ..Default::default()
        };

        for round in 1..=loop_times {
            // The accumulator's stat records the current round as its region count.
            let stat = Stat {
                id: 101,
                region_num: round as _,
                ..Default::default()
            };
            let mut acc = HeartbeatAccumulator {
                stat: Some(stat),
                ..Default::default()
            };
            handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
        }
    }
}