1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_meta::key::table_route::TableRouteValue;
19use common_meta::key::TableMetadataManagerRef;
20use common_meta::region_keeper::MemoryRegionKeeperRef;
21use common_meta::rpc::router::RegionRoute;
22use common_meta::DatanodeId;
23use common_telemetry::warn;
24use snafu::ResultExt;
25use store_api::region_engine::RegionRole;
26use store_api::storage::{RegionId, TableId};
27
28use crate::error::{self, Result};
29
30pub type RegionLeaseKeeperRef = Arc<RegionLeaseKeeper>;
31
32pub struct RegionLeaseKeeper {
34 table_metadata_manager: TableMetadataManagerRef,
35 memory_region_keeper: MemoryRegionKeeperRef,
36}
37
38pub struct RenewRegionLeasesResponse {
41 pub renewed: HashMap<RegionId, RegionLeaseInfo>,
42 pub non_exists: HashSet<RegionId>,
43}
44
45impl RegionLeaseKeeper {
46 pub fn new(
47 table_metadata_manager: TableMetadataManagerRef,
48 memory_region_keeper: MemoryRegionKeeperRef,
49 ) -> Self {
50 Self {
51 table_metadata_manager,
52 memory_region_keeper,
53 }
54 }
55}
56
57fn renew_region_lease_via_region_route(
58 region_route: &RegionRoute,
59 datanode_id: DatanodeId,
60 region_id: RegionId,
61) -> Option<(RegionId, RegionRole)> {
62 if let Some(leader) = ®ion_route.leader_peer {
64 if leader.id == datanode_id {
65 let region_role = if region_route.is_leader_downgrading() {
66 RegionRole::DowngradingLeader
67 } else {
68 RegionRole::Leader
69 };
70
71 return Some((region_id, region_role));
72 }
73 }
74
75 if region_route
77 .follower_peers
78 .iter()
79 .any(|peer| peer.id == datanode_id)
80 {
81 return Some((region_id, RegionRole::Follower));
82 }
83
84 warn!(
85 "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region_routes: {:?}",
86 region_route
87 );
88 None
90}
91
92#[derive(Debug, PartialEq, Eq)]
94pub struct RegionLeaseInfo {
95 pub region_id: RegionId,
96 pub is_operating: bool,
100 pub role: RegionRole,
102}
103
104impl RegionLeaseInfo {
105 pub fn operating(region_id: RegionId, role: RegionRole) -> Self {
107 Self {
108 region_id,
109 is_operating: true,
110 role,
111 }
112 }
113}
114
115impl From<(RegionId, RegionRole)> for RegionLeaseInfo {
116 fn from((region_id, role): (RegionId, RegionRole)) -> Self {
117 Self {
118 region_id,
119 is_operating: false,
120 role,
121 }
122 }
123}
124
125impl RegionLeaseKeeper {
126 async fn collect_table_metadata(
127 &self,
128 table_ids: &[TableId],
129 ) -> Result<HashMap<TableId, TableRouteValue>> {
130 let table_route_manager = self.table_metadata_manager.table_route_manager();
131
132 let table_routes = table_route_manager
135 .table_route_storage()
136 .batch_get(table_ids)
137 .await
138 .context(error::TableMetadataManagerSnafu)?;
139
140 let metadata_subset = table_routes
141 .into_iter()
142 .zip(table_ids)
143 .filter_map(|(route, id)| route.map(|route| (*id, route)))
144 .collect::<HashMap<_, _>>();
145
146 Ok(metadata_subset)
147 }
148
149 fn renew_region_lease(
153 &self,
154 table_metadata: &HashMap<TableId, TableRouteValue>,
155 operating_regions: &HashSet<RegionId>,
156 datanode_id: DatanodeId,
157 region_id: RegionId,
158 role: RegionRole,
159 ) -> Option<RegionLeaseInfo> {
160 if operating_regions.contains(®ion_id) {
161 let region_lease_info = RegionLeaseInfo::operating(region_id, role);
162 return Some(region_lease_info);
163 }
164
165 if let Some(table_route) = table_metadata.get(®ion_id.table_id()) {
166 if let Ok(Some(region_route)) = table_route.region_route(region_id) {
167 return renew_region_lease_via_region_route(®ion_route, datanode_id, region_id)
168 .map(RegionLeaseInfo::from);
169 } else {
170 warn!(
171 "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region route is not found in table({})",
172 region_id.table_id()
173 );
174 }
175 } else {
176 warn!(
177 "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, table({}) is not found",
178 region_id.table_id()
179 );
180 }
181 None
182 }
183
184 async fn collect_metadata(
185 &self,
186 datanode_id: DatanodeId,
187 mut region_ids: HashSet<RegionId>,
188 ) -> Result<(HashMap<TableId, TableRouteValue>, HashSet<RegionId>)> {
189 let operating_regions = self
191 .memory_region_keeper
192 .extract_operating_regions(datanode_id, &mut region_ids);
193 let table_ids = region_ids
194 .into_iter()
195 .map(|region_id| region_id.table_id())
196 .collect::<HashSet<_>>()
197 .into_iter()
198 .collect::<Vec<_>>();
199 let table_metadata = self.collect_table_metadata(&table_ids).await?;
200 Ok((table_metadata, operating_regions))
201 }
202
203 pub async fn renew_region_leases(
212 &self,
213 datanode_id: DatanodeId,
214 regions: &[(RegionId, RegionRole)],
215 ) -> Result<RenewRegionLeasesResponse> {
216 let region_ids = regions
217 .iter()
218 .map(|(region_id, _)| *region_id)
219 .collect::<HashSet<_>>();
220 let (table_metadata, operating_regions) =
221 self.collect_metadata(datanode_id, region_ids).await?;
222 let mut renewed = HashMap::new();
223 let mut non_exists = HashSet::new();
224
225 for &(region, role) in regions {
226 match self.renew_region_lease(
227 &table_metadata,
228 &operating_regions,
229 datanode_id,
230 region,
231 role,
232 ) {
233 Some(region_lease_info) => {
234 renewed.insert(region_lease_info.region_id, region_lease_info);
235 }
236 None => {
237 non_exists.insert(region);
238 }
239 }
240 }
241
242 Ok(RenewRegionLeasesResponse {
243 renewed,
244 non_exists,
245 })
246 }
247
248 #[cfg(test)]
249 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
250 &self.table_metadata_manager
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use std::collections::{HashMap, HashSet};
257 use std::sync::Arc;
258
259 use common_meta::key::table_route::{LogicalTableRouteValue, TableRouteValue};
260 use common_meta::key::test_utils::new_test_table_info;
261 use common_meta::key::TableMetadataManager;
262 use common_meta::kv_backend::memory::MemoryKvBackend;
263 use common_meta::peer::Peer;
264 use common_meta::region_keeper::MemoryRegionKeeper;
265 use common_meta::rpc::router::{LeaderState, Region, RegionRouteBuilder};
266 use store_api::region_engine::RegionRole;
267 use store_api::storage::RegionId;
268 use table::metadata::RawTableInfo;
269
270 use super::{renew_region_lease_via_region_route, RegionLeaseKeeper};
271 use crate::region::lease_keeper::{RegionLeaseInfo, RenewRegionLeasesResponse};
272
273 fn new_test_keeper() -> RegionLeaseKeeper {
274 let store = Arc::new(MemoryKvBackend::new());
275
276 let table_metadata_manager = Arc::new(TableMetadataManager::new(store));
277
278 let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
279 RegionLeaseKeeper::new(table_metadata_manager, opening_region_keeper)
280 }
281
282 #[test]
283 fn test_renew_region_lease_via_region_route() {
284 let region_id = RegionId::new(1024, 1);
285 let leader_peer_id = 1024;
286 let follower_peer_id = 2048;
287 let mut region_route = RegionRouteBuilder::default()
288 .region(Region::new_test(region_id))
289 .leader_peer(Peer::empty(leader_peer_id))
290 .follower_peers(vec![Peer::empty(follower_peer_id)])
291 .build()
292 .unwrap();
293
294 for region_id in [RegionId::new(1024, 2), region_id] {
296 assert!(renew_region_lease_via_region_route(®ion_route, 1, region_id).is_none());
297 }
298
299 assert_eq!(
301 renew_region_lease_via_region_route(®ion_route, leader_peer_id, region_id),
302 Some((region_id, RegionRole::Leader))
303 );
304 assert_eq!(
306 renew_region_lease_via_region_route(®ion_route, follower_peer_id, region_id),
307 Some((region_id, RegionRole::Follower))
308 );
309
310 region_route.leader_state = Some(LeaderState::Downgrading);
311 assert_eq!(
313 renew_region_lease_via_region_route(®ion_route, leader_peer_id, region_id),
314 Some((region_id, RegionRole::DowngradingLeader))
315 );
316 }
317
318 #[tokio::test]
319 async fn test_renew_region_leases_non_exists_regions() {
320 let keeper = new_test_keeper();
321
322 let RenewRegionLeasesResponse {
323 non_exists,
324 renewed,
325 } = keeper
326 .renew_region_leases(
327 1,
328 &[
329 (RegionId::new(1024, 1), RegionRole::Follower),
330 (RegionId::new(1024, 2), RegionRole::Leader),
331 ],
332 )
333 .await
334 .unwrap();
335
336 assert!(renewed.is_empty());
337 assert_eq!(
338 non_exists,
339 HashSet::from([RegionId::new(1024, 1), RegionId::new(1024, 2)])
340 );
341 }
342
343 #[tokio::test]
344 async fn test_collect_metadata() {
345 let region_number = 1u32;
346 let table_id = 1024;
347 let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into();
348
349 let region_id = RegionId::new(table_id, 1);
350 let leader_peer_id = 1024;
351 let follower_peer_id = 2048;
352 let region_route = RegionRouteBuilder::default()
353 .region(Region::new_test(region_id))
354 .leader_peer(Peer::empty(leader_peer_id))
355 .follower_peers(vec![Peer::empty(follower_peer_id)])
356 .build()
357 .unwrap();
358
359 let keeper = new_test_keeper();
360 let table_metadata_manager = keeper.table_metadata_manager();
361 table_metadata_manager
362 .create_table_metadata(
363 table_info,
364 TableRouteValue::physical(vec![region_route]),
365 HashMap::default(),
366 )
367 .await
368 .unwrap();
369 let opening_region_id = RegionId::new(1025, 1);
370 let _guard = keeper
371 .memory_region_keeper
372 .register(leader_peer_id, opening_region_id)
373 .unwrap();
374 let another_opening_region_id = RegionId::new(1025, 2);
375 let _guard2 = keeper
376 .memory_region_keeper
377 .register(follower_peer_id, another_opening_region_id)
378 .unwrap();
379
380 let (metadata, regions) = keeper
381 .collect_metadata(
382 leader_peer_id,
383 HashSet::from([region_id, opening_region_id]),
384 )
385 .await
386 .unwrap();
387 assert_eq!(
388 metadata.keys().cloned().collect::<Vec<_>>(),
389 vec![region_id.table_id()]
390 );
391 assert!(regions.contains(&opening_region_id));
392 assert_eq!(regions.len(), 1);
393 }
394
395 #[tokio::test]
396 async fn test_renew_region_leases_basic() {
397 let region_number = 1u32;
398 let table_id = 1024;
399 let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into();
400
401 let region_id = RegionId::new(table_id, 1);
402 let leader_peer_id = 1024;
403 let follower_peer_id = 2048;
404 let region_route = RegionRouteBuilder::default()
405 .region(Region::new_test(region_id))
406 .leader_peer(Peer::empty(leader_peer_id))
407 .follower_peers(vec![Peer::empty(follower_peer_id)])
408 .build()
409 .unwrap();
410
411 let keeper = new_test_keeper();
412 let table_metadata_manager = keeper.table_metadata_manager();
413 table_metadata_manager
414 .create_table_metadata(
415 table_info,
416 TableRouteValue::physical(vec![region_route]),
417 HashMap::default(),
418 )
419 .await
420 .unwrap();
421
422 for region_id in [RegionId::new(1024, 2), region_id] {
424 let RenewRegionLeasesResponse {
425 non_exists,
426 renewed,
427 } = keeper
428 .renew_region_leases(1, &[(region_id, RegionRole::Follower)])
429 .await
430 .unwrap();
431 assert!(renewed.is_empty());
432 assert_eq!(non_exists, HashSet::from([region_id]));
433 }
434
435 for role in [RegionRole::Leader, RegionRole::Follower] {
437 let RenewRegionLeasesResponse {
438 non_exists,
439 renewed,
440 } = keeper
441 .renew_region_leases(leader_peer_id, &[(region_id, role)])
442 .await
443 .unwrap();
444
445 assert!(non_exists.is_empty());
446 assert_eq!(
447 renewed,
448 HashMap::from([(
449 region_id,
450 RegionLeaseInfo::from((region_id, RegionRole::Leader))
451 )])
452 );
453 }
454
455 for role in [RegionRole::Leader, RegionRole::Follower] {
457 let RenewRegionLeasesResponse {
458 non_exists,
459 renewed,
460 } = keeper
461 .renew_region_leases(follower_peer_id, &[(region_id, role)])
462 .await
463 .unwrap();
464
465 assert!(non_exists.is_empty());
466 assert_eq!(
467 renewed,
468 HashMap::from([(
469 region_id,
470 RegionLeaseInfo::from((region_id, RegionRole::Follower))
471 )])
472 );
473 }
474
475 let opening_region_id = RegionId::new(2048, 1);
476 let _guard = keeper
477 .memory_region_keeper
478 .register(leader_peer_id, opening_region_id)
479 .unwrap();
480
481 for role in [RegionRole::Leader, RegionRole::Follower] {
484 let RenewRegionLeasesResponse {
485 non_exists,
486 renewed,
487 } = keeper
488 .renew_region_leases(leader_peer_id, &[(opening_region_id, role)])
489 .await
490 .unwrap();
491
492 assert!(non_exists.is_empty());
493 assert_eq!(
494 renewed,
495 HashMap::from([(
496 opening_region_id,
497 RegionLeaseInfo::operating(opening_region_id, role)
498 )])
499 );
500 }
501 }
502
503 #[tokio::test]
504 async fn test_renew_unexpected_logic_table() {
505 let region_number = 1u32;
506 let table_id = 1024;
507 let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into();
508
509 let region_id = RegionId::new(table_id, 1);
510 let keeper = new_test_keeper();
511 let table_metadata_manager = keeper.table_metadata_manager();
512 table_metadata_manager
513 .create_table_metadata(
514 table_info,
515 TableRouteValue::Logical(LogicalTableRouteValue::new(table_id, vec![region_id])),
516 HashMap::default(),
517 )
518 .await
519 .unwrap();
520
521 for region_id in [region_id, RegionId::new(1024, 2)] {
522 let RenewRegionLeasesResponse {
523 non_exists,
524 renewed,
525 } = keeper
526 .renew_region_leases(
527 1,
528 &[
529 (region_id, RegionRole::Follower),
530 (region_id, RegionRole::Leader),
531 ],
532 )
533 .await
534 .unwrap();
535 assert!(renewed.is_empty());
536 assert_eq!(non_exists, HashSet::from([region_id]));
537 }
538 }
539
540 #[tokio::test]
541 async fn test_renew_region_leases_with_downgrade_leader() {
542 let region_number = 1u32;
543 let table_id = 1024;
544 let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into();
545
546 let region_id = RegionId::new(table_id, 1);
547 let leader_peer_id = 1024;
548 let follower_peer_id = 2048;
549 let region_route = RegionRouteBuilder::default()
550 .region(Region::new_test(region_id))
551 .leader_peer(Peer::empty(leader_peer_id))
552 .follower_peers(vec![Peer::empty(follower_peer_id)])
553 .leader_state(LeaderState::Downgrading)
554 .build()
555 .unwrap();
556
557 let keeper = new_test_keeper();
558 let table_metadata_manager = keeper.table_metadata_manager();
559 table_metadata_manager
560 .create_table_metadata(
561 table_info,
562 TableRouteValue::physical(vec![region_route]),
563 HashMap::default(),
564 )
565 .await
566 .unwrap();
567
568 for role in [RegionRole::Leader, RegionRole::Follower] {
570 let RenewRegionLeasesResponse {
571 non_exists,
572 renewed,
573 } = keeper
574 .renew_region_leases(follower_peer_id, &[(region_id, role)])
575 .await
576 .unwrap();
577
578 assert!(non_exists.is_empty());
579 assert_eq!(
580 renewed,
581 HashMap::from([(
582 region_id,
583 RegionLeaseInfo::from((region_id, RegionRole::Follower))
584 )])
585 );
586 }
587 }
588}