1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
19use async_trait::async_trait;
20use common_meta::key::TableMetadataManagerRef;
21use common_meta::region_keeper::MemoryRegionKeeperRef;
22use common_telemetry::error;
23use store_api::region_engine::GrantedRegion;
24use store_api::storage::RegionId;
25
26use crate::error::Result;
27use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
28use crate::metasrv::Context;
29use crate::region::RegionLeaseKeeper;
30use crate::region::lease_keeper::{
31 RegionLeaseInfo, RegionLeaseKeeperRef, RenewRegionLeasesResponse,
32};
33
34pub struct RegionLeaseHandler {
35 region_lease_seconds: u64,
36 region_lease_keeper: RegionLeaseKeeperRef,
37 customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
38}
39
40pub type CustomizedRegionLeaseRenewerRef = Arc<dyn CustomizedRegionLeaseRenewer>;
41
42pub trait CustomizedRegionLeaseRenewer: Send + Sync {
43 fn renew(
44 &self,
45 ctx: &mut Context,
46 regions: HashMap<RegionId, RegionLeaseInfo>,
47 ) -> Vec<GrantedRegion>;
48}
49
50impl RegionLeaseHandler {
51 pub fn new(
52 region_lease_seconds: u64,
53 table_metadata_manager: TableMetadataManagerRef,
54 memory_region_keeper: MemoryRegionKeeperRef,
55 customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
56 ) -> Self {
57 let region_lease_keeper =
58 RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper.clone());
59
60 Self {
61 region_lease_seconds,
62 region_lease_keeper: Arc::new(region_lease_keeper),
63 customized_region_lease_renewer,
64 }
65 }
66}
67
68#[async_trait]
69impl HeartbeatHandler for RegionLeaseHandler {
70 fn is_acceptable(&self, role: Role) -> bool {
71 role == Role::Datanode
72 }
73
74 async fn handle(
75 &self,
76 req: &HeartbeatRequest,
77 ctx: &mut Context,
78 acc: &mut HeartbeatAccumulator,
79 ) -> Result<HandleControl> {
80 let Some(stat) = acc.stat.as_ref() else {
81 return Ok(HandleControl::Continue);
82 };
83
84 let regions = stat.regions();
85 let datanode_id = stat.id;
86
87 match self
88 .region_lease_keeper
89 .renew_region_leases(datanode_id, ®ions)
90 .await
91 {
92 Ok(RenewRegionLeasesResponse {
93 non_exists,
94 renewed,
95 }) => {
96 let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
97 renewer
98 .renew(ctx, renewed)
99 .into_iter()
100 .map(|region| region.into())
101 .collect()
102 } else {
103 renewed
104 .into_iter()
105 .map(|(region_id, region_lease_info)| {
106 GrantedRegion::new(region_id, region_lease_info.role).into()
107 })
108 .collect::<Vec<_>>()
109 };
110
111 acc.region_lease = Some(RegionLease {
112 regions: renewed,
113 duration_since_epoch: req.duration_since_epoch,
114 lease_seconds: self.region_lease_seconds,
115 closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
116 });
117 acc.inactive_region_ids = non_exists;
118 }
119 Err(e) => {
120 error!(e; "Failed to renew region leases for datanode: {datanode_id:?}, regions: {:?}", regions);
121 }
124 }
125
126 Ok(HandleControl::Continue)
127 }
128}
129
#[cfg(test)]
mod test {
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;

    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
    use common_meta::distributed_time_constants;
    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use super::*;
    use crate::metasrv::builder::MetasrvBuilder;

