meta_srv/region/lease_keeper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_meta::DatanodeId;
19use common_meta::key::TableMetadataManagerRef;
20use common_meta::key::table_route::TableRouteValue;
21use common_meta::region_keeper::MemoryRegionKeeperRef;
22use common_meta::rpc::router::RegionRoute;
23use common_telemetry::warn;
24use snafu::ResultExt;
25use store_api::region_engine::RegionRole;
26use store_api::storage::{RegionId, TableId};
27
28use crate::error::{self, Result};
29
30pub type RegionLeaseKeeperRef = Arc<RegionLeaseKeeper>;
31
32/// Renews lease of regions.
33pub struct RegionLeaseKeeper {
34    table_metadata_manager: TableMetadataManagerRef,
35    memory_region_keeper: MemoryRegionKeeperRef,
36}
37
38/// The result of region lease renewal,
39/// contains the renewed region leases and [RegionId] of non-existing regions.
40pub struct RenewRegionLeasesResponse {
41    pub renewed: HashMap<RegionId, RegionLeaseInfo>,
42    pub non_exists: HashSet<RegionId>,
43}
44
45impl RegionLeaseKeeper {
46    pub fn new(
47        table_metadata_manager: TableMetadataManagerRef,
48        memory_region_keeper: MemoryRegionKeeperRef,
49    ) -> Self {
50        Self {
51            table_metadata_manager,
52            memory_region_keeper,
53        }
54    }
55}
56
57fn renew_region_lease_via_region_route(
58    region_route: &RegionRoute,
59    datanode_id: DatanodeId,
60    region_id: RegionId,
61) -> Option<(RegionId, RegionRole)> {
62    // If it's a leader region on this datanode.
63    if let Some(leader) = &region_route.leader_peer
64        && leader.id == datanode_id
65    {
66        let region_role = if region_route.is_leader_downgrading() {
67            RegionRole::DowngradingLeader
68        } else {
69            RegionRole::Leader
70        };
71
72        return Some((region_id, region_role));
73    }
74
75    // If it's a follower region on this datanode.
76    if region_route
77        .follower_peers
78        .iter()
79        .any(|peer| peer.id == datanode_id)
80    {
81        return Some((region_id, RegionRole::Follower));
82    }
83
84    warn!(
85        "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region_routes: {:?}",
86        region_route
87    );
88    // The region doesn't belong to this datanode.
89    None
90}
91
92/// The information of region lease.
93#[derive(Debug, PartialEq, Eq)]
94pub struct RegionLeaseInfo {
95    pub region_id: RegionId,
96    /// Whether the region is operating.
97    ///
98    /// The region is under dropping or opening / migration operation.
99    pub is_operating: bool,
100    /// The role of region.
101    pub role: RegionRole,
102}
103
104impl RegionLeaseInfo {
105    /// Creates a new [RegionLeaseInfo] with the given region id and role with operating status.
106    pub fn operating(region_id: RegionId, role: RegionRole) -> Self {
107        Self {
108            region_id,
109            is_operating: true,
110            role,
111        }
112    }
113}
114
115impl From<(RegionId, RegionRole)> for RegionLeaseInfo {
116    fn from((region_id, role): (RegionId, RegionRole)) -> Self {
117        Self {
118            region_id,
119            is_operating: false,
120            role,
121        }
122    }
123}
124
125impl RegionLeaseKeeper {
126    async fn collect_table_metadata(
127        &self,
128        table_ids: &[TableId],
129    ) -> Result<HashMap<TableId, TableRouteValue>> {
130        let table_route_manager = self.table_metadata_manager.table_route_manager();
131
132        // The subset of all table metadata.
133        // TODO: considers storing all active regions in meta's memory.
134        let table_routes = table_route_manager
135            .table_route_storage()
136            .batch_get(table_ids)
137            .await
138            .context(error::TableMetadataManagerSnafu)?;
139
140        let metadata_subset = table_routes
141            .into_iter()
142            .zip(table_ids)
143            .filter_map(|(route, id)| route.map(|route| (*id, route)))
144            .collect::<HashMap<_, _>>();
145
146        Ok(metadata_subset)
147    }
148
149    /// Returns [None] if:
150    /// - The region doesn't belong to the datanode.
151    /// - The region belongs to a logical table.
152    fn renew_region_lease(
153        &self,
154        table_metadata: &HashMap<TableId, TableRouteValue>,
155        operating_regions: &HashSet<RegionId>,
156        datanode_id: DatanodeId,
157        region_id: RegionId,
158        role: RegionRole,
159    ) -> Option<RegionLeaseInfo> {
160        if operating_regions.contains(&region_id) {
161            let region_lease_info = RegionLeaseInfo::operating(region_id, role);
162            return Some(region_lease_info);
163        }
164
165        if let Some(table_route) = table_metadata.get(&region_id.table_id()) {
166            if let Ok(Some(region_route)) = table_route.region_route(region_id) {
167                return renew_region_lease_via_region_route(&region_route, datanode_id, region_id)
168                    .map(RegionLeaseInfo::from);
169            } else {
170                warn!(
171                    "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region route is not found in table({})",
172                    region_id.table_id()
173                );
174            }
175        } else {
176            warn!(
177                "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, table({}) is not found",
178                region_id.table_id()
179            );
180        }
181        None
182    }
183
184    async fn collect_metadata(
185        &self,
186        datanode_id: DatanodeId,
187        mut region_ids: HashSet<RegionId>,
188    ) -> Result<(HashMap<TableId, TableRouteValue>, HashSet<RegionId>)> {
189        // Filters out operating region first, improves the cache hit rate(reduce expensive remote fetches).
190        let operating_regions = self
191            .memory_region_keeper
192            .extract_operating_regions(datanode_id, &mut region_ids);
193        let table_ids = region_ids
194            .into_iter()
195            .map(|region_id| region_id.table_id())
196            .collect::<HashSet<_>>()
197            .into_iter()
198            .collect::<Vec<_>>();
199        let table_metadata = self.collect_table_metadata(&table_ids).await?;
200        Ok((table_metadata, operating_regions))
201    }
202
203    /// Renews the lease of regions for specific datanode.
204    ///
205    /// The lease of regions will be renewed if:
206    /// -  The region of the specific datanode exists in [TableRouteValue].
207    /// -  The region of the specific datanode is opening.
208    ///
209    /// Otherwise the lease of regions will not be renewed,
210    /// and corresponding regions will be added to `non_exists` of [RenewRegionLeasesResponse].
211    pub async fn renew_region_leases(
212        &self,
213        datanode_id: DatanodeId,
214        regions: &[(RegionId, RegionRole)],
215    ) -> Result<RenewRegionLeasesResponse> {
216        let region_ids = regions
217            .iter()
218            .map(|(region_id, _)| *region_id)
219            .collect::<HashSet<_>>();
220        let (table_metadata, operating_regions) =
221            self.collect_metadata(datanode_id, region_ids).await?;
222        let mut renewed = HashMap::new();
223        let mut non_exists = HashSet::new();
224
225        for &(region, role) in regions {
226            match self.renew_region_lease(
227                &table_metadata,
228                &operating_regions,
229                datanode_id,
230                region,
231                role,
232            ) {
233                Some(region_lease_info) => {
234                    renewed.insert(region_lease_info.region_id, region_lease_info);
235                }
236                None => {
237                    non_exists.insert(region);
238                }
239            }
240        }
241
242        Ok(RenewRegionLeasesResponse {
243            renewed,
244            non_exists,
245        })
246    }
247
248    #[cfg(test)]
249    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
250        &self.table_metadata_manager
251    }
252}
253
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;

    use common_meta::key::TableMetadataManager;
    use common_meta::key::table_route::{LogicalTableRouteValue, TableRouteValue};
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRouteBuilder};
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;
    use table::metadata::RawTableInfo;

    use super::{RegionLeaseKeeper, renew_region_lease_via_region_route};
    use crate::region::lease_keeper::{RegionLeaseInfo, RenewRegionLeasesResponse};

    /// Builds a [RegionLeaseKeeper] backed by an in-memory kv backend and a default
    /// (initially empty) memory region keeper.
    fn new_test_keeper() -> RegionLeaseKeeper {
        let store = Arc::new(MemoryKvBackend::new());

        let table_metadata_manager = Arc::new(TableMetadataManager::new(store));

        let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
        RegionLeaseKeeper::new(table_metadata_manager, opening_region_keeper)
    }

    /// Checks role derivation from a single region route: no match, leader, follower,
    /// and downgrading leader.
    #[test]
    fn test_renew_region_lease_via_region_route() {
        let region_id = RegionId::new(1024, 1);
        let leader_peer_id = 1024;
        let follower_peer_id = 2048;
        let mut region_route = RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(leader_peer_id))
            .follower_peers(vec![Peer::empty(follower_peer_id)])
            .build()
            .unwrap();

        // The region doesn't belong to the datanode.
        for region_id in [RegionId::new(1024, 2), region_id] {
            assert!(renew_region_lease_via_region_route(&region_route, 1, region_id).is_none());
        }

        // The leader region on the datanode.
        assert_eq!(
            renew_region_lease_via_region_route(&region_route, leader_peer_id, region_id),
            Some((region_id, RegionRole::Leader))
        );
        // The follower region on the datanode.
        assert_eq!(
            renew_region_lease_via_region_route(&region_route, follower_peer_id, region_id),
            Some((region_id, RegionRole::Follower))
        );

        region_route.leader_state = Some(LeaderState::Downgrading);
        // The downgraded leader region on the datanode.
        assert_eq!(
            renew_region_lease_via_region_route(&region_route, leader_peer_id, region_id),
            Some((region_id, RegionRole::DowngradingLeader))
        );
    }

    /// With no table metadata stored, every requested region is reported as non-existing.
    #[tokio::test]
    async fn test_renew_region_leases_non_exists_regions() {
        let keeper = new_test_keeper();

        let RenewRegionLeasesResponse {
            non_exists,
            renewed,
        } = keeper
            .renew_region_leases(
                1,
                &[
                    (RegionId::new(1024, 1), RegionRole::Follower),
                    (RegionId::new(1024, 2), RegionRole::Leader),
                ],
            )
            .await
            .unwrap();

        assert!(renewed.is_empty());
        assert_eq!(
            non_exists,
            HashSet::from([RegionId::new(1024, 1), RegionId::new(1024, 2)])
        );
    }

    /// Operating regions of the queried datanode are split out. Only the remaining regions'
    /// tables are fetched, and operating regions registered on other datanodes are ignored.
    #[tokio::test]
    async fn test_collect_metadata() {
        let table_id = 1024;
        let table_info: RawTableInfo = new_test_table_info(table_id).into();

        let region_id = RegionId::new(table_id, 1);
        let leader_peer_id = 1024;
        let follower_peer_id = 2048;
        let region_route = RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(leader_peer_id))
            .follower_peers(vec![Peer::empty(follower_peer_id)])
            .build()
            .unwrap();

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(vec![region_route]),
                HashMap::default(),
            )
            .await
            .unwrap();
        // The guards are kept alive for the rest of the test so the regions stay registered.
        let opening_region_id = RegionId::new(1025, 1);
        let _guard = keeper
            .memory_region_keeper
            .register(leader_peer_id, opening_region_id)
            .unwrap();
        let another_opening_region_id = RegionId::new(1025, 2);
        let _guard2 = keeper
            .memory_region_keeper
            .register(follower_peer_id, another_opening_region_id)
            .unwrap();

        let (metadata, regions) = keeper
            .collect_metadata(
                leader_peer_id,
                HashSet::from([region_id, opening_region_id]),
            )
            .await
            .unwrap();
        assert_eq!(
            metadata.keys().cloned().collect::<Vec<_>>(),
            vec![region_id.table_id()]
        );
        assert!(regions.contains(&opening_region_id));
        assert_eq!(regions.len(), 1);
    }

    /// End-to-end renewal against a physical table route. The reported role is ignored for
    /// routed regions and kept for operating regions.
    #[tokio::test]
    async fn test_renew_region_leases_basic() {
        let table_id = 1024;
        let table_info: RawTableInfo = new_test_table_info(table_id).into();

        let region_id = RegionId::new(table_id, 1);
        let leader_peer_id = 1024;
        let follower_peer_id = 2048;
        let region_route = RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(leader_peer_id))
            .follower_peers(vec![Peer::empty(follower_peer_id)])
            .build()
            .unwrap();

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(vec![region_route]),
                HashMap::default(),
            )
            .await
            .unwrap();

        // The region doesn't belong to the datanode.
        for region_id in [RegionId::new(1024, 2), region_id] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(1, &[(region_id, RegionRole::Follower)])
                .await
                .unwrap();
            assert!(renewed.is_empty());
            assert_eq!(non_exists, HashSet::from([region_id]));
        }

        // The leader region on the datanode.
        // Whatever role is reported, the renewed role comes from the region route.
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(leader_peer_id, &[(region_id, role)])
                .await
                .unwrap();

            assert!(non_exists.is_empty());
            assert_eq!(
                renewed,
                HashMap::from([(
                    region_id,
                    RegionLeaseInfo::from((region_id, RegionRole::Leader))
                )])
            );
        }

        // The follower region on the datanode.
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(follower_peer_id, &[(region_id, role)])
                .await
                .unwrap();

            assert!(non_exists.is_empty());
            assert_eq!(
                renewed,
                HashMap::from([(
                    region_id,
                    RegionLeaseInfo::from((region_id, RegionRole::Follower))
                )])
            );
        }

        // The guard keeps the region registered until the end of the test.
        let opening_region_id = RegionId::new(2048, 1);
        let _guard = keeper
            .memory_region_keeper
            .register(leader_peer_id, opening_region_id)
            .unwrap();

        // The opening region on the datanode.
        // NOTES: The procedure lock will ensure only one opening leader.
        // The operating region keeps the role reported by the datanode.
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(leader_peer_id, &[(opening_region_id, role)])
                .await
                .unwrap();

            assert!(non_exists.is_empty());
            assert_eq!(
                renewed,
                HashMap::from([(
                    opening_region_id,
                    RegionLeaseInfo::operating(opening_region_id, role)
                )])
            );
        }
    }

    /// Regions of a logical table are never renewed. Duplicate requests for the same region
    /// collapse into a single `non_exists` entry.
    #[tokio::test]
    async fn test_renew_unexpected_logic_table() {
        let table_id = 1024;
        let table_info: RawTableInfo = new_test_table_info(table_id).into();

        let region_id = RegionId::new(table_id, 1);
        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::Logical(LogicalTableRouteValue::new(table_id)),
                HashMap::default(),
            )
            .await
            .unwrap();

        for region_id in [region_id, RegionId::new(1024, 2)] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(
                    1,
                    &[
                        (region_id, RegionRole::Follower),
                        (region_id, RegionRole::Leader),
                    ],
                )
                .await
                .unwrap();
            assert!(renewed.is_empty());
            assert_eq!(non_exists, HashSet::from([region_id]));
        }
    }

    /// While the leader is downgrading, the follower peer is still renewed as a follower.
    #[tokio::test]
    async fn test_renew_region_leases_with_downgrade_leader() {
        let table_id = 1024;
        let table_info: RawTableInfo = new_test_table_info(table_id).into();

        let region_id = RegionId::new(table_id, 1);
        let leader_peer_id = 1024;
        let follower_peer_id = 2048;
        let region_route = RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(leader_peer_id))
            .follower_peers(vec![Peer::empty(follower_peer_id)])
            .leader_state(LeaderState::Downgrading)
            .build()
            .unwrap();

        let keeper = new_test_keeper();
        let table_metadata_manager = keeper.table_metadata_manager();
        table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(vec![region_route]),
                HashMap::default(),
            )
            .await
            .unwrap();

        // The follower region on the datanode (the leader is downgrading).
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(follower_peer_id, &[(region_id, role)])
                .await
                .unwrap();

            assert!(non_exists.is_empty());
            assert_eq!(
                renewed,
                HashMap::from([(
                    region_id,
                    RegionLeaseInfo::from((region_id, RegionRole::Follower))
                )])
            );
        }
    }
}