meta_srv/handler/
keep_lease_handler.rs1use api::v1::meta::heartbeat_request::NodeWorkloads;
16use api::v1::meta::{FlownodeWorkloads, HeartbeatRequest, Peer, Role};
17use common_meta::heartbeat::utils::get_datanode_workloads;
18use common_meta::rpc::store::PutRequest;
19use common_telemetry::{trace, warn};
20use common_time::util as time_util;
21
22use crate::error::Result;
23use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
24use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
25use crate::metasrv::Context;
26
27pub struct DatanodeKeepLeaseHandler;
29
30#[async_trait::async_trait]
31impl HeartbeatHandler for DatanodeKeepLeaseHandler {
32 fn is_acceptable(&self, role: Role) -> bool {
33 role == Role::Datanode
34 }
35
36 async fn handle(
37 &self,
38 req: &HeartbeatRequest,
39 ctx: &mut Context,
40 _acc: &mut HeartbeatAccumulator,
41 ) -> Result<HandleControl> {
42 let HeartbeatRequest {
43 header,
44 peer,
45 node_workloads,
46 ..
47 } = req;
48 let Some(_header) = &header else {
49 return Ok(HandleControl::Continue);
50 };
51 let Some(peer) = &peer else {
52 return Ok(HandleControl::Continue);
53 };
54
55 let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
56 let key = DatanodeLeaseKey { node_id: peer.id };
57 let value = LeaseValue {
58 timestamp_millis: time_util::current_time_millis(),
59 node_addr: peer.addr.clone(),
60 workloads: NodeWorkloads::Datanode(datanode_workloads),
61 };
62
63 trace!("Receive a heartbeat from datanode: {key:?}, {value:?}");
64
65 let key = key.try_into()?;
66 let value = value.try_into()?;
67 put_into_memory_store(ctx, key, value, peer).await;
68
69 Ok(HandleControl::Continue)
70 }
71}
72
73pub struct FlownodeKeepLeaseHandler;
75
76#[async_trait::async_trait]
77impl HeartbeatHandler for FlownodeKeepLeaseHandler {
78 fn is_acceptable(&self, role: Role) -> bool {
79 role == Role::Flownode
80 }
81
82 async fn handle(
83 &self,
84 req: &HeartbeatRequest,
85 ctx: &mut Context,
86 _acc: &mut HeartbeatAccumulator,
87 ) -> Result<HandleControl> {
88 let HeartbeatRequest { header, peer, .. } = req;
89 let Some(_header) = &header else {
90 return Ok(HandleControl::Continue);
91 };
92 let Some(peer) = &peer else {
93 return Ok(HandleControl::Continue);
94 };
95
96 let key = FlownodeLeaseKey { node_id: peer.id };
97 let value = LeaseValue {
98 timestamp_millis: time_util::current_time_millis(),
99 node_addr: peer.addr.clone(),
100 workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
101 };
102
103 trace!("Receive a heartbeat from flownode: {key:?}, {value:?}");
104
105 let key = key.try_into()?;
106 let value = value.try_into()?;
107 put_into_memory_store(ctx, key, value, peer).await;
108
109 Ok(HandleControl::Continue)
110 }
111}
112
113async fn put_into_memory_store(ctx: &mut Context, key: Vec<u8>, value: Vec<u8>, peer: &Peer) {
114 let put_req = PutRequest {
115 key,
116 value,
117 ..Default::default()
118 };
119
120 let res = ctx.in_memory.put(put_req).await;
121
122 if let Err(err) = res {
123 warn!(err; "Failed to update lease KV, peer: {peer:?}");
124 }
125}
126
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::meta::RequestHeader;
    use common_meta::cache_invalidator::DummyCacheInvalidator;
    use common_meta::datanode::Stat;
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::region_registry::LeaderRegionRegistry;
    use common_meta::sequence::SequenceBuilder;

    use super::*;
    use crate::cluster::MetaPeerClientBuilder;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::lease::find_datanode_lease_value;
    use crate::service::store::cached_kv::LeaderCachedKvBackend;

    /// A single datanode heartbeat must leave a lease entry readable from the
    /// in-memory store.
    #[tokio::test]
    async fn test_put_into_memory_store() {
        let in_memory = Arc::new(MemoryKvBackend::new());
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
            kv_backend.clone(),
        ));
        let sequence = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
        let mailbox = HeartbeatMailbox::create(Pushers::default(), sequence);
        let meta_peer_client = MetaPeerClientBuilder::default()
            .election(None)
            .in_memory(in_memory.clone())
            .build()
            .map(Arc::new)
            .unwrap();

        let ctx = Context {
            server_addr: "127.0.0.1:0000".to_string(),
            in_memory,
            kv_backend: kv_backend.clone(),
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election: None,
            is_infancy: false,
            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
            cache_invalidator: Arc::new(DummyCacheInvalidator),
            leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
        };

        // The helper receives a clone of the context; the lease is then read
        // back through the original `ctx`.
        let handler = DatanodeKeepLeaseHandler;
        handle_request_many_times(ctx.clone(), &handler, 1).await;

        let stored_lease = find_datanode_lease_value(1, &ctx.in_memory)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(stored_lease.node_addr, "127.0.0.1:1");
        assert_ne!(stored_lease.timestamp_millis, 0);
    }

    /// Sends the same datanode heartbeat (peer id 1, address `127.0.0.1:1`)
    /// through `handler` `loop_times` times, unwrapping every result.
    async fn handle_request_many_times(
        mut ctx: Context,
        handler: &DatanodeKeepLeaseHandler,
        loop_times: i32,
    ) {
        let request = HeartbeatRequest {
            header: Some(RequestHeader::new(1, Role::Datanode, HashMap::new())),
            peer: Some(Peer::new(1, "127.0.0.1:1")),
            ..Default::default()
        };

        for round in 1..=loop_times {
            // A fresh accumulator per heartbeat, with a region count equal to
            // the round number.
            let stat = Stat {
                id: 101,
                region_num: round as _,
                ..Default::default()
            };
            let mut accumulator = HeartbeatAccumulator {
                stat: Some(stat),
                ..Default::default()
            };
            handler
                .handle(&request, &mut ctx, &mut accumulator)
                .await
                .unwrap();
        }
    }
}