1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
19use async_trait::async_trait;
20use common_meta::key::TableMetadataManagerRef;
21use common_meta::region_keeper::MemoryRegionKeeperRef;
22use store_api::region_engine::GrantedRegion;
23use store_api::storage::RegionId;
24
25use crate::error::Result;
26use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
27use crate::metasrv::Context;
28use crate::region::lease_keeper::{
29 RegionLeaseInfo, RegionLeaseKeeperRef, RenewRegionLeasesResponse,
30};
31use crate::region::RegionLeaseKeeper;
32
33pub struct RegionLeaseHandler {
34 region_lease_seconds: u64,
35 region_lease_keeper: RegionLeaseKeeperRef,
36 customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
37}
38
39pub type CustomizedRegionLeaseRenewerRef = Arc<dyn CustomizedRegionLeaseRenewer>;
40
41pub trait CustomizedRegionLeaseRenewer: Send + Sync {
42 fn renew(
43 &self,
44 ctx: &mut Context,
45 regions: HashMap<RegionId, RegionLeaseInfo>,
46 ) -> Vec<GrantedRegion>;
47}
48
49impl RegionLeaseHandler {
50 pub fn new(
51 region_lease_seconds: u64,
52 table_metadata_manager: TableMetadataManagerRef,
53 memory_region_keeper: MemoryRegionKeeperRef,
54 customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
55 ) -> Self {
56 let region_lease_keeper =
57 RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper.clone());
58
59 Self {
60 region_lease_seconds,
61 region_lease_keeper: Arc::new(region_lease_keeper),
62 customized_region_lease_renewer,
63 }
64 }
65}
66
67#[async_trait]
68impl HeartbeatHandler for RegionLeaseHandler {
69 fn is_acceptable(&self, role: Role) -> bool {
70 role == Role::Datanode
71 }
72
73 async fn handle(
74 &self,
75 req: &HeartbeatRequest,
76 ctx: &mut Context,
77 acc: &mut HeartbeatAccumulator,
78 ) -> Result<HandleControl> {
79 let Some(stat) = acc.stat.as_ref() else {
80 return Ok(HandleControl::Continue);
81 };
82
83 let regions = stat.regions();
84 let datanode_id = stat.id;
85
86 let RenewRegionLeasesResponse {
87 non_exists,
88 renewed,
89 } = self
90 .region_lease_keeper
91 .renew_region_leases(datanode_id, ®ions)
92 .await?;
93
94 let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
95 renewer
96 .renew(ctx, renewed)
97 .into_iter()
98 .map(|region| region.into())
99 .collect()
100 } else {
101 renewed
102 .into_iter()
103 .map(|(region_id, region_lease_info)| {
104 GrantedRegion::new(region_id, region_lease_info.role).into()
105 })
106 .collect::<Vec<_>>()
107 };
108
109 acc.region_lease = Some(RegionLease {
110 regions: renewed,
111 duration_since_epoch: req.duration_since_epoch,
112 lease_seconds: self.region_lease_seconds,
113 closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
114 });
115 acc.inactive_region_ids = non_exists;
116
117 Ok(HandleControl::Continue)
118 }
119}
120
121#[cfg(test)]
122mod test {
123 use std::collections::{HashMap, HashSet};
124 use std::sync::Arc;
125
126 use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
127 use common_meta::distributed_time_constants;
128 use common_meta::key::table_route::TableRouteValue;
129 use common_meta::key::test_utils::new_test_table_info;
130 use common_meta::key::TableMetadataManager;
131 use common_meta::kv_backend::memory::MemoryKvBackend;
132 use common_meta::peer::Peer;
133 use common_meta::region_keeper::MemoryRegionKeeper;
134 use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
135 use store_api::region_engine::RegionRole;
136 use store_api::storage::RegionId;
137
138 use super::*;
139 use crate::metasrv::builder::MetasrvBuilder;
140
141 fn new_test_keeper() -> RegionLeaseKeeper {
142 let store = Arc::new(MemoryKvBackend::new());
143
144 let table_metadata_manager = Arc::new(TableMetadataManager::new(store));
145
146 let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
147 RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper)
148 }
149
150 fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
151 RegionStat {
152 id: region_id,
153 role,
154 rcus: 0,
155 wcus: 0,
156 approximate_bytes: 0,
157 engine: String::new(),
158 num_rows: 0,
159 memtable_size: 0,
160 manifest_size: 0,
161 sst_size: 0,
162 index_size: 0,
163 region_manifest: RegionManifestInfo::Mito {
164 manifest_version: 0,
165 flushed_entry_id: 0,
166 },
167 data_topic_latest_entry_id: 0,
168 metadata_topic_latest_entry_id: 0,
169 }
170 }
171
172 #[tokio::test]
173 async fn test_handle_upgradable_follower() {
174 let datanode_id = 1;
175 let region_number = 1u32;
176 let table_id = 10;
177 let region_id = RegionId::new(table_id, region_number);
178 let another_region_id = RegionId::new(table_id, region_number + 1);
179 let peer = Peer::empty(datanode_id);
180 let follower_peer = Peer::empty(datanode_id + 1);
181 let table_info = new_test_table_info(table_id, vec![region_number]).into();
182
183 let region_routes = vec![RegionRoute {
184 region: Region::new_test(region_id),
185 leader_peer: Some(peer.clone()),
186 follower_peers: vec![follower_peer.clone()],
187 ..Default::default()
188 }];
189
190 let keeper = new_test_keeper();
191 let table_metadata_manager = keeper.table_metadata_manager();
192
193 table_metadata_manager
194 .create_table_metadata(
195 table_info,
196 TableRouteValue::physical(region_routes),
197 HashMap::default(),
198 )
199 .await
200 .unwrap();
201
202 let builder = MetasrvBuilder::new();
203 let metasrv = builder.build().await.unwrap();
204 let ctx = &mut metasrv.new_ctx();
205
206 let acc = &mut HeartbeatAccumulator::default();
207
208 acc.stat = Some(Stat {
209 id: peer.id,
210 region_stats: vec![
211 new_empty_region_stat(region_id, RegionRole::Follower),
212 new_empty_region_stat(another_region_id, RegionRole::Follower),
213 ],
214 ..Default::default()
215 });
216
217 let req = HeartbeatRequest {
218 duration_since_epoch: 1234,
219 ..Default::default()
220 };
221
222 let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
223
224 let handler = RegionLeaseHandler::new(
225 distributed_time_constants::REGION_LEASE_SECS,
226 table_metadata_manager.clone(),
227 opening_region_keeper.clone(),
228 None,
229 );
230
231 handler.handle(&req, ctx, acc).await.unwrap();
232
233 assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]);
234 assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
235 assert_eq!(
236 acc.region_lease.as_ref().unwrap().closeable_region_ids,
237 vec![another_region_id]
238 );
239
240 let acc = &mut HeartbeatAccumulator::default();
241
242 acc.stat = Some(Stat {
243 id: follower_peer.id,
244 region_stats: vec![
245 new_empty_region_stat(region_id, RegionRole::Follower),
246 new_empty_region_stat(another_region_id, RegionRole::Follower),
247 ],
248 ..Default::default()
249 });
250
251 handler.handle(&req, ctx, acc).await.unwrap();
252
253 assert_eq!(
254 acc.region_lease.as_ref().unwrap().lease_seconds,
255 distributed_time_constants::REGION_LEASE_SECS
256 );
257
258 assert_region_lease(
259 acc,
260 vec![GrantedRegion::new(region_id, RegionRole::Follower)],
261 );
262 assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
263 assert_eq!(
264 acc.region_lease.as_ref().unwrap().closeable_region_ids,
265 vec![another_region_id]
266 );
267
268 let opening_region_id = RegionId::new(table_id, region_number + 2);
269 let _guard = opening_region_keeper
270 .register(follower_peer.id, opening_region_id)
271 .unwrap();
272
273 let acc = &mut HeartbeatAccumulator::default();
274
275 acc.stat = Some(Stat {
276 id: follower_peer.id,
277 region_stats: vec![
278 new_empty_region_stat(region_id, RegionRole::Follower),
279 new_empty_region_stat(another_region_id, RegionRole::Follower),
280 new_empty_region_stat(opening_region_id, RegionRole::Follower),
281 ],
282 ..Default::default()
283 });
284
285 handler.handle(&req, ctx, acc).await.unwrap();
286
287 assert_eq!(
288 acc.region_lease.as_ref().unwrap().lease_seconds,
289 distributed_time_constants::REGION_LEASE_SECS
290 );
291
292 assert_region_lease(
293 acc,
294 vec![
295 GrantedRegion::new(region_id, RegionRole::Follower),
296 GrantedRegion::new(opening_region_id, RegionRole::Follower),
297 ],
298 );
299 assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
300 assert_eq!(
301 acc.region_lease.as_ref().unwrap().closeable_region_ids,
302 vec![another_region_id]
303 );
304 }
305
306 #[tokio::test]
307
308 async fn test_handle_downgradable_leader() {
309 let datanode_id = 1;
310 let region_number = 1u32;
311 let table_id = 10;
312 let region_id = RegionId::new(table_id, region_number);
313 let another_region_id = RegionId::new(table_id, region_number + 1);
314 let no_exist_region_id = RegionId::new(table_id, region_number + 2);
315 let peer = Peer::empty(datanode_id);
316 let follower_peer = Peer::empty(datanode_id + 1);
317 let table_info = new_test_table_info(table_id, vec![region_number]).into();
318
319 let region_routes = vec![
320 RegionRoute {
321 region: Region::new_test(region_id),
322 leader_peer: Some(peer.clone()),
323 follower_peers: vec![follower_peer.clone()],
324 leader_state: Some(LeaderState::Downgrading),
325 leader_down_since: Some(1),
326 },
327 RegionRoute {
328 region: Region::new_test(another_region_id),
329 leader_peer: Some(peer.clone()),
330 ..Default::default()
331 },
332 ];
333
334 let keeper = new_test_keeper();
335 let table_metadata_manager = keeper.table_metadata_manager();
336
337 table_metadata_manager
338 .create_table_metadata(
339 table_info,
340 TableRouteValue::physical(region_routes),
341 HashMap::default(),
342 )
343 .await
344 .unwrap();
345
346 let builder = MetasrvBuilder::new();
347 let metasrv = builder.build().await.unwrap();
348 let ctx = &mut metasrv.new_ctx();
349
350 let req = HeartbeatRequest {
351 duration_since_epoch: 1234,
352 ..Default::default()
353 };
354
355 let acc = &mut HeartbeatAccumulator::default();
356
357 acc.stat = Some(Stat {
358 id: peer.id,
359 region_stats: vec![
360 new_empty_region_stat(region_id, RegionRole::Leader),
361 new_empty_region_stat(another_region_id, RegionRole::Leader),
362 new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
363 ],
364 ..Default::default()
365 });
366
367 let handler = RegionLeaseHandler::new(
368 distributed_time_constants::REGION_LEASE_SECS,
369 table_metadata_manager.clone(),
370 Default::default(),
371 None,
372 );
373
374 handler.handle(&req, ctx, acc).await.unwrap();
375
376 assert_region_lease(
377 acc,
378 vec![
379 GrantedRegion::new(region_id, RegionRole::DowngradingLeader),
380 GrantedRegion::new(another_region_id, RegionRole::Leader),
381 ],
382 );
383 assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id]));
384 }
385
386 fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec<GrantedRegion>) {
387 let region_lease = acc.region_lease.as_ref().unwrap().clone();
388 let granted: Vec<GrantedRegion> = region_lease
389 .regions
390 .into_iter()
391 .map(Into::into)
392 .collect::<Vec<_>>();
393
394 let granted = granted
395 .into_iter()
396 .map(|region| (region.region_id, region))
397 .collect::<HashMap<_, _>>();
398
399 let expected = expected
400 .into_iter()
401 .map(|region| (region.region_id, region))
402 .collect::<HashMap<_, _>>();
403
404 assert_eq!(granted, expected);
405 }
406}