1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
19use async_trait::async_trait;
20use common_meta::key::TableMetadataManagerRef;
21use common_meta::region_keeper::MemoryRegionKeeperRef;
22use common_telemetry::error;
23use store_api::region_engine::GrantedRegion;
24use store_api::storage::RegionId;
25
26use crate::error::Result;
27use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
28use crate::metasrv::Context;
29use crate::region::RegionLeaseKeeper;
30use crate::region::lease_keeper::{
31 RegionLeaseInfo, RegionLeaseKeeperRef, RenewRegionLeasesResponse,
32};
33
/// A [`HeartbeatHandler`] that renews region leases for regions reported in
/// datanode heartbeats.
pub struct RegionLeaseHandler {
    // Lease duration (in seconds) granted on each renewal.
    region_lease_seconds: u64,
    // Decides which reported regions may keep (or change) their leases.
    region_lease_keeper: RegionLeaseKeeperRef,
    // Optional hook that replaces the default `RegionLeaseInfo` ->
    // `GrantedRegion` conversion for renewed leases.
    customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
}
39
/// Shared reference to a [`CustomizedRegionLeaseRenewer`].
pub type CustomizedRegionLeaseRenewerRef = Arc<dyn CustomizedRegionLeaseRenewer>;

/// Customizes how renewed region leases are turned into [`GrantedRegion`]s
/// before they are sent back to the datanode.
pub trait CustomizedRegionLeaseRenewer: Send + Sync {
    /// Converts the renewed lease info into the granted regions.
    ///
    /// `ctx` is mutable so implementations may consult or update metasrv state
    /// while deciding the grants.
    fn renew(
        &self,
        ctx: &mut Context,
        regions: HashMap<RegionId, RegionLeaseInfo>,
    ) -> Vec<GrantedRegion>;
}
49
50impl RegionLeaseHandler {
51 pub fn new(
52 region_lease_seconds: u64,
53 table_metadata_manager: TableMetadataManagerRef,
54 memory_region_keeper: MemoryRegionKeeperRef,
55 customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
56 ) -> Self {
57 let region_lease_keeper =
58 RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper.clone());
59
60 Self {
61 region_lease_seconds,
62 region_lease_keeper: Arc::new(region_lease_keeper),
63 customized_region_lease_renewer,
64 }
65 }
66}
67
68#[async_trait]
69impl HeartbeatHandler for RegionLeaseHandler {
70 fn is_acceptable(&self, role: Role) -> bool {
71 role == Role::Datanode
72 }
73
74 async fn handle(
75 &self,
76 req: &HeartbeatRequest,
77 ctx: &mut Context,
78 acc: &mut HeartbeatAccumulator,
79 ) -> Result<HandleControl> {
80 let Some(stat) = acc.stat.as_ref() else {
81 return Ok(HandleControl::Continue);
82 };
83
84 let regions = stat.regions();
85 let datanode_id = stat.id;
86
87 match self
88 .region_lease_keeper
89 .renew_region_leases(datanode_id, ®ions)
90 .await
91 {
92 Ok(RenewRegionLeasesResponse {
93 non_exists,
94 renewed,
95 }) => {
96 let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
97 renewer
98 .renew(ctx, renewed)
99 .into_iter()
100 .map(|region| region.into())
101 .collect()
102 } else {
103 renewed
104 .into_iter()
105 .map(|(region_id, region_lease_info)| {
106 GrantedRegion::new(region_id, region_lease_info.role).into()
107 })
108 .collect::<Vec<_>>()
109 };
110
111 acc.region_lease = Some(RegionLease {
112 regions: renewed,
113 duration_since_epoch: req.duration_since_epoch,
114 lease_seconds: self.region_lease_seconds,
115 closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
116 });
117 acc.inactive_region_ids = non_exists;
118 }
119 Err(e) => {
120 error!(e; "Failed to renew region leases for datanode: {datanode_id:?}, regions: {:?}", regions);
121 }
124 }
125
126 Ok(HandleControl::Continue)
127 }
128}
129
130#[cfg(test)]
131mod test {
132
133 use std::collections::{HashMap, HashSet};
134 use std::sync::Arc;
135
136 use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
137 use common_meta::distributed_time_constants::default_distributed_time_constants;
138 use common_meta::key::TableMetadataManager;
139 use common_meta::key::table_route::TableRouteValue;
140 use common_meta::key::test_utils::new_test_table_info;
141 use common_meta::kv_backend::memory::MemoryKvBackend;
142 use common_meta::kv_backend::test_util::MockKvBackendBuilder;
143 use common_meta::peer::Peer;
144 use common_meta::region_keeper::MemoryRegionKeeper;
145 use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
146 use store_api::region_engine::RegionRole;
147 use store_api::storage::RegionId;
148
149 use super::*;
150 use crate::metasrv::builder::MetasrvBuilder;
151
152 fn new_test_keeper() -> RegionLeaseKeeper {
153 let store = Arc::new(MemoryKvBackend::new());
154
155 let table_metadata_manager = Arc::new(TableMetadataManager::new(store));
156
157 let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
158 RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper)
159 }
160
161 fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
162 RegionStat {
163 id: region_id,
164 role,
165 rcus: 0,
166 wcus: 0,
167 approximate_bytes: 0,
168 engine: String::new(),
169 num_rows: 0,
170 memtable_size: 0,
171 manifest_size: 0,
172 sst_size: 0,
173 sst_num: 0,
174 index_size: 0,
175 region_manifest: RegionManifestInfo::Mito {
176 manifest_version: 0,
177 flushed_entry_id: 0,
178 file_removed_cnt: 0,
179 },
180 data_topic_latest_entry_id: 0,
181 metadata_topic_latest_entry_id: 0,
182 written_bytes: 0,
183 }
184 }
185
    /// A datanode reporting regions as `Follower` gets them upgraded, kept,
    /// granted, or closed depending on the routes: the route leader is granted
    /// `Leader`, a route follower stays `Follower`, a region registered in the
    /// opening-region keeper is granted too, and a region absent from metadata
    /// becomes closeable/inactive.
    #[tokio::test]
    async fn test_handle_upgradable_follower() {
        let datanode_id = 1;
        let region_number = 1u32;
        let table_id = 10;
        let region_id = RegionId::new(table_id, region_number);
        // Reported by the datanodes but never present in the table route.
        let another_region_id = RegionId::new(table_id, region_number + 1);
        let peer = Peer::empty(datanode_id);
        let follower_peer = Peer::empty(datanode_id + 1);
        let table_info = new_test_table_info(table_id, vec![region_number]).into();

        // `peer` leads `region_id`; `follower_peer` follows it.
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(peer.clone()),
            follower_peers: vec![follower_peer.clone()],
            ..Default::default()
        }];

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();

        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        let builder = MetasrvBuilder::new();
        let metasrv = builder.build().await.unwrap();
        let ctx = &mut metasrv.new_ctx();

        let acc = &mut HeartbeatAccumulator::default();

        // The route leader reports both regions as followers.
        acc.stat = Some(Stat {
            id: peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Follower),
                new_empty_region_stat(another_region_id, RegionRole::Follower),
            ],
            ..Default::default()
        });

        let req = HeartbeatRequest {
            duration_since_epoch: 1234,
            ..Default::default()
        };

        let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());

        let handler = RegionLeaseHandler::new(
            default_distributed_time_constants().region_lease.as_secs(),
            table_metadata_manager.clone(),
            opening_region_keeper.clone(),
            None,
        );

        handler.handle(&req, ctx, acc).await.unwrap();

        // Leader of the route: the reported follower region is upgraded to
        // `Leader`; the unknown region is closeable and inactive.
        assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]);
        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![another_region_id]
        );

        let acc = &mut HeartbeatAccumulator::default();

        // The route follower reports the same two regions.
        acc.stat = Some(Stat {
            id: follower_peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Follower),
                new_empty_region_stat(another_region_id, RegionRole::Follower),
            ],
            ..Default::default()
        });

        handler.handle(&req, ctx, acc).await.unwrap();

        assert_eq!(
            acc.region_lease.as_ref().unwrap().lease_seconds,
            default_distributed_time_constants().region_lease.as_secs()
        );

        // Follower of the route keeps its `Follower` role; the unknown region
        // is still closeable.
        assert_region_lease(
            acc,
            vec![GrantedRegion::new(region_id, RegionRole::Follower)],
        );
        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![another_region_id]
        );

        // Register an opening region; it should be granted a follower lease too.
        let opening_region_id = RegionId::new(table_id, region_number + 2);
        let _guard = opening_region_keeper
            .register(follower_peer.id, opening_region_id)
            .unwrap();

        let acc = &mut HeartbeatAccumulator::default();

        acc.stat = Some(Stat {
            id: follower_peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Follower),
                new_empty_region_stat(another_region_id, RegionRole::Follower),
                new_empty_region_stat(opening_region_id, RegionRole::Follower),
            ],
            ..Default::default()
        });

        handler.handle(&req, ctx, acc).await.unwrap();

        assert_eq!(
            acc.region_lease.as_ref().unwrap().lease_seconds,
            default_distributed_time_constants().region_lease.as_secs()
        );

        // Both the routed region and the opening region are granted.
        assert_region_lease(
            acc,
            vec![
                GrantedRegion::new(region_id, RegionRole::Follower),
                GrantedRegion::new(opening_region_id, RegionRole::Follower),
            ],
        );
        assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
        assert_eq!(
            acc.region_lease.as_ref().unwrap().closeable_region_ids,
            vec![another_region_id]
        );
    }
