// Source file: meta_srv/region/lease_keeper.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use common_meta::DatanodeId;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::TableRouteValue;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::rpc::router::RegionRoute;
use common_telemetry::{info, warn};
use snafu::ResultExt;
use store_api::region_engine::RegionRole;
use store_api::storage::{RegionId, TableId};

use crate::error::{self, Result};

/// Shared, reference-counted handle to a [RegionLeaseKeeper].
pub type RegionLeaseKeeperRef = Arc<RegionLeaseKeeper>;
31
/// Renews lease of regions.
pub struct RegionLeaseKeeper {
    // Source of truth for table routes (which datanode holds which region).
    table_metadata_manager: TableMetadataManagerRef,
    // In-memory registry of regions currently under an operation
    // (e.g. opening / migration), keyed per datanode.
    memory_region_keeper: MemoryRegionKeeperRef,
}
37
/// The result of region lease renewal,
/// contains the renewed region leases and [RegionId] of non-existing regions.
pub struct RenewRegionLeasesResponse {
    /// Leases that were successfully renewed, keyed by region id.
    pub renewed: HashMap<RegionId, RegionLeaseInfo>,
    /// Regions reported by the datanode that could not be matched against
    /// table metadata or any operating region, and thus were denied renewal.
    pub non_exists: HashSet<RegionId>,
}
44
impl RegionLeaseKeeper {
    /// Creates a [RegionLeaseKeeper] backed by the given table metadata
    /// manager and in-memory operating-region registry.
    pub fn new(
        table_metadata_manager: TableMetadataManagerRef,
        memory_region_keeper: MemoryRegionKeeperRef,
    ) -> Self {
        Self {
            table_metadata_manager,
            memory_region_keeper,
        }
    }
}
56
57fn renew_region_lease_via_region_route(
58    region_route: &RegionRoute,
59    datanode_id: DatanodeId,
60    region_id: RegionId,
61) -> Option<(RegionId, RegionRole)> {
62    // If it's a leader region on this datanode.
63    if let Some(leader) = &region_route.leader_peer
64        && leader.id == datanode_id
65    {
66        return region_route
67            .leader_region_role()
68            .map(|region_role| (region_id, region_role));
69    }
70
71    // If it's a follower region on this datanode.
72    if region_route
73        .follower_peers
74        .iter()
75        .any(|peer| peer.id == datanode_id)
76    {
77        return Some((region_id, RegionRole::Follower));
78    }
79
80    None
81}
82
83fn renew_region_lease_via_operating_regions(
84    operating_regions: &HashMap<RegionId, RegionRole>,
85    datanode_id: DatanodeId,
86    region_id: RegionId,
87    reported_role: RegionRole,
88) -> Option<RegionLeaseInfo> {
89    // `operating_regions` is filtered by the current datanode in `collect_metadata`,
90    // so looking up by `region_id` is sufficient here.
91    if let Some(role) = operating_regions.get(&region_id) {
92        let region_lease_info = RegionLeaseInfo::operating(region_id, *role);
93        if *role != reported_role {
94            info!(
95                "The region {} on datanode {} is operating with role {:?}, but reported as {:?}",
96                region_id, datanode_id, role, reported_role
97            );
98        }
99        return Some(region_lease_info);
100    }
101
102    None
103}
104
/// The information of region lease.
#[derive(Debug, PartialEq, Eq)]
pub struct RegionLeaseInfo {
    /// The id of the leased region.
    pub region_id: RegionId,
    /// Whether the region is operating.
    ///
    /// The region is under dropping or opening / migration operation.
    pub is_operating: bool,
    /// The role of region.
    pub role: RegionRole,
}
116
117impl RegionLeaseInfo {
118    /// Creates a new [RegionLeaseInfo] with the given region id and role with operating status.
119    pub fn operating(region_id: RegionId, role: RegionRole) -> Self {
120        Self {
121            region_id,
122            is_operating: true,
123            role,
124        }
125    }
126}
127
128impl From<(RegionId, RegionRole)> for RegionLeaseInfo {
129    fn from((region_id, role): (RegionId, RegionRole)) -> Self {
130        Self {
131            region_id,
132            is_operating: false,
133            role,
134        }
135    }
136}
137
impl RegionLeaseKeeper {
    /// Fetches the [TableRouteValue] for each of `table_ids` from storage.
    ///
    /// Tables whose route is missing in storage are silently dropped from
    /// the returned map (the batch result is zipped with `table_ids` and
    /// `None` entries are filtered out).
    async fn collect_table_metadata(
        &self,
        table_ids: &[TableId],
    ) -> Result<HashMap<TableId, TableRouteValue>> {
        let table_route_manager = self.table_metadata_manager.table_route_manager();

        // The subset of all table metadata.
        // TODO: considers storing all active regions in meta's memory.
        let table_routes = table_route_manager
            .table_route_storage()
            .batch_get(table_ids)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        // NOTE: relies on `batch_get` preserving the order of `table_ids`
        // so that zipping pairs each route with its id.
        let metadata_subset = table_routes
            .into_iter()
            .zip(table_ids)
            .filter_map(|(route, id)| route.map(|route| (*id, route)))
            .collect::<HashMap<_, _>>();

        Ok(metadata_subset)
    }

