1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_meta::DatanodeId;
19use common_meta::key::TableMetadataManagerRef;
20use common_meta::key::table_route::TableRouteValue;
21use common_meta::region_keeper::MemoryRegionKeeperRef;
22use common_meta::rpc::router::RegionRoute;
23use common_telemetry::{info, warn};
24use snafu::ResultExt;
25use store_api::region_engine::RegionRole;
26use store_api::storage::{RegionId, TableId};
27
28use crate::error::{self, Result};
29
30pub type RegionLeaseKeeperRef = Arc<RegionLeaseKeeper>;
31
32pub struct RegionLeaseKeeper {
34 table_metadata_manager: TableMetadataManagerRef,
35 memory_region_keeper: MemoryRegionKeeperRef,
36}
37
38pub struct RenewRegionLeasesResponse {
41 pub renewed: HashMap<RegionId, RegionLeaseInfo>,
42 pub non_exists: HashSet<RegionId>,
43}
44
45impl RegionLeaseKeeper {
46 pub fn new(
47 table_metadata_manager: TableMetadataManagerRef,
48 memory_region_keeper: MemoryRegionKeeperRef,
49 ) -> Self {
50 Self {
51 table_metadata_manager,
52 memory_region_keeper,
53 }
54 }
55}
56
57fn renew_region_lease_via_region_route(
58 region_route: &RegionRoute,
59 datanode_id: DatanodeId,
60 region_id: RegionId,
61) -> Option<(RegionId, RegionRole)> {
62 if let Some(leader) = ®ion_route.leader_peer
64 && leader.id == datanode_id
65 {
66 return region_route
67 .leader_region_role()
68 .map(|region_role| (region_id, region_role));
69 }
70
71 if region_route
73 .follower_peers
74 .iter()
75 .any(|peer| peer.id == datanode_id)
76 {
77 return Some((region_id, RegionRole::Follower));
78 }
79
80 None
81}
82
83fn renew_region_lease_via_operating_regions(
84 operating_regions: &HashMap<RegionId, RegionRole>,
85 datanode_id: DatanodeId,
86 region_id: RegionId,
87 reported_role: RegionRole,
88) -> Option<RegionLeaseInfo> {
89 if let Some(role) = operating_regions.get(®ion_id) {
92 let region_lease_info = RegionLeaseInfo::operating(region_id, *role);
93 if *role != reported_role {
94 info!(
95 "The region {} on datanode {} is operating with role {:?}, but reported as {:?}",
96 region_id, datanode_id, role, reported_role
97 );
98 }
99 return Some(region_lease_info);
100 }
101
102 None
103}
104
105#[derive(Debug, PartialEq, Eq)]
107pub struct RegionLeaseInfo {
108 pub region_id: RegionId,
109 pub is_operating: bool,
113 pub role: RegionRole,
115}
116
117impl RegionLeaseInfo {
118 pub fn operating(region_id: RegionId, role: RegionRole) -> Self {
120 Self {
121 region_id,
122 is_operating: true,
123 role,
124 }
125 }
126}
127
128impl From<(RegionId, RegionRole)> for RegionLeaseInfo {
129 fn from((region_id, role): (RegionId, RegionRole)) -> Self {
130 Self {
131 region_id,
132 is_operating: false,
133 role,
134 }
135 }
136}
137
138impl RegionLeaseKeeper {
139 async fn collect_table_metadata(
140 &self,
141 table_ids: &[TableId],
142 ) -> Result<HashMap<TableId, TableRouteValue>> {
143 let table_route_manager = self.table_metadata_manager.table_route_manager();
144
145 let table_routes = table_route_manager
148 .table_route_storage()
149 .batch_get(table_ids)
150 .await
151 .context(error::TableMetadataManagerSnafu)?;
152
153 let metadata_subset = table_routes
154 .into_iter()
155 .zip(table_ids)
156 .filter_map(|(route, id)| route.map(|route| (*id, route)))
157 .collect::<HashMap<_, _>>();
158
159 Ok(metadata_subset)
160 }
161
162 fn renew_region_lease(
166 &self,
167 table_metadata: &HashMap<TableId, TableRouteValue>,
168 operating_regions: &HashMap<RegionId, RegionRole>,
169 datanode_id: DatanodeId,
170 region_id: RegionId,
171 reported_role: RegionRole,
172 ) -> Option<RegionLeaseInfo> {
173 if let Some(table_route) = table_metadata.get(®ion_id.table_id())
175 && let Ok(Some(region_route)) = table_route.region_route(region_id)
176 && let Some(region_lease) =
177 renew_region_lease_via_region_route(®ion_route, datanode_id, region_id)
178 {
179 return Some(RegionLeaseInfo::from(region_lease));
180 }
181 if let Some(region_lease_info) = renew_region_lease_via_operating_regions(
183 operating_regions,
184 datanode_id,
185 region_id,
186 reported_role,
187 ) {
188 return Some(region_lease_info);
189 }
190
191 warn!(
192 "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, no matching metadata or operating region found",
193 );
194 None
195 }
196
197 async fn collect_metadata(
198 &self,
199 datanode_id: DatanodeId,
200 region_ids: HashSet<RegionId>,
201 ) -> Result<(
202 HashMap<TableId, TableRouteValue>,
203 HashMap<RegionId, RegionRole>,
204 )> {
205 let operating_regions = self
206 .memory_region_keeper
207 .extract_operating_region_roles(datanode_id, ®ion_ids);
208 let table_ids = region_ids
209 .into_iter()
210 .map(|region_id| region_id.table_id())
211 .collect::<HashSet<_>>()
212 .into_iter()
213 .collect::<Vec<_>>();
214 let table_metadata = self.collect_table_metadata(&table_ids).await?;
215 Ok((table_metadata, operating_regions))
216 }
217
218 pub async fn renew_region_leases(
227 &self,
228 datanode_id: DatanodeId,
229 regions: &[(RegionId, RegionRole)],
230 ) -> Result<RenewRegionLeasesResponse> {
231 let region_ids = regions
232 .iter()
233 .map(|(region_id, _)| *region_id)
234 .collect::<HashSet<_>>();
235 let (table_metadata, operating_regions) =
236 self.collect_metadata(datanode_id, region_ids).await?;
237 let mut renewed = HashMap::new();
238 let mut non_exists = HashSet::new();
239
240 for &(region, reported_role) in regions {
241 match self.renew_region_lease(
242 &table_metadata,
243 &operating_regions,
244 datanode_id,
245 region,
246 reported_role,
247 ) {
248 Some(region_lease_info) => {
249 renewed.insert(region_lease_info.region_id, region_lease_info);
250 }
251 None => {
252 non_exists.insert(region);
253 }
254 }
255 }
256
257 Ok(RenewRegionLeasesResponse {
258 renewed,
259 non_exists,
260 })
261 }
262
263 #[cfg(test)]
264 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
265 &self.table_metadata_manager
266 }
267}
268
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;

    use common_meta::DatanodeId;
    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::{LogicalTableRouteValue, TableRouteValue};
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute, RegionRouteBuilder};
    use store_api::region_engine::RegionRole;
    use store_api::storage::{RegionId, TableId};
    use table::metadata::TableInfo;

    use super::{RegionLeaseKeeper, renew_region_lease_via_region_route};
    use crate::region::lease_keeper::{RegionLeaseInfo, RenewRegionLeasesResponse};

    /// Builds a keeper on top of an empty in-memory kv backend and an empty
    /// in-memory region keeper.
    fn new_test_keeper() -> RegionLeaseKeeper {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());

        RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper)
    }

    /// Route for `region_id` with a leader peer and no explicit followers.
    fn leader_only_route(region_id: RegionId, leader: DatanodeId) -> RegionRoute {
        RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(leader))
            .build()
            .unwrap()
    }

    /// Route for `region_id` with a leader peer and a single follower peer.
    fn leader_follower_route(
        region_id: RegionId,
        leader: DatanodeId,
        follower: DatanodeId,
    ) -> RegionRoute {
        RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(leader))
            .follower_peers(vec![Peer::empty(follower)])
            .build()
            .unwrap()
    }

    /// Persists test table metadata for `table_id` with the given route.
    async fn create_table_with_route(
        keeper: &RegionLeaseKeeper,
        table_id: TableId,
        table_route: TableRouteValue,
    ) {
        let table_info: TableInfo = new_test_table_info(table_id);
        keeper
            .table_metadata_manager()
            .create_table_metadata(table_info, table_route, HashMap::default())
            .await
            .unwrap();
    }

    /// Calls `renew_region_leases` and unwraps the response.
    async fn renew(
        keeper: &RegionLeaseKeeper,
        datanode_id: DatanodeId,
        regions: &[(RegionId, RegionRole)],
    ) -> RenewRegionLeasesResponse {
        keeper
            .renew_region_leases(datanode_id, regions)
            .await
            .unwrap()
    }

    /// Asserts that nothing was renewed and exactly `expected` was denied.
    fn assert_all_denied(
        response: RenewRegionLeasesResponse,
        expected: impl IntoIterator<Item = RegionId>,
    ) {
        let RenewRegionLeasesResponse {
            non_exists,
            renewed,
        } = response;
        assert!(renewed.is_empty());
        assert_eq!(non_exists, expected.into_iter().collect::<HashSet<_>>());
    }

    /// Asserts that nothing was denied and exactly `expected` was renewed.
    fn assert_single_lease(response: RenewRegionLeasesResponse, expected: RegionLeaseInfo) {
        let RenewRegionLeasesResponse {
            non_exists,
            renewed,
        } = response;
        assert!(non_exists.is_empty());
        assert_eq!(renewed, HashMap::from([(expected.region_id, expected)]));
    }

    #[test]
    fn test_renew_region_lease_via_region_route() {
        let region_id = RegionId::new(1024, 1);
        let leader_peer_id = 1024;
        let follower_peer_id = 2048;
        let mut region_route = leader_follower_route(region_id, leader_peer_id, follower_peer_id);

        // Datanode 1 is neither the leader nor a follower of the route.
        for candidate in [RegionId::new(1024, 2), region_id] {
            assert!(renew_region_lease_via_region_route(&region_route, 1, candidate).is_none());
        }

        // The leader peer is granted the leader role.
        assert_eq!(
            renew_region_lease_via_region_route(&region_route, leader_peer_id, region_id),
            Some((region_id, RegionRole::Leader))
        );
        // The follower peer is granted the follower role.
        assert_eq!(
            renew_region_lease_via_region_route(&region_route, follower_peer_id, region_id),
            Some((region_id, RegionRole::Follower))
        );

        // The leader's role follows the leader state recorded in the route.
        let state_cases = [
            (LeaderState::Downgrading, RegionRole::DowngradingLeader),
            (LeaderState::Staging, RegionRole::StagingLeader),
        ];
        for (leader_state, expected_role) in state_cases {
            region_route.leader_state = Some(leader_state);
            assert_eq!(
                renew_region_lease_via_region_route(&region_route, leader_peer_id, region_id),
                Some((region_id, expected_role))
            );
        }
    }

    #[tokio::test]
    async fn test_renew_region_leases_non_exists_regions() {
        let keeper = new_test_keeper();
        let first = RegionId::new(1024, 1);
        let second = RegionId::new(1024, 2);

        let response = renew(
            &keeper,
            1,
            &[(first, RegionRole::Follower), (second, RegionRole::Leader)],
        )
        .await;

        assert_all_denied(response, [first, second]);
    }

    #[tokio::test]
    async fn test_collect_metadata() {
        let table_id = 1024;
        let region_id = RegionId::new(table_id, 1);
        let leader_peer_id = 1024;
        let follower_peer_id = 2048;

        let keeper = new_test_keeper();
        create_table_with_route(
            &keeper,
            table_id,
            TableRouteValue::physical(vec![leader_follower_route(
                region_id,
                leader_peer_id,
                follower_peer_id,
            )]),
        )
        .await;

        let opening_region_id = RegionId::new(1025, 1);
        let _guard = keeper
            .memory_region_keeper
            .register_with_role(leader_peer_id, opening_region_id, RegionRole::Leader)
            .unwrap();
        // Registered on another datanode, so it must not show up below.
        let another_opening_region_id = RegionId::new(1025, 2);
        let _guard2 = keeper
            .memory_region_keeper
            .register_with_role(
                follower_peer_id,
                another_opening_region_id,
                RegionRole::Follower,
            )
            .unwrap();

        let requested = HashSet::from([region_id, opening_region_id]);
        let (metadata, regions) = keeper
            .collect_metadata(leader_peer_id, requested)
            .await
            .unwrap();

        assert_eq!(
            metadata.keys().copied().collect::<Vec<_>>(),
            vec![region_id.table_id()]
        );
        assert_eq!(
            regions,
            HashMap::from([(opening_region_id, RegionRole::Leader)])
        );
    }

    #[tokio::test]
    async fn test_renew_region_leases_basic() {
        let table_id = 1024;
        let region_id = RegionId::new(table_id, 1);
        let leader_peer_id = 1024;
        let follower_peer_id = 2048;

        let keeper = new_test_keeper();
        create_table_with_route(
            &keeper,
            table_id,
            TableRouteValue::physical(vec![leader_follower_route(
                region_id,
                leader_peer_id,
                follower_peer_id,
            )]),
        )
        .await;

        // Datanode 1 holds none of the regions.
        for candidate in [RegionId::new(1024, 2), region_id] {
            let response = renew(&keeper, 1, &[(candidate, RegionRole::Follower)]).await;
            assert_all_denied(response, [candidate]);
        }

        // The leader datanode is granted the leader role whatever it reports.
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let response = renew(&keeper, leader_peer_id, &[(region_id, role)]).await;
            assert_single_lease(
                response,
                RegionLeaseInfo::from((region_id, RegionRole::Leader)),
            );
        }

        // The follower datanode is granted the follower role whatever it reports.
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let response = renew(&keeper, follower_peer_id, &[(region_id, role)]).await;
            assert_single_lease(
                response,
                RegionLeaseInfo::from((region_id, RegionRole::Follower)),
            );
        }

        let opening_region_id = RegionId::new(2048, 1);
        let _guard = keeper
            .memory_region_keeper
            .register_with_role(leader_peer_id, opening_region_id, RegionRole::Leader)
            .unwrap();

        // An operating region without table metadata uses the registered role.
        for reported_role in [RegionRole::Leader, RegionRole::Follower] {
            let response = renew(
                &keeper,
                leader_peer_id,
                &[(opening_region_id, reported_role)],
            )
            .await;
            assert_single_lease(
                response,
                RegionLeaseInfo::operating(opening_region_id, RegionRole::Leader),
            );
        }
    }

    #[tokio::test]
    async fn test_renew_unexpected_logic_table() {
        let table_id = 1024;
        let region_id = RegionId::new(table_id, 1);

        let keeper = new_test_keeper();
        create_table_with_route(
            &keeper,
            table_id,
            TableRouteValue::Logical(LogicalTableRouteValue::new(table_id)),
        )
        .await;

        for candidate in [region_id, RegionId::new(1024, 2)] {
            let response = renew(
                &keeper,
                1,
                &[
                    (candidate, RegionRole::Follower),
                    (candidate, RegionRole::Leader),
                ],
            )
            .await;
            assert_all_denied(response, [candidate]);
        }
    }

    #[tokio::test]
    async fn test_renew_region_leases_with_downgrade_leader() {
        let table_id = 1024;
        let region_id = RegionId::new(table_id, 1);
        let leader_peer_id = 1024;
        let follower_peer_id = 2048;
        let region_route = RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(leader_peer_id))
            .follower_peers(vec![Peer::empty(follower_peer_id)])
            .leader_state(LeaderState::Downgrading)
            .build()
            .unwrap();

        let keeper = new_test_keeper();
        create_table_with_route(
            &keeper,
            table_id,
            TableRouteValue::physical(vec![region_route]),
        )
        .await;

        // The follower datanode keeps the follower role.
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let response = renew(&keeper, follower_peer_id, &[(region_id, role)]).await;
            assert_single_lease(
                response,
                RegionLeaseInfo::from((region_id, RegionRole::Follower)),
            );
        }
    }

    #[tokio::test]
    async fn test_renew_region_leases_reported_staging_expected_leader() {
        let table_id = 1024;
        let region_id = RegionId::new(table_id, 1);
        let leader_peer_id = 1024;

        let keeper = new_test_keeper();
        create_table_with_route(
            &keeper,
            table_id,
            TableRouteValue::physical(vec![leader_only_route(region_id, leader_peer_id)]),
        )
        .await;

        let response = renew(
            &keeper,
            leader_peer_id,
            &[(region_id, RegionRole::StagingLeader)],
        )
        .await;

        assert_single_lease(
            response,
            RegionLeaseInfo::from((region_id, RegionRole::Leader)),
        );
    }

    #[tokio::test]
    async fn test_renew_region_leases_reported_staging_expected_staging() {
        let table_id = 1024;
        let region_id = RegionId::new(table_id, 1);
        let leader_peer_id = 1024;
        let region_route = RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(leader_peer_id))
            .leader_state(LeaderState::Staging)
            .build()
            .unwrap();

        let keeper = new_test_keeper();
        create_table_with_route(
            &keeper,
            table_id,
            TableRouteValue::physical(vec![region_route]),
        )
        .await;

        let response = renew(
            &keeper,
            leader_peer_id,
            &[(region_id, RegionRole::StagingLeader)],
        )
        .await;

        assert_single_lease(
            response,
            RegionLeaseInfo::from((region_id, RegionRole::StagingLeader)),
        );
    }

    #[tokio::test]
    async fn test_renew_region_leases_metadata_role_beats_keeper_role() {
        let table_id = 2048;
        let datanode_id = 1024;
        let region_id = RegionId::new(table_id, 1);

        let keeper = new_test_keeper();
        create_table_with_route(
            &keeper,
            table_id,
            TableRouteValue::physical(vec![leader_only_route(region_id, datanode_id)]),
        )
        .await;

        let _guard = keeper
            .memory_region_keeper
            .register_with_role(datanode_id, region_id, RegionRole::Follower)
            .unwrap();

        let response = renew(&keeper, datanode_id, &[(region_id, RegionRole::Follower)]).await;

        assert_single_lease(
            response,
            RegionLeaseInfo::from((region_id, RegionRole::Leader)),
        );
    }

    #[tokio::test]
    async fn test_renew_region_leases_missing_route_falls_back_to_keeper_role() {
        let table_id = 2048;
        let datanode_id = 1024;
        let region_id = RegionId::new(table_id, 1);
        // Only this other region of the table has a route.
        let another_region_id = RegionId::new(table_id, 2);

        let keeper = new_test_keeper();
        create_table_with_route(
            &keeper,
            table_id,
            TableRouteValue::physical(vec![leader_only_route(another_region_id, datanode_id)]),
        )
        .await;

        let _guard = keeper
            .memory_region_keeper
            .register_with_role(datanode_id, region_id, RegionRole::DowngradingLeader)
            .unwrap();

        let response = renew(
            &keeper,
            datanode_id,
            &[(region_id, RegionRole::StagingLeader)],
        )
        .await;

        assert_single_lease(
            response,
            RegionLeaseInfo::operating(region_id, RegionRole::DowngradingLeader),
        );
    }

    #[tokio::test]
    async fn test_renew_region_leases_operating_region_uses_keeper_role() {
        let keeper = new_test_keeper();
        let datanode_id = 1024;
        let region_id = RegionId::new(2048, 1);

        let _guard = keeper
            .memory_region_keeper
            .register_with_role(datanode_id, region_id, RegionRole::DowngradingLeader)
            .unwrap();

        let response = renew(
            &keeper,
            datanode_id,
            &[(region_id, RegionRole::StagingLeader)],
        )
        .await;

        assert_single_lease(
            response,
            RegionLeaseInfo::operating(region_id, RegionRole::DowngradingLeader),
        );
    }
}