319
320 #[tokio::test]
321
322 async fn test_handle_downgradable_leader() {
323 let datanode_id = 1;
324 let region_number = 1u32;
325 let table_id = 10;
326 let region_id = RegionId::new(table_id, region_number);
327 let another_region_id = RegionId::new(table_id, region_number + 1);
328 let no_exist_region_id = RegionId::new(table_id, region_number + 2);
329 let peer = Peer::empty(datanode_id);
330 let follower_peer = Peer::empty(datanode_id + 1);
331 let table_info = new_test_table_info(table_id, vec![region_number]).into();
332
333 let region_routes = vec![
334 RegionRoute {
335 region: Region::new_test(region_id),
336 leader_peer: Some(peer.clone()),
337 follower_peers: vec![follower_peer.clone()],
338 leader_state: Some(LeaderState::Downgrading),
339 leader_down_since: Some(1),
340 },
341 RegionRoute {
342 region: Region::new_test(another_region_id),
343 leader_peer: Some(peer.clone()),
344 ..Default::default()
345 },
346 ];
347
348 let keeper = new_test_keeper();
349 let table_metadata_manager = keeper.table_metadata_manager();
350
351 table_metadata_manager
352 .create_table_metadata(
353 table_info,
354 TableRouteValue::physical(region_routes),
355 HashMap::default(),
356 )
357 .await
358 .unwrap();
359
360 let builder = MetasrvBuilder::new();
361 let metasrv = builder.build().await.unwrap();
362 let ctx = &mut metasrv.new_ctx();
363
364 let req = HeartbeatRequest {
365 duration_since_epoch: 1234,
366 ..Default::default()
367 };
368
369 let acc = &mut HeartbeatAccumulator::default();
370
371 acc.stat = Some(Stat {
372 id: peer.id,
373 region_stats: vec![
374 new_empty_region_stat(region_id, RegionRole::Leader),
375 new_empty_region_stat(another_region_id, RegionRole::Leader),
376 new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
377 ],
378 ..Default::default()
379 });
380
381 let handler = RegionLeaseHandler::new(
382 default_distributed_time_constants().region_lease.as_secs(),
383 table_metadata_manager.clone(),
384 Default::default(),
385 None,
386 );
387
388 handler.handle(&req, ctx, acc).await.unwrap();
389
390 assert_region_lease(
391 acc,
392 vec![
393 GrantedRegion::new(region_id, RegionRole::DowngradingLeader),
394 GrantedRegion::new(another_region_id, RegionRole::Leader),
395 ],
396 );
397 assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id]));
398 }
399
400 fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec<GrantedRegion>) {
401 let region_lease = acc.region_lease.as_ref().unwrap().clone();
402 let granted: Vec<GrantedRegion> = region_lease
403 .regions
404 .into_iter()
405 .map(Into::into)
406 .collect::<Vec<_>>();
407
408 let granted = granted
409 .into_iter()
410 .map(|region| (region.region_id, region))
411 .collect::<HashMap<_, _>>();
412
413 let expected = expected
414 .into_iter()
415 .map(|region| (region.region_id, region))
416 .collect::<HashMap<_, _>>();
417
418 assert_eq!(granted, expected);
419 }
420
    /// When the metadata backend fails (here: every `batch_get` errors), the
    /// handler logs and continues: no lease is granted and no regions are
    /// marked inactive.
    #[tokio::test]
    async fn test_handle_renew_region_lease_failure() {
        common_telemetry::init_default_ut_logging();
        // Kv backend whose batch_get always fails, simulating a metadata error.
        let kv = MockKvBackendBuilder::default()
            .batch_get_fn(Arc::new(|_| {
                common_meta::error::UnexpectedSnafu {
                    err_msg: "mock err",
                }
                .fail()
            }) as _)
            .build()
            .unwrap();
        let kvbackend = Arc::new(kv);
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kvbackend));

        let datanode_id = 1;
        let region_number = 1u32;
        let table_id = 10;
        let region_id = RegionId::new(table_id, region_number);
        let another_region_id = RegionId::new(table_id, region_number + 1);
        let no_exist_region_id = RegionId::new(table_id, region_number + 2);
        let peer = Peer::empty(datanode_id);

        let builder = MetasrvBuilder::new();
        let metasrv = builder.build().await.unwrap();
        let ctx = &mut metasrv.new_ctx();

        let req = HeartbeatRequest {
            duration_since_epoch: 1234,
            ..Default::default()
        };

        let acc = &mut HeartbeatAccumulator::default();
        acc.stat = Some(Stat {
            id: peer.id,
            region_stats: vec![
                new_empty_region_stat(region_id, RegionRole::Leader),
                new_empty_region_stat(another_region_id, RegionRole::Leader),
                new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
            ],
            ..Default::default()
        });
        let handler = RegionLeaseHandler::new(
            default_distributed_time_constants().region_lease.as_secs(),
            table_metadata_manager.clone(),
            Default::default(),
            None,
        );
        // The renewal error is swallowed: handle() still returns Ok.
        handler.handle(&req, ctx, acc).await.unwrap();

        // No lease response and nothing invalidated on failure.
        assert!(acc.region_lease.is_none());
        assert!(acc.inactive_region_ids.is_empty());
    }
474}