    /// Returns [None] if:
    /// - The region doesn't belong to the datanode in metadata or operating regions.
    /// - The region belongs to a logical table.
    fn renew_region_lease(
        &self,
        table_metadata: &HashMap<TableId, TableRouteValue>,
        operating_regions: &HashMap<RegionId, RegionRole>,
        datanode_id: DatanodeId,
        region_id: RegionId,
        reported_role: RegionRole,
    ) -> Option<RegionLeaseInfo> {
        // First try to renew via region route
        if let Some(table_route) = table_metadata.get(&region_id.table_id())
            && let Ok(Some(region_route)) = table_route.region_route(region_id)
            && let Some(region_lease) =
                renew_region_lease_via_region_route(&region_route, datanode_id, region_id)
        {
            return Some(RegionLeaseInfo::from(region_lease));
        }
        // Then try to renew via operating regions, which covers the opening region without region route in metadata.
        if let Some(region_lease_info) = renew_region_lease_via_operating_regions(
            operating_regions,
            datanode_id,
            region_id,
            reported_role,
        ) {
            return Some(region_lease_info);
        }

        // Neither route metadata nor the operating-region registry knows
        // this region on this datanode: deny the renewal.
        warn!(
            "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, no matching metadata or operating region found",
        );
        None
    }

    /// Gathers the metadata needed to renew leases for `region_ids` on
    /// `datanode_id`:
    /// - the operating-region roles registered for this datanode, and
    /// - the [TableRouteValue] of every table the regions belong to.
    async fn collect_metadata(
        &self,
        datanode_id: DatanodeId,
        region_ids: HashSet<RegionId>,
    ) -> Result<(
        HashMap<TableId, TableRouteValue>,
        HashMap<RegionId, RegionRole>,
    )> {
        let operating_regions = self
            .memory_region_keeper
            .extract_operating_region_roles(datanode_id, &region_ids);
        // Deduplicate table ids before the batch route lookup.
        let table_ids = region_ids
            .into_iter()
            .map(|region_id| region_id.table_id())
            .collect::<HashSet<_>>()
            .into_iter()
            .collect::<Vec<_>>();
        let table_metadata = self.collect_table_metadata(&table_ids).await?;
        Ok((table_metadata, operating_regions))
    }

    /// Renews the lease of regions for specific datanode.
    ///
    /// The lease of regions will be renewed if:
    /// -  The region of the specific datanode exists in [TableRouteValue].
    /// -  The region of the specific datanode is opening.
    ///
    /// Otherwise the lease of regions will not be renewed,
    /// and corresponding regions will be added to `non_exists` of [RenewRegionLeasesResponse].
    pub async fn renew_region_leases(
        &self,
        datanode_id: DatanodeId,
        regions: &[(RegionId, RegionRole)],
    ) -> Result<RenewRegionLeasesResponse> {
        let region_ids = regions
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<HashSet<_>>();
        let (table_metadata, operating_regions) =
            self.collect_metadata(datanode_id, region_ids).await?;
        let mut renewed = HashMap::new();
        let mut non_exists = HashSet::new();

        // Decide each reported region independently: renewed or denied.
        for &(region, reported_role) in regions {
            match self.renew_region_lease(
                &table_metadata,
                &operating_regions,
                datanode_id,
                region,
                reported_role,
            ) {
                Some(region_lease_info) => {
                    renewed.insert(region_lease_info.region_id, region_lease_info);
                }
                None => {
                    non_exists.insert(region);
                }
            }
        }

        Ok(RenewRegionLeasesResponse {
            renewed,
            non_exists,
        })
    }

