1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_meta::DatanodeId;
19use common_meta::key::TableMetadataManagerRef;
20use common_meta::key::table_route::TableRouteValue;
21use common_meta::region_keeper::MemoryRegionKeeperRef;
22use common_meta::rpc::router::RegionRoute;
23use common_telemetry::warn;
24use snafu::ResultExt;
25use store_api::region_engine::RegionRole;
26use store_api::storage::{RegionId, TableId};
27
28use crate::error::{self, Result};
29
30pub type RegionLeaseKeeperRef = Arc<RegionLeaseKeeper>;
31
32pub struct RegionLeaseKeeper {
34 table_metadata_manager: TableMetadataManagerRef,
35 memory_region_keeper: MemoryRegionKeeperRef,
36}
37
38pub struct RenewRegionLeasesResponse {
41 pub renewed: HashMap<RegionId, RegionLeaseInfo>,
42 pub non_exists: HashSet<RegionId>,
43}
44
45impl RegionLeaseKeeper {
46 pub fn new(
47 table_metadata_manager: TableMetadataManagerRef,
48 memory_region_keeper: MemoryRegionKeeperRef,
49 ) -> Self {
50 Self {
51 table_metadata_manager,
52 memory_region_keeper,
53 }
54 }
55}
56
57fn renew_region_lease_via_region_route(
58 region_route: &RegionRoute,
59 datanode_id: DatanodeId,
60 region_id: RegionId,
61) -> Option<(RegionId, RegionRole)> {
62 if let Some(leader) = ®ion_route.leader_peer
64 && leader.id == datanode_id
65 {
66 let region_role = if region_route.is_leader_downgrading() {
67 RegionRole::DowngradingLeader
68 } else {
69 RegionRole::Leader
70 };
71
72 return Some((region_id, region_role));
73 }
74
75 if region_route
77 .follower_peers
78 .iter()
79 .any(|peer| peer.id == datanode_id)
80 {
81 return Some((region_id, RegionRole::Follower));
82 }
83
84 warn!(
85 "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region_routes: {:?}",
86 region_route
87 );
88 None
90}
91
92#[derive(Debug, PartialEq, Eq)]
94pub struct RegionLeaseInfo {
95 pub region_id: RegionId,
96 pub is_operating: bool,
100 pub role: RegionRole,
102}
103
104impl RegionLeaseInfo {
105 pub fn operating(region_id: RegionId, role: RegionRole) -> Self {
107 Self {
108 region_id,
109 is_operating: true,
110 role,
111 }
112 }
113}
114
115impl From<(RegionId, RegionRole)> for RegionLeaseInfo {
116 fn from((region_id, role): (RegionId, RegionRole)) -> Self {
117 Self {
118 region_id,
119 is_operating: false,
120 role,
121 }
122 }
123}
124
125impl RegionLeaseKeeper {
126 async fn collect_table_metadata(
127 &self,
128 table_ids: &[TableId],
129 ) -> Result<HashMap<TableId, TableRouteValue>> {
130 let table_route_manager = self.table_metadata_manager.table_route_manager();
131
132 let table_routes = table_route_manager
135 .table_route_storage()
136 .batch_get(table_ids)
137 .await
138 .context(error::TableMetadataManagerSnafu)?;
139
140 let metadata_subset = table_routes
141 .into_iter()
142 .zip(table_ids)
143 .filter_map(|(route, id)| route.map(|route| (*id, route)))
144 .collect::<HashMap<_, _>>();
145
146 Ok(metadata_subset)
147 }
148
149 fn renew_region_lease(
153 &self,
154 table_metadata: &HashMap<TableId, TableRouteValue>,
155 operating_regions: &HashSet<RegionId>,
156 datanode_id: DatanodeId,
157 region_id: RegionId,
158 role: RegionRole,
159 ) -> Option<RegionLeaseInfo> {
160 if operating_regions.contains(®ion_id) {
161 let region_lease_info = RegionLeaseInfo::operating(region_id, role);
162 return Some(region_lease_info);
163 }
164
165 if let Some(table_route) = table_metadata.get(®ion_id.table_id()) {
166 if let Ok(Some(region_route)) = table_route.region_route(region_id) {
167 return renew_region_lease_via_region_route(®ion_route, datanode_id, region_id)
168 .map(RegionLeaseInfo::from);
169 } else {
170 warn!(
171 "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region route is not found in table({})",
172 region_id.table_id()
173 );
174 }
175 } else {
176 warn!(
177 "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, table({}) is not found",
178 region_id.table_id()
179 );
180 }
181 None
182 }
183
184 async fn collect_metadata(
185 &self,
186 datanode_id: DatanodeId,
187 mut region_ids: HashSet<RegionId>,
188 ) -> Result<(HashMap<TableId, TableRouteValue>, HashSet<RegionId>)> {
189 let operating_regions = self
191 .memory_region_keeper
192 .extract_operating_regions(datanode_id, &mut region_ids);
193 let table_ids = region_ids
194 .into_iter()
195 .map(|region_id| region_id.table_id())
196 .collect::<HashSet<_>>()
197 .into_iter()
198 .collect::<Vec<_>>();
199 let table_metadata = self.collect_table_metadata(&table_ids).await?;
200 Ok((table_metadata, operating_regions))
201 }
202
203 pub async fn renew_region_leases(
212 &self,
213 datanode_id: DatanodeId,
214 regions: &[(RegionId, RegionRole)],
215 ) -> Result<RenewRegionLeasesResponse> {
216 let region_ids = regions
217 .iter()
218 .map(|(region_id, _)| *region_id)
219 .collect::<HashSet<_>>();
220 let (table_metadata, operating_regions) =
221 self.collect_metadata(datanode_id, region_ids).await?;
222 let mut renewed = HashMap::new();
223 let mut non_exists = HashSet::new();
224
225 for &(region, role) in regions {
226 match self.renew_region_lease(
227 &table_metadata,
228 &operating_regions,
229 datanode_id,
230 region,
231 role,
232 ) {
233 Some(region_lease_info) => {
234 renewed.insert(region_lease_info.region_id, region_lease_info);
235 }
236 None => {
237 non_exists.insert(region);
238 }
239 }
240 }
241
242 Ok(RenewRegionLeasesResponse {
243 renewed,
244 non_exists,
245 })
246 }
247
248 #[cfg(test)]
249 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
250 &self.table_metadata_manager
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use std::collections::{HashMap, HashSet};
257 use std::sync::Arc;
258
259 use common_meta::key::TableMetadataManager;
260 use common_meta::key::table_route::{LogicalTableRouteValue, TableRouteValue};
261 use common_meta::key::test_utils::new_test_table_info;
262 use common_meta::kv_backend::memory::MemoryKvBackend;
263 use common_meta::peer::Peer;
264 use common_meta::region_keeper::MemoryRegionKeeper;
265 use common_meta::rpc::router::{LeaderState, Region, RegionRouteBuilder};
266 use store_api::region_engine::RegionRole;
267 use store_api::storage::RegionId;
268 use table::metadata::RawTableInfo;
269
270 use super::{RegionLeaseKeeper, renew_region_lease_via_region_route};
271 use crate::region::lease_keeper::{RegionLeaseInfo, RenewRegionLeasesResponse};
272
273 fn new_test_keeper() -> RegionLeaseKeeper {
274 let store = Arc::new(MemoryKvBackend::new());
275
276 let table_metadata_manager = Arc::new(TableMetadataManager::new(store));
277
278 let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
279 RegionLeaseKeeper::new(table_metadata_manager, opening_region_keeper)
280 }
281
282 #[test]
283 fn test_renew_region_lease_via_region_route() {
284 let region_id = RegionId::new(1024, 1);
285 let leader_peer_id = 1024;
286 let follower_peer_id = 2048;
287 let mut region_route = RegionRouteBuilder::default()
288 .region(Region::new_test(region_id))
289 .leader_peer(Peer::empty(leader_peer_id))
290 .follower_peers(vec![Peer::empty(follower_peer_id)])
291 .build()
292 .unwrap();
293
294 for region_id in [RegionId::new(1024, 2), region_id] {
296 assert!(renew_region_lease_via_region_route(®ion_route, 1, region_id).is_none());
297 }
298
299 assert_eq!(
301 renew_region_lease_via_region_route(®ion_route, leader_peer_id, region_id),
302 Some((region_id, RegionRole::Leader))
303 );
304 assert_eq!(
306 renew_region_lease_via_region_route(®ion_route, follower_peer_id, region_id),
307 Some((region_id, RegionRole::Follower))
308 );
309
310 region_route.leader_state = Some(LeaderState::Downgrading);
311 assert_eq!(
313 renew_region_lease_via_region_route(®ion_route, leader_peer_id, region_id),
314 Some((region_id, RegionRole::DowngradingLeader))
315 );
316 }
317
318 #[tokio::test]
319 async fn test_renew_region_leases_non_exists_regions() {
320 let keeper = new_test_keeper();
321
322 let RenewRegionLeasesResponse {
323 non_exists,
324 renewed,
325 } = keeper
326 .renew_region_leases(
327 1,
328 &[
329 (RegionId::new(1024, 1), RegionRole::Follower),
330 (RegionId::new(1024, 2), RegionRole::Leader),
331 ],
332 )
333 .await
334 .unwrap();
335
336 assert!(renewed.is_empty());
337 assert_eq!(
338 non_exists,
339 HashSet::from([RegionId::new(1024, 1), RegionId::new(1024, 2)])
340 );
341 }
342
343 #[tokio::test]
344 async fn test_collect_metadata() {
345 let table_id = 1024;
346 let table_info: RawTableInfo = new_test_table_info(table_id).into();
347
348 let region_id = RegionId::new(table_id, 1);
349 let leader_peer_id = 1024;
350 let follower_peer_id = 2048;
351 let region_route = RegionRouteBuilder::default()
352 .region(Region::new_test(region_id))
353 .leader_peer(Peer::empty(leader_peer_id))
354 .follower_peers(vec![Peer::empty(follower_peer_id)])
355 .build()
356 .unwrap();
357
358 let keeper = new_test_keeper();
359 let table_metadata_manager = keeper.table_metadata_manager();
360 table_metadata_manager
361 .create_table_metadata(
362 table_info,
363 TableRouteValue::physical(vec![region_route]),
364 HashMap::default(),
365 )
366 .await
367 .unwrap();
368 let opening_region_id = RegionId::new(1025, 1);
369 let _guard = keeper
370 .memory_region_keeper
371 .register(leader_peer_id, opening_region_id)
372 .unwrap();
373 let another_opening_region_id = RegionId::new(1025, 2);
374 let _guard2 = keeper
375 .memory_region_keeper
376 .register(follower_peer_id, another_opening_region_id)
377 .unwrap();
378
379 let (metadata, regions) = keeper
380 .collect_metadata(
381 leader_peer_id,
382 HashSet::from([region_id, opening_region_id]),
383 )
384 .await
385 .unwrap();
386 assert_eq!(
387 metadata.keys().cloned().collect::<Vec<_>>(),
388 vec![region_id.table_id()]
389 );
390 assert!(regions.contains(&opening_region_id));
391 assert_eq!(regions.len(), 1);
392 }
393
394 #[tokio::test]
395 async fn test_renew_region_leases_basic() {
396 let table_id = 1024;
397 let table_info: RawTableInfo = new_test_table_info(table_id).into();
398
399 let region_id = RegionId::new(table_id, 1);
400 let leader_peer_id = 1024;
401 let follower_peer_id = 2048;
402 let region_route = RegionRouteBuilder::default()
403 .region(Region::new_test(region_id))
404 .leader_peer(Peer::empty(leader_peer_id))
405 .follower_peers(vec![Peer::empty(follower_peer_id)])
406 .build()
407 .unwrap();
408
409 let keeper = new_test_keeper();
410 let table_metadata_manager = keeper.table_metadata_manager();
411 table_metadata_manager
412 .create_table_metadata(
413 table_info,
414 TableRouteValue::physical(vec![region_route]),
415 HashMap::default(),
416 )
417 .await
418 .unwrap();
419
420 for region_id in [RegionId::new(1024, 2), region_id] {
422 let RenewRegionLeasesResponse {
423 non_exists,
424 renewed,
425 } = keeper
426 .renew_region_leases(1, &[(region_id, RegionRole::Follower)])
427 .await
428 .unwrap();
429 assert!(renewed.is_empty());
430 assert_eq!(non_exists, HashSet::from([region_id]));
431 }
432
433 for role in [RegionRole::Leader, RegionRole::Follower] {
435 let RenewRegionLeasesResponse {
436 non_exists,
437 renewed,
438 } = keeper
439 .renew_region_leases(leader_peer_id, &[(region_id, role)])
440 .await
441 .unwrap();
442
443 assert!(non_exists.is_empty());
444 assert_eq!(
445 renewed,
446 HashMap::from([(
447 region_id,
448 RegionLeaseInfo::from((region_id, RegionRole::Leader))
449 )])
450 );
451 }
452
453 for role in [RegionRole::Leader, RegionRole::Follower] {
455 let RenewRegionLeasesResponse {
456 non_exists,
457 renewed,
458 } = keeper
459 .renew_region_leases(follower_peer_id, &[(region_id, role)])
460 .await
461 .unwrap();
462
463 assert!(non_exists.is_empty());
464 assert_eq!(
465 renewed,
466 HashMap::from([(
467 region_id,
468 RegionLeaseInfo::from((region_id, RegionRole::Follower))
469 )])
470 );
471 }
472
473 let opening_region_id = RegionId::new(2048, 1);
474 let _guard = keeper
475 .memory_region_keeper
476 .register(leader_peer_id, opening_region_id)
477 .unwrap();
478
479 for role in [RegionRole::Leader, RegionRole::Follower] {
482 let RenewRegionLeasesResponse {
483 non_exists,
484 renewed,
485 } = keeper
486 .renew_region_leases(leader_peer_id, &[(opening_region_id, role)])
487 .await
488 .unwrap();
489
490 assert!(non_exists.is_empty());
491 assert_eq!(
492 renewed,
493 HashMap::from([(
494 opening_region_id,
495 RegionLeaseInfo::operating(opening_region_id, role)
496 )])
497 );
498 }
499 }
500
501 #[tokio::test]
502 async fn test_renew_unexpected_logic_table() {
503 let table_id = 1024;
504 let table_info: RawTableInfo = new_test_table_info(table_id).into();
505
506 let region_id = RegionId::new(table_id, 1);
507 let keeper = new_test_keeper();
508 let table_metadata_manager = keeper.table_metadata_manager();
509 table_metadata_manager
510 .create_table_metadata(
511 table_info,
512 TableRouteValue::Logical(LogicalTableRouteValue::new(table_id)),
513 HashMap::default(),
514 )
515 .await
516 .unwrap();
517
518 for region_id in [region_id, RegionId::new(1024, 2)] {
519 let RenewRegionLeasesResponse {
520 non_exists,
521 renewed,
522 } = keeper
523 .renew_region_leases(
524 1,
525 &[
526 (region_id, RegionRole::Follower),
527 (region_id, RegionRole::Leader),
528 ],
529 )
530 .await
531 .unwrap();
532 assert!(renewed.is_empty());
533 assert_eq!(non_exists, HashSet::from([region_id]));
534 }
535 }
536
537 #[tokio::test]
538 async fn test_renew_region_leases_with_downgrade_leader() {
539 let table_id = 1024;
540 let table_info: RawTableInfo = new_test_table_info(table_id).into();
541
542 let region_id = RegionId::new(table_id, 1);
543 let leader_peer_id = 1024;
544 let follower_peer_id = 2048;
545 let region_route = RegionRouteBuilder::default()
546 .region(Region::new_test(region_id))
547 .leader_peer(Peer::empty(leader_peer_id))
548 .follower_peers(vec![Peer::empty(follower_peer_id)])
549 .leader_state(LeaderState::Downgrading)
550 .build()
551 .unwrap();
552
553 let keeper = new_test_keeper();
554 let table_metadata_manager = keeper.table_metadata_manager();
555 table_metadata_manager
556 .create_table_metadata(
557 table_info,
558 TableRouteValue::physical(vec![region_route]),
559 HashMap::default(),
560 )
561 .await
562 .unwrap();
563
564 for role in [RegionRole::Leader, RegionRole::Follower] {
566 let RenewRegionLeasesResponse {
567 non_exists,
568 renewed,
569 } = keeper
570 .renew_region_leases(follower_peer_id, &[(region_id, role)])
571 .await
572 .unwrap();
573
574 assert!(non_exists.is_empty());
575 assert_eq!(
576 renewed,
577 HashMap::from([(
578 region_id,
579 RegionLeaseInfo::from((region_id, RegionRole::Follower))
580 )])
581 );
582 }
583 }
584}