1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
19use async_trait::async_trait;
20use common_meta::key::TableMetadataManagerRef;
21use common_meta::region_keeper::MemoryRegionKeeperRef;
22use store_api::region_engine::GrantedRegion;
23use store_api::storage::RegionId;
24
25use crate::error::Result;
26use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
27use crate::metasrv::Context;
28use crate::region::lease_keeper::{
29 RegionLeaseInfo, RegionLeaseKeeperRef, RenewRegionLeasesResponse,
30};
31use crate::region::RegionLeaseKeeper;
32
33pub struct RegionLeaseHandler {
34 region_lease_seconds: u64,
35 region_lease_keeper: RegionLeaseKeeperRef,
36 customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
37}
38
39pub type CustomizedRegionLeaseRenewerRef = Arc<dyn CustomizedRegionLeaseRenewer>;
40
41pub trait CustomizedRegionLeaseRenewer: Send + Sync {
42 fn renew(
43 &self,
44 ctx: &mut Context,
45 regions: HashMap<RegionId, RegionLeaseInfo>,
46 ) -> Vec<GrantedRegion>;
47}
48
49impl RegionLeaseHandler {
50 pub fn new(
51 region_lease_seconds: u64,
52 table_metadata_manager: TableMetadataManagerRef,
53 memory_region_keeper: MemoryRegionKeeperRef,
54 customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
55 ) -> Self {
56 let region_lease_keeper =
57 RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper.clone());
58
59 Self {
60 region_lease_seconds,
61 region_lease_keeper: Arc::new(region_lease_keeper),
62 customized_region_lease_renewer,
63 }
64 }
65}
66
67#[async_trait]
68impl HeartbeatHandler for RegionLeaseHandler {
69 fn is_acceptable(&self, role: Role) -> bool {
70 role == Role::Datanode
71 }
72
73 async fn handle(
74 &self,
75 req: &HeartbeatRequest,
76 ctx: &mut Context,
77 acc: &mut HeartbeatAccumulator,
78 ) -> Result<HandleControl> {
79 let Some(stat) = acc.stat.as_ref() else {
80 return Ok(HandleControl::Continue);
81 };
82
83 let regions = stat.regions();
84 let datanode_id = stat.id;
85
86 let RenewRegionLeasesResponse {
87 non_exists,
88 renewed,
89 } = self
90 .region_lease_keeper
91 .renew_region_leases(datanode_id, ®ions)
92 .await?;
93
94 let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
95 renewer
96 .renew(ctx, renewed)
97 .into_iter()
98 .map(|region| region.into())
99 .collect()
100 } else {
101 renewed
102 .into_iter()
103 .map(|(region_id, region_lease_info)| {
104 GrantedRegion::new(region_id, region_lease_info.role).into()
105 })
106 .collect::<Vec<_>>()
107 };
108
109 acc.region_lease = Some(RegionLease {
110 regions: renewed,
111 duration_since_epoch: req.duration_since_epoch,
112 lease_seconds: self.region_lease_seconds,
113 closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
114 });
115 acc.inactive_region_ids = non_exists;
116
117 Ok(HandleControl::Continue)
118 }
119}
120
121#[cfg(test)]
122mod test {
123 use std::collections::{HashMap, HashSet};
124 use std::sync::Arc;
125
126 use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
127 use common_meta::distributed_time_constants;
128 use common_meta::key::table_route::TableRouteValue;
129 use common_meta::key::test_utils::new_test_table_info;
130 use common_meta::key::TableMetadataManager;
131 use common_meta::kv_backend::memory::MemoryKvBackend;
132 use common_meta::peer::Peer;
133 use common_meta::region_keeper::MemoryRegionKeeper;
134 use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
135 use store_api::region_engine::RegionRole;
136 use store_api::storage::RegionId;
137
138 use super::*;
139 use crate::metasrv::builder::MetasrvBuilder;
140
141 fn new_test_keeper() -> RegionLeaseKeeper {
142 let store = Arc::new(MemoryKvBackend::new());
143
144 let table_metadata_manager = Arc::new(TableMetadataManager::new(store));
145
146 let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
147 RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper)
148 }
149
150 fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
151 RegionStat {
152 id: region_id,
153 role,
154 rcus: 0,
155 wcus: 0,
156 approximate_bytes: 0,
157 engine: String::new(),
158 num_rows: 0,
159 memtable_size: 0,
160 manifest_size: 0,
161 sst_size: 0,
162 sst_num: 0,
163 index_size: 0,
164 region_manifest: RegionManifestInfo::Mito {
165 manifest_version: 0,
166 flushed_entry_id: 0,
167 },
168 data_topic_latest_entry_id: 0,
169 metadata_topic_latest_entry_id: 0,
170 written_bytes: 0,
171 }
172 }
173
174 #[tokio::test]
175 async fn test_handle_upgradable_follower() {
176 let datanode_id = 1;
177 let region_number = 1u32;
178 let table_id = 10;
179 let region_id = RegionId::new(table_id, region_number);
180 let another_region_id = RegionId::new(table_id, region_number + 1);
181 let peer = Peer::empty(datanode_id);
182 let follower_peer = Peer::empty(datanode_id + 1);
183 let table_info = new_test_table_info(table_id, vec![region_number]).into();
184
185 let region_routes = vec![RegionRoute {
186 region: Region::new_test(region_id),
187 leader_peer: Some(peer.clone()),
188 follower_peers: vec![follower_peer.clone()],
189 ..Default::default()
190 }];
191
192 let keeper = new_test_keeper();
193 let table_metadata_manager = keeper.table_metadata_manager();
194
195 table_metadata_manager
196 .create_table_metadata(
197 table_info,
198 TableRouteValue::physical(region_routes),
199 HashMap::default(),
200 )
201 .await
202 .unwrap();
203
204 let builder = MetasrvBuilder::new();
205 let metasrv = builder.build().await.unwrap();
206 let ctx = &mut metasrv.new_ctx();
207
208 let acc = &mut HeartbeatAccumulator::default();
209
210 acc.stat = Some(Stat {
211 id: peer.id,
212 region_stats: vec![
213 new_empty_region_stat(region_id, RegionRole::Follower),
214 new_empty_region_stat(another_region_id, RegionRole::Follower),
215 ],
216 ..Default::default()
217 });
218
219 let req = HeartbeatRequest {
220 duration_since_epoch: 1234,
221 ..Default::default()
222 };
223
224 let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
225
226 let handler = RegionLeaseHandler::new(
227 distributed_time_constants::REGION_LEASE_SECS,
228 table_metadata_manager.clone(),
229 opening_region_keeper.clone(),
230 None,
231 );
232
233 handler.handle(&req, ctx, acc).await.unwrap();
234
235 assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]);
236 assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
237 assert_eq!(
238 acc.region_lease.as_ref().unwrap().closeable_region_ids,
239 vec![another_region_id]
240 );
241
242 let acc = &mut HeartbeatAccumulator::default();
243
244 acc.stat = Some(Stat {
245 id: follower_peer.id,
246 region_stats: vec![
247 new_empty_region_stat(region_id, RegionRole::Follower),
248 new_empty_region_stat(another_region_id, RegionRole::Follower),
249 ],
250 ..Default::default()
251 });
252
253 handler.handle(&req, ctx, acc).await.unwrap();
254
255 assert_eq!(
256 acc.region_lease.as_ref().unwrap().lease_seconds,
257 distributed_time_constants::REGION_LEASE_SECS
258 );
259
260 assert_region_lease(
261 acc,
262 vec![GrantedRegion::new(region_id, RegionRole::Follower)],
263 );
264 assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
265 assert_eq!(
266 acc.region_lease.as_ref().unwrap().closeable_region_ids,
267 vec![another_region_id]
268 );
269
270 let opening_region_id = RegionId::new(table_id, region_number + 2);
271 let _guard = opening_region_keeper
272 .register(follower_peer.id, opening_region_id)
273 .unwrap();
274
275 let acc = &mut HeartbeatAccumulator::default();
276
277 acc.stat = Some(Stat {
278 id: follower_peer.id,
279 region_stats: vec![
280 new_empty_region_stat(region_id, RegionRole::Follower),
281 new_empty_region_stat(another_region_id, RegionRole::Follower),
282 new_empty_region_stat(opening_region_id, RegionRole::Follower),
283 ],
284 ..Default::default()
285 });
286
287 handler.handle(&req, ctx, acc).await.unwrap();
288
289 assert_eq!(
290 acc.region_lease.as_ref().unwrap().lease_seconds,
291 distributed_time_constants::REGION_LEASE_SECS
292 );
293
294 assert_region_lease(
295 acc,
296 vec![
297 GrantedRegion::new(region_id, RegionRole::Follower),
298 GrantedRegion::new(opening_region_id, RegionRole::Follower),
299 ],
300 );
301 assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
302 assert_eq!(
303 acc.region_lease.as_ref().unwrap().closeable_region_ids,
304 vec![another_region_id]
305 );
306 }
307
308 #[tokio::test]
309
310 async fn test_handle_downgradable_leader() {
311 let datanode_id = 1;
312 let region_number = 1u32;
313 let table_id = 10;
314 let region_id = RegionId::new(table_id, region_number);
315 let another_region_id = RegionId::new(table_id, region_number + 1);
316 let no_exist_region_id = RegionId::new(table_id, region_number + 2);
317 let peer = Peer::empty(datanode_id);
318 let follower_peer = Peer::empty(datanode_id + 1);
319 let table_info = new_test_table_info(table_id, vec![region_number]).into();
320
321 let region_routes = vec![
322 RegionRoute {
323 region: Region::new_test(region_id),
324 leader_peer: Some(peer.clone()),
325 follower_peers: vec![follower_peer.clone()],
326 leader_state: Some(LeaderState::Downgrading),
327 leader_down_since: Some(1),
328 },
329 RegionRoute {
330 region: Region::new_test(another_region_id),
331 leader_peer: Some(peer.clone()),
332 ..Default::default()
333 },
334 ];
335
336 let keeper = new_test_keeper();
337 let table_metadata_manager = keeper.table_metadata_manager();
338
339 table_metadata_manager
340 .create_table_metadata(
341 table_info,
342 TableRouteValue::physical(region_routes),
343 HashMap::default(),
344 )
345 .await
346 .unwrap();
347
348 let builder = MetasrvBuilder::new();
349 let metasrv = builder.build().await.unwrap();
350 let ctx = &mut metasrv.new_ctx();
351
352 let req = HeartbeatRequest {
353 duration_since_epoch: 1234,
354 ..Default::default()
355 };
356
357 let acc = &mut HeartbeatAccumulator::default();
358
359 acc.stat = Some(Stat {
360 id: peer.id,
361 region_stats: vec![
362 new_empty_region_stat(region_id, RegionRole::Leader),
363 new_empty_region_stat(another_region_id, RegionRole::Leader),
364 new_empty_region_stat(no_exist_region_id, RegionRole::Leader),
365 ],
366 ..Default::default()
367 });
368
369 let handler = RegionLeaseHandler::new(
370 distributed_time_constants::REGION_LEASE_SECS,
371 table_metadata_manager.clone(),
372 Default::default(),
373 None,
374 );
375
376 handler.handle(&req, ctx, acc).await.unwrap();
377
378 assert_region_lease(
379 acc,
380 vec![
381 GrantedRegion::new(region_id, RegionRole::DowngradingLeader),
382 GrantedRegion::new(another_region_id, RegionRole::Leader),
383 ],
384 );
385 assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id]));
386 }
387
388 fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec<GrantedRegion>) {
389 let region_lease = acc.region_lease.as_ref().unwrap().clone();
390 let granted: Vec<GrantedRegion> = region_lease
391 .regions
392 .into_iter()
393 .map(Into::into)
394 .collect::<Vec<_>>();
395
396 let granted = granted
397 .into_iter()
398 .map(|region| (region.region_id, region))
399 .collect::<HashMap<_, _>>();
400
401 let expected = expected
402 .into_iter()
403 .map(|region| (region.region_id, region))
404 .collect::<HashMap<_, _>>();
405
406 assert_eq!(granted, expected);
407 }
408}