    // Test-only accessor used by the tests to seed table metadata.
    #[cfg(test)]
    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }
}
268
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;

    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::{LogicalTableRouteValue, TableRouteValue};
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRouteBuilder};
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;
    use table::metadata::TableInfo;

    use super::{RegionLeaseKeeper, renew_region_lease_via_region_route};
    use crate::region::lease_keeper::{RegionLeaseInfo, RenewRegionLeasesResponse};

    // Builds a keeper backed by an in-memory kv store and an empty
    // operating-region registry.
    fn new_test_keeper() -> RegionLeaseKeeper {
        let store = Arc::new(MemoryKvBackend::new());

        let table_metadata_manager = Arc::new(TableMetadataManager::new(store));

        let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
        RegionLeaseKeeper::new(table_metadata_manager, opening_region_keeper)
    }

    // Role resolution straight from a region route: leader, follower,
    // downgrading leader and staging leader cases.
    #[test]
    fn test_renew_region_lease_via_region_route() {
        let region_id = RegionId::new(1024, 1);
        let leader_peer_id = 1024;
        let follower_peer_id = 2048;
        let mut region_route = RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(leader_peer_id))
            .follower_peers(vec![Peer::empty(follower_peer_id)])
            .build()
            .unwrap();

        // The region doesn't belong to the datanode.
        for region_id in [RegionId::new(1024, 2), region_id] {
            assert!(renew_region_lease_via_region_route(&region_route, 1, region_id).is_none());
        }

        // The leader region on the datanode.
        assert_eq!(
            renew_region_lease_via_region_route(&region_route, leader_peer_id, region_id),
            Some((region_id, RegionRole::Leader))
        );
        // The follower region on the datanode.
        assert_eq!(
            renew_region_lease_via_region_route(&region_route, follower_peer_id, region_id),
            Some((region_id, RegionRole::Follower))
        );

        region_route.leader_state = Some(LeaderState::Downgrading);
        // The downgraded leader region on the datanode.
        assert_eq!(
            renew_region_lease_via_region_route(&region_route, leader_peer_id, region_id),
            Some((region_id, RegionRole::DowngradingLeader))
        );

        region_route.leader_state = Some(LeaderState::Staging);
        assert_eq!(
            renew_region_lease_via_region_route(&region_route, leader_peer_id, region_id),
            Some((region_id, RegionRole::StagingLeader))
        );
    }

    // With no metadata at all, every reported region lands in `non_exists`.
    #[tokio::test]
    async fn test_renew_region_leases_non_exists_regions() {
        let keeper = new_test_keeper();

        let RenewRegionLeasesResponse {
            non_exists,
            renewed,
        } = keeper
            .renew_region_leases(
                1,
                &[
                    (RegionId::new(1024, 1), RegionRole::Follower),
                    (RegionId::new(1024, 2), RegionRole::Leader),
                ],
            )
            .await
            .unwrap();

        assert!(renewed.is_empty());
        assert_eq!(
            non_exists,
            HashSet::from([RegionId::new(1024, 1), RegionId::new(1024, 2)])
        );
    }

    // `collect_metadata` returns only the tables of the queried regions and
    // only the operating regions registered for the queried datanode.
    #[tokio::test]
    async fn test_collect_metadata() {
        let table_id = 1024;
        let table_info: TableInfo = new_test_table_info(table_id);

        let region_id = RegionId::new(table_id, 1);
        let leader_peer_id = 1024;
        let follower_peer_id = 2048;
        let region_route = RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(leader_peer_id))
            .follower_peers(vec![Peer::empty(follower_peer_id)])
            .build()
            .unwrap();

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(vec![region_route]),
                HashMap::default(),
            )
            .await
            .unwrap();
        let opening_region_id = RegionId::new(1025, 1);
        let _guard = keeper
            .memory_region_keeper
            .register_with_role(leader_peer_id, opening_region_id, RegionRole::Leader)
            .unwrap();
        // Registered for a different datanode: must not be collected below.
        let another_opening_region_id = RegionId::new(1025, 2);
        let _guard2 = keeper
            .memory_region_keeper
            .register_with_role(
                follower_peer_id,
                another_opening_region_id,
                RegionRole::Follower,
            )
            .unwrap();

        let (metadata, regions) = keeper
            .collect_metadata(
                leader_peer_id,
                HashSet::from([region_id, opening_region_id]),
            )
            .await
            .unwrap();
        assert_eq!(
            metadata.keys().cloned().collect::<Vec<_>>(),
            vec![region_id.table_id()]
        );
        assert_eq!(
            regions,
            HashMap::from([(opening_region_id, RegionRole::Leader)])
        );
    }

    // End-to-end renewal: unknown datanode, leader peer, follower peer and
    // an opening region only present in the operating-region registry.
    #[tokio::test]
    async fn test_renew_region_leases_basic() {
        let table_id = 1024;
        let table_info: TableInfo = new_test_table_info(table_id);

        let region_id = RegionId::new(table_id, 1);
        let leader_peer_id = 1024;
        let follower_peer_id = 2048;
        let region_route = RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(leader_peer_id))
            .follower_peers(vec![Peer::empty(follower_peer_id)])
            .build()
            .unwrap();

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(vec![region_route]),
                HashMap::default(),
            )
            .await
            .unwrap();

        // The region doesn't belong to the datanode.
        for region_id in [RegionId::new(1024, 2), region_id] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(1, &[(region_id, RegionRole::Follower)])
                .await
                .unwrap();
            assert!(renewed.is_empty());
            assert_eq!(non_exists, HashSet::from([region_id]));
        }

        // The leader region on the datanode.
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(leader_peer_id, &[(region_id, role)])
                .await
                .unwrap();

            assert!(non_exists.is_empty());
            // Metadata wins over the reported role: always granted Leader.
            assert_eq!(
                renewed,
                HashMap::from([(
                    region_id,
                    RegionLeaseInfo::from((region_id, RegionRole::Leader))
                )])
            );
        }

        // The follower region on the datanode.
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(follower_peer_id, &[(region_id, role)])
                .await
                .unwrap();

            assert!(non_exists.is_empty());
            assert_eq!(
                renewed,
                HashMap::from([(
                    region_id,
                    RegionLeaseInfo::from((region_id, RegionRole::Follower))
                )])
            );
        }

        let opening_region_id = RegionId::new(2048, 1);
        let _guard = keeper
            .memory_region_keeper
            .register_with_role(leader_peer_id, opening_region_id, RegionRole::Leader)
            .unwrap();

        // The opening region on the datanode.
        // NOTES: The procedure lock will ensure only one opening leader.
        for reported_role in [RegionRole::Leader, RegionRole::Follower] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(leader_peer_id, &[(opening_region_id, reported_role)])
                .await
                .unwrap();

            assert!(non_exists.is_empty());
            assert_eq!(
                renewed,
                HashMap::from([(
                    opening_region_id,
                    RegionLeaseInfo::operating(opening_region_id, RegionRole::Leader)
                )])
            );
        }
    }

    // Regions of a logical table are never granted a lease.
    #[tokio::test]
    async fn test_renew_unexpected_logic_table() {
        let table_id = 1024;
        let table_info: TableInfo = new_test_table_info(table_id);

        let region_id = RegionId::new(table_id, 1);
        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::Logical(LogicalTableRouteValue::new(table_id)),
                HashMap::default(),
            )
            .await
            .unwrap();

        for region_id in [region_id, RegionId::new(1024, 2)] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(
                    1,
                    &[
                        (region_id, RegionRole::Follower),
                        (region_id, RegionRole::Leader),
                    ],
                )
                .await
                .unwrap();
            assert!(renewed.is_empty());
            assert_eq!(non_exists, HashSet::from([region_id]));
        }
    }

    // A follower keeps its Follower role even while the route's leader is
    // downgrading.
    #[tokio::test]
    async fn test_renew_region_leases_with_downgrade_leader() {
        let table_id = 1024;
        let table_info: TableInfo = new_test_table_info(table_id);

        let region_id = RegionId::new(table_id, 1);
        let leader_peer_id = 1024;
        let follower_peer_id = 2048;
        let region_route = RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(leader_peer_id))
            .follower_peers(vec![Peer::empty(follower_peer_id)])
            .leader_state(LeaderState::Downgrading)
            .build()
            .unwrap();

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(vec![region_route]),
                HashMap::default(),
            )
            .await
            .unwrap();

        // The leader region on the datanode.
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(follower_peer_id, &[(region_id, role)])
                .await
                .unwrap();

            assert!(non_exists.is_empty());
            assert_eq!(
                renewed,
                HashMap::from([(
                    region_id,
                    RegionLeaseInfo::from((region_id, RegionRole::Follower))
                )])
            );
        }
    }

    // Reported StagingLeader but metadata says plain leader: granted Leader.
    #[tokio::test]
    async fn test_renew_region_leases_reported_staging_expected_leader() {
        let table_id = 1024;
        let table_info: TableInfo = new_test_table_info(table_id);

        let region_id = RegionId::new(table_id, 1);
        let leader_peer_id = 1024;
        let region_route = RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(leader_peer_id))
            .build()
            .unwrap();

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(vec![region_route]),
                HashMap::default(),
            )
            .await
            .unwrap();

        let RenewRegionLeasesResponse {
            non_exists,
            renewed,
        } = keeper
            .renew_region_leases(leader_peer_id, &[(region_id, RegionRole::StagingLeader)])
            .await
            .unwrap();

        assert!(non_exists.is_empty());
        assert_eq!(
            renewed,
            HashMap::from([(
                region_id,
                RegionLeaseInfo::from((region_id, RegionRole::Leader))
            )])
        );
    }

    // Reported StagingLeader and metadata agrees (Staging leader state):
    // granted StagingLeader.
    #[tokio::test]
    async fn test_renew_region_leases_reported_staging_expected_staging() {
        let table_id = 1024;
        let table_info: TableInfo = new_test_table_info(table_id);

        let region_id = RegionId::new(table_id, 1);
        let leader_peer_id = 1024;
        let region_route = RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(leader_peer_id))
            .leader_state(LeaderState::Staging)
            .build()
            .unwrap();

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(vec![region_route]),
                HashMap::default(),
            )
            .await
            .unwrap();

        let RenewRegionLeasesResponse {
            non_exists,
            renewed,
        } = keeper
            .renew_region_leases(leader_peer_id, &[(region_id, RegionRole::StagingLeader)])
            .await
            .unwrap();

        assert!(non_exists.is_empty());
        assert_eq!(
            renewed,
            HashMap::from([(
                region_id,
                RegionLeaseInfo::from((region_id, RegionRole::StagingLeader))
            )])
        );
    }

    // When both the route and the operating-region registry know the region,
    // the route's role takes precedence over the registry's.
    #[tokio::test]
    async fn test_renew_region_leases_metadata_role_beats_keeper_role() {
        let table_id = 2048;
        let table_info: TableInfo = new_test_table_info(table_id);

        let datanode_id = 1024;
        let region_id = RegionId::new(table_id, 1);
        let region_route = RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(datanode_id))
            .build()
            .unwrap();

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(vec![region_route]),
                HashMap::default(),
            )
            .await
            .unwrap();

        let _guard = keeper
            .memory_region_keeper
            .register_with_role(datanode_id, region_id, RegionRole::Follower)
            .unwrap();

        let RenewRegionLeasesResponse {
            non_exists,
            renewed,
        } = keeper
            .renew_region_leases(datanode_id, &[(region_id, RegionRole::Follower)])
            .await
            .unwrap();

        assert!(non_exists.is_empty());
        assert_eq!(
            renewed,
            HashMap::from([(
                region_id,
                RegionLeaseInfo::from((region_id, RegionRole::Leader))
            )])
        );
    }

    // The table exists but has no route for this region: fall back to the
    // operating-region registry's role.
    #[tokio::test]
    async fn test_renew_region_leases_missing_route_falls_back_to_keeper_role() {
        let table_id = 2048;
        let table_info: TableInfo = new_test_table_info(table_id);

        let datanode_id = 1024;
        let region_id = RegionId::new(table_id, 1);
        let another_region_id = RegionId::new(table_id, 2);
        let region_route = RegionRouteBuilder::default()
            .region(Region::new_test(another_region_id))
            .leader_peer(Peer::empty(datanode_id))
            .build()
            .unwrap();

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(vec![region_route]),
                HashMap::default(),
            )
            .await
            .unwrap();

        let _guard = keeper
            .memory_region_keeper
            .register_with_role(datanode_id, region_id, RegionRole::DowngradingLeader)
            .unwrap();

        let RenewRegionLeasesResponse {
            non_exists,
            renewed,
        } = keeper
            .renew_region_leases(datanode_id, &[(region_id, RegionRole::StagingLeader)])
            .await
            .unwrap();

        assert!(non_exists.is_empty());
        assert_eq!(
            renewed,
            HashMap::from([(
                region_id,
                RegionLeaseInfo::operating(region_id, RegionRole::DowngradingLeader)
            )])
        );
    }

    // No table metadata at all: the registry's role is used as-is, ignoring
    // the reported role.
    #[tokio::test]
    async fn test_renew_region_leases_operating_region_uses_keeper_role() {
        let keeper = new_test_keeper();
        let datanode_id = 1024;
        let region_id = RegionId::new(2048, 1);

        let _guard = keeper
            .memory_region_keeper
            .register_with_role(datanode_id, region_id, RegionRole::DowngradingLeader)
            .unwrap();

        let RenewRegionLeasesResponse {
            non_exists,
            renewed,
        } = keeper
            .renew_region_leases(datanode_id, &[(region_id, RegionRole::StagingLeader)])
            .await
            .unwrap();

        assert!(non_exists.is_empty());
        assert_eq!(
            renewed,
            HashMap::from([(
                region_id,
                RegionLeaseInfo::operating(region_id, RegionRole::DowngradingLeader)
            )])
        );
    }
}