    /// Builds a `RegionLeaseKeeper` backed by an in-memory kv store and a fresh
    /// `MemoryRegionKeeper`.
    fn new_test_keeper() -> RegionLeaseKeeper {
        let store = Arc::new(MemoryKvBackend::new());

        let table_metadata_manager = Arc::new(TableMetadataManager::new(store));

        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
        RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper)
    }

    /// Builds a `RegionStat` for `region_id` with the given `role` and every metric zeroed.
    fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
        RegionStat {
            id: region_id,
            role,
            rcus: 0,
            wcus: 0,
            approximate_bytes: 0,
            engine: String::new(),
            num_rows: 0,
            memtable_size: 0,
            manifest_size: 0,
            sst_size: 0,
            sst_num: 0,
            index_size: 0,
            region_manifest: RegionManifestInfo::Mito {
                manifest_version: 0,
                flushed_entry_id: 0,
            },
            data_topic_latest_entry_id: 0,
            metadata_topic_latest_entry_id: 0,
            written_bytes: 0,
        }
    }

    /// A region reported as follower is granted the role recorded in the table route
    /// (leader on the leader peer, follower on the follower peer); regions unknown to the
    /// route are reported as inactive/closeable unless they are registered as opening.
    #[tokio::test]
    async fn test_handle_upgradable_follower() {
        let datanode_id = 1;
        let region_number = 1u32;
        let table_id = 10;
        let region_id = RegionId::new(table_id, region_number);
        let another_region_id = RegionId::new(table_id, region_number + 1);
        let peer = Peer::empty(datanode_id);
        let follower_peer = Peer::empty(datanode_id + 1);
        let table_info = new_test_table_info(table_id, vec![region_number]).into();

        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(peer.clone()),
            follower_peers: vec![follower_peer.clone()],
            ..Default::default()
        }];

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();

        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        let builder = MetasrvBuilder::new();
        let metasrv = builder.build().await.unwrap();
        let ctx = &mut metasrv.new_ctx();

        // Heartbeat from the leader peer.
        let acc = &mut HeartbeatAccumulator::default();

        acc.stat = Some(Stat {
            id: peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Follower),
                new_empty_region_stat(another_region_id, RegionRole::Follower),
            ],
            ..Default::default()
        });

        let req = HeartbeatRequest {
            duration_since_epoch: 1234,
            ..Default::default()
        };

        let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());

        let handler = RegionLeaseHandler::new(
            distributed_time_constants::REGION_LEASE_SECS,
            table_metadata_manager.clone(),
            opening_region_keeper.clone(),
            None,
        );

        handler.handle(&req, ctx, acc).await.unwrap();

        assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]);
        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![another_region_id]
        );

        // Heartbeat from the follower peer.
        let acc = &mut HeartbeatAccumulator::default();

        acc.stat = Some(Stat {
            id: follower_peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Follower),
                new_empty_region_stat(another_region_id, RegionRole::Follower),
            ],
            ..Default::default()
        });

        handler.handle(&req, ctx, acc).await.unwrap();

        assert_eq!(
            acc.region_lease.as_ref().unwrap().lease_seconds,
            distributed_time_constants::REGION_LEASE_SECS
        );

        assert_region_lease(
            acc,
            vec![GrantedRegion::new(region_id, RegionRole::Follower)],
        );
        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![another_region_id]
        );

        // Heartbeat from the follower peer while an extra region is registered as opening.
        let opening_region_id = RegionId::new(table_id, region_number + 2);
        let _guard = opening_region_keeper
            .register(follower_peer.id, opening_region_id)
            .unwrap();

        let acc = &mut HeartbeatAccumulator::default();

        acc.stat = Some(Stat {
            id: follower_peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Follower),
                new_empty_region_stat(another_region_id, RegionRole::Follower),
                new_empty_region_stat(opening_region_id, RegionRole::Follower),
            ],
            ..Default::default()
        });

        handler.handle(&req, ctx, acc).await.unwrap();

        assert_eq!(
            acc.region_lease.as_ref().unwrap().lease_seconds,
            distributed_time_constants::REGION_LEASE_SECS
        );

        assert_region_lease(
            acc,
            vec![
                GrantedRegion::new(region_id, RegionRole::Follower),
                GrantedRegion::new(opening_region_id, RegionRole::Follower),
            ],
        );
        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![another_region_id]
        );
    }

    /// A leader whose route is marked `Downgrading` is granted `DowngradingLeader`, a
    /// regular leader stays `Leader`, and a region absent from the route is reported as
    /// inactive/closeable.
    #[tokio::test]
    async fn test_handle_downgradable_leader() {
        let datanode_id = 1;
        let region_number = 1u32;
        let table_id = 10;
        let region_id = RegionId::new(table_id, region_number);
        let another_region_id = RegionId::new(table_id, region_number + 1);
        let no_exist_region_id = RegionId::new(table_id, region_number + 2);
        let peer = Peer::empty(datanode_id);
        let follower_peer = Peer::empty(datanode_id + 1);
        let table_info = new_test_table_info(table_id, vec![region_number]).into();

        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(region_id),
                leader_peer: Some(peer.clone()),
                follower_peers: vec![follower_peer.clone()],
                leader_state: Some(LeaderState::Downgrading),
                leader_down_since: Some(1),
            },
            RegionRoute {
                region: Region::new_test(another_region_id),
                leader_peer: Some(peer.clone()),
                ..Default::default()
            },
        ];

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();

        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        let builder = MetasrvBuilder::new();
        let metasrv = builder.build().await.unwrap();
        let ctx = &mut metasrv.new_ctx();

        let req = HeartbeatRequest {
            duration_since_epoch: 1234,
            ..Default::default()
        };

        let acc = &mut HeartbeatAccumulator::default();

        acc.stat = Some(Stat {
            id: peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Leader),
                new_empty_region_stat(another_region_id, RegionRole::Leader),
                new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
            ],
            ..Default::default()
        });

        let handler = RegionLeaseHandler::new(
            distributed_time_constants::REGION_LEASE_SECS,
            table_metadata_manager.clone(),
            Default::default(),
            None,
        );

        handler.handle(&req, ctx, acc).await.unwrap();

        assert_eq!(
            acc.region_lease.as_ref().unwrap().lease_seconds,
            distributed_time_constants::REGION_LEASE_SECS
        );

        assert_region_lease(
            acc,
            vec![
                GrantedRegion::new(region_id, RegionRole::DowngradingLeader),
                GrantedRegion::new(another_region_id, RegionRole::Leader),
            ],
        );
        assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id]));
        // The closeable ids are derived from the same set as `inactive_region_ids`.
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![no_exist_region_id]
        );
    }

    /// Asserts that the regions granted in `acc.region_lease` equal `expected`,
    /// ignoring order by keying both sides on the region id.
    ///
    /// Panics if `acc.region_lease` is `None`.
    fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec<GrantedRegion>) {
        let region_lease = acc.region_lease.as_ref().unwrap().clone();
        let granted: Vec<GrantedRegion> = region_lease
            .regions
            .into_iter()
            .map(Into::into)
            .collect::<Vec<_>>();

        let granted = granted
            .into_iter()
            .map(|region| (region.region_id, region))
            .collect::<HashMap<_, _>>();

        let expected = expected
            .into_iter()
            .map(|region| (region.region_id, region))
            .collect::<HashMap<_, _>>();

        assert_eq!(granted, expected);
    }
}