meta_srv/region/
lease_keeper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use common_meta::key::table_route::TableRouteValue;
19use common_meta::key::TableMetadataManagerRef;
20use common_meta::region_keeper::MemoryRegionKeeperRef;
21use common_meta::rpc::router::RegionRoute;
22use common_meta::DatanodeId;
23use common_telemetry::warn;
24use snafu::ResultExt;
25use store_api::region_engine::RegionRole;
26use store_api::storage::{RegionId, TableId};
27
28use crate::error::{self, Result};
29
30pub type RegionLeaseKeeperRef = Arc<RegionLeaseKeeper>;
31
32/// Renews lease of regions.
33pub struct RegionLeaseKeeper {
34    table_metadata_manager: TableMetadataManagerRef,
35    memory_region_keeper: MemoryRegionKeeperRef,
36}
37
38/// The result of region lease renewal,
39/// contains the renewed region leases and [RegionId] of non-existing regions.
40pub struct RenewRegionLeasesResponse {
41    pub renewed: HashMap<RegionId, RegionLeaseInfo>,
42    pub non_exists: HashSet<RegionId>,
43}
44
45impl RegionLeaseKeeper {
46    pub fn new(
47        table_metadata_manager: TableMetadataManagerRef,
48        memory_region_keeper: MemoryRegionKeeperRef,
49    ) -> Self {
50        Self {
51            table_metadata_manager,
52            memory_region_keeper,
53        }
54    }
55}
56
57fn renew_region_lease_via_region_route(
58    region_route: &RegionRoute,
59    datanode_id: DatanodeId,
60    region_id: RegionId,
61) -> Option<(RegionId, RegionRole)> {
62    // If it's a leader region on this datanode.
63    if let Some(leader) = &region_route.leader_peer {
64        if leader.id == datanode_id {
65            let region_role = if region_route.is_leader_downgrading() {
66                RegionRole::DowngradingLeader
67            } else {
68                RegionRole::Leader
69            };
70
71            return Some((region_id, region_role));
72        }
73    }
74
75    // If it's a follower region on this datanode.
76    if region_route
77        .follower_peers
78        .iter()
79        .any(|peer| peer.id == datanode_id)
80    {
81        return Some((region_id, RegionRole::Follower));
82    }
83
84    warn!(
85        "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region_routes: {:?}",
86        region_route
87    );
88    // The region doesn't belong to this datanode.
89    None
90}
91
92/// The information of region lease.
93#[derive(Debug, PartialEq, Eq)]
94pub struct RegionLeaseInfo {
95    pub region_id: RegionId,
96    /// Whether the region is operating.
97    ///
98    /// The region is under dropping or opening / migration operation.
99    pub is_operating: bool,
100    /// The role of region.
101    pub role: RegionRole,
102}
103
104impl RegionLeaseInfo {
105    /// Creates a new [RegionLeaseInfo] with the given region id and role with operating status.
106    pub fn operating(region_id: RegionId, role: RegionRole) -> Self {
107        Self {
108            region_id,
109            is_operating: true,
110            role,
111        }
112    }
113}
114
115impl From<(RegionId, RegionRole)> for RegionLeaseInfo {
116    fn from((region_id, role): (RegionId, RegionRole)) -> Self {
117        Self {
118            region_id,
119            is_operating: false,
120            role,
121        }
122    }
123}
124
125impl RegionLeaseKeeper {
126    async fn collect_table_metadata(
127        &self,
128        table_ids: &[TableId],
129    ) -> Result<HashMap<TableId, TableRouteValue>> {
130        let table_route_manager = self.table_metadata_manager.table_route_manager();
131
132        // The subset of all table metadata.
133        // TODO: considers storing all active regions in meta's memory.
134        let table_routes = table_route_manager
135            .table_route_storage()
136            .batch_get(table_ids)
137            .await
138            .context(error::TableMetadataManagerSnafu)?;
139
140        let metadata_subset = table_routes
141            .into_iter()
142            .zip(table_ids)
143            .filter_map(|(route, id)| route.map(|route| (*id, route)))
144            .collect::<HashMap<_, _>>();
145
146        Ok(metadata_subset)
147    }
148
149    /// Returns [None] if:
150    /// - The region doesn't belong to the datanode.
151    /// - The region belongs to a logical table.
152    fn renew_region_lease(
153        &self,
154        table_metadata: &HashMap<TableId, TableRouteValue>,
155        operating_regions: &HashSet<RegionId>,
156        datanode_id: DatanodeId,
157        region_id: RegionId,
158        role: RegionRole,
159    ) -> Option<RegionLeaseInfo> {
160        if operating_regions.contains(&region_id) {
161            let region_lease_info = RegionLeaseInfo::operating(region_id, role);
162            return Some(region_lease_info);
163        }
164
165        if let Some(table_route) = table_metadata.get(&region_id.table_id()) {
166            if let Ok(Some(region_route)) = table_route.region_route(region_id) {
167                return renew_region_lease_via_region_route(&region_route, datanode_id, region_id)
168                    .map(RegionLeaseInfo::from);
169            } else {
170                warn!(
171                    "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region route is not found in table({})",
172                    region_id.table_id()
173                );
174            }
175        } else {
176            warn!(
177                "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, table({}) is not found",
178                region_id.table_id()
179            );
180        }
181        None
182    }
183
184    async fn collect_metadata(
185        &self,
186        datanode_id: DatanodeId,
187        mut region_ids: HashSet<RegionId>,
188    ) -> Result<(HashMap<TableId, TableRouteValue>, HashSet<RegionId>)> {
189        // Filters out operating region first, improves the cache hit rate(reduce expensive remote fetches).
190        let operating_regions = self
191            .memory_region_keeper
192            .extract_operating_regions(datanode_id, &mut region_ids);
193        let table_ids = region_ids
194            .into_iter()
195            .map(|region_id| region_id.table_id())
196            .collect::<HashSet<_>>()
197            .into_iter()
198            .collect::<Vec<_>>();
199        let table_metadata = self.collect_table_metadata(&table_ids).await?;
200        Ok((table_metadata, operating_regions))
201    }
202
203    /// Renews the lease of regions for specific datanode.
204    ///
205    /// The lease of regions will be renewed if:
206    /// -  The region of the specific datanode exists in [TableRouteValue].
207    /// -  The region of the specific datanode is opening.
208    ///
209    /// Otherwise the lease of regions will not be renewed,
210    /// and corresponding regions will be added to `non_exists` of [RenewRegionLeasesResponse].
211    pub async fn renew_region_leases(
212        &self,
213        datanode_id: DatanodeId,
214        regions: &[(RegionId, RegionRole)],
215    ) -> Result<RenewRegionLeasesResponse> {
216        let region_ids = regions
217            .iter()
218            .map(|(region_id, _)| *region_id)
219            .collect::<HashSet<_>>();
220        let (table_metadata, operating_regions) =
221            self.collect_metadata(datanode_id, region_ids).await?;
222        let mut renewed = HashMap::new();
223        let mut non_exists = HashSet::new();
224
225        for &(region, role) in regions {
226            match self.renew_region_lease(
227                &table_metadata,
228                &operating_regions,
229                datanode_id,
230                region,
231                role,
232            ) {
233                Some(region_lease_info) => {
234                    renewed.insert(region_lease_info.region_id, region_lease_info);
235                }
236                None => {
237                    non_exists.insert(region);
238                }
239            }
240        }
241
242        Ok(RenewRegionLeasesResponse {
243            renewed,
244            non_exists,
245        })
246    }
247
248    #[cfg(test)]
249    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
250        &self.table_metadata_manager
251    }
252}
253
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;

    use common_meta::key::table_route::{LogicalTableRouteValue, TableRouteValue};
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute, RegionRouteBuilder};
    use common_meta::DatanodeId;
    use store_api::region_engine::RegionRole;
    use store_api::storage::{RegionId, TableId};
    use table::metadata::RawTableInfo;

    use super::{renew_region_lease_via_region_route, RegionLeaseKeeper};
    use crate::region::lease_keeper::{RegionLeaseInfo, RenewRegionLeasesResponse};

    /// The datanode acting as leader peer in the routes built by [new_test_region_route].
    const LEADER_PEER_ID: DatanodeId = 1024;
    /// The datanode acting as the only follower peer in those routes.
    const FOLLOWER_PEER_ID: DatanodeId = 2048;

    /// Creates a keeper backed by an empty in-memory kv store and an empty region keeper.
    fn new_test_keeper() -> RegionLeaseKeeper {
        let store = Arc::new(MemoryKvBackend::new());

        let table_metadata_manager = Arc::new(TableMetadataManager::new(store));

        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
        RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper)
    }

    /// Builds a route for `region_id` led by [LEADER_PEER_ID] and followed by [FOLLOWER_PEER_ID].
    fn new_test_region_route(region_id: RegionId) -> RegionRoute {
        RegionRouteBuilder::default()
            .region(Region::new_test(region_id))
            .leader_peer(Peer::empty(LEADER_PEER_ID))
            .follower_peers(vec![Peer::empty(FOLLOWER_PEER_ID)])
            .build()
            .unwrap()
    }

    /// Stores the metadata of a table with the single region number 1 and the given route.
    async fn create_test_table(
        keeper: &RegionLeaseKeeper,
        table_id: TableId,
        table_route: TableRouteValue,
    ) {
        let table_info: RawTableInfo = new_test_table_info(table_id, vec![1u32]).into();
        keeper
            .table_metadata_manager()
            .create_table_metadata(table_info, table_route, HashMap::default())
            .await
            .unwrap();
    }

    #[test]
    fn test_renew_region_lease_via_region_route() {
        let region_id = RegionId::new(1024, 1);
        let mut region_route = new_test_region_route(region_id);

        // The region doesn't belong to the datanode.
        for region_id in [RegionId::new(1024, 2), region_id] {
            assert!(renew_region_lease_via_region_route(&region_route, 1, region_id).is_none());
        }

        // The leader region on the datanode.
        assert_eq!(
            renew_region_lease_via_region_route(&region_route, LEADER_PEER_ID, region_id),
            Some((region_id, RegionRole::Leader))
        );
        // The follower region on the datanode.
        assert_eq!(
            renew_region_lease_via_region_route(&region_route, FOLLOWER_PEER_ID, region_id),
            Some((region_id, RegionRole::Follower))
        );

        region_route.leader_state = Some(LeaderState::Downgrading);
        // The downgraded leader region on the datanode.
        assert_eq!(
            renew_region_lease_via_region_route(&region_route, LEADER_PEER_ID, region_id),
            Some((region_id, RegionRole::DowngradingLeader))
        );
    }

    #[tokio::test]
    async fn test_renew_region_leases_non_exists_regions() {
        // No table metadata has been created, so every region is unknown.
        let keeper = new_test_keeper();

        let RenewRegionLeasesResponse {
            non_exists,
            renewed,
        } = keeper
            .renew_region_leases(
                1,
                &[
                    (RegionId::new(1024, 1), RegionRole::Follower),
                    (RegionId::new(1024, 2), RegionRole::Leader),
                ],
            )
            .await
            .unwrap();

        assert!(renewed.is_empty());
        assert_eq!(
            non_exists,
            HashSet::from([RegionId::new(1024, 1), RegionId::new(1024, 2)])
        );
    }

    #[tokio::test]
    async fn test_collect_metadata() {
        let table_id = 1024;
        let region_id = RegionId::new(table_id, 1);

        let keeper = new_test_keeper();
        create_test_table(
            &keeper,
            table_id,
            TableRouteValue::physical(vec![new_test_region_route(region_id)]),
        )
        .await;

        let opening_region_id = RegionId::new(1025, 1);
        let _guard = keeper
            .memory_region_keeper
            .register(LEADER_PEER_ID, opening_region_id)
            .unwrap();
        // Registered for another datanode, so it must not show up below.
        let another_opening_region_id = RegionId::new(1025, 2);
        let _guard2 = keeper
            .memory_region_keeper
            .register(FOLLOWER_PEER_ID, another_opening_region_id)
            .unwrap();

        let (metadata, regions) = keeper
            .collect_metadata(
                LEADER_PEER_ID,
                HashSet::from([region_id, opening_region_id]),
            )
            .await
            .unwrap();
        assert_eq!(
            metadata.keys().cloned().collect::<Vec<_>>(),
            vec![region_id.table_id()]
        );
        assert!(regions.contains(&opening_region_id));
        assert_eq!(regions.len(), 1);
    }

    #[tokio::test]
    async fn test_renew_region_leases_basic() {
        let table_id = 1024;
        let region_id = RegionId::new(table_id, 1);

        let keeper = new_test_keeper();
        create_test_table(
            &keeper,
            table_id,
            TableRouteValue::physical(vec![new_test_region_route(region_id)]),
        )
        .await;

        // The region doesn't belong to the datanode.
        for region_id in [RegionId::new(1024, 2), region_id] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(1, &[(region_id, RegionRole::Follower)])
                .await
                .unwrap();
            assert!(renewed.is_empty());
            assert_eq!(non_exists, HashSet::from([region_id]));
        }

        // The leader region on the datanode.
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(LEADER_PEER_ID, &[(region_id, role)])
                .await
                .unwrap();

            assert!(non_exists.is_empty());
            assert_eq!(
                renewed,
                HashMap::from([(
                    region_id,
                    RegionLeaseInfo::from((region_id, RegionRole::Leader))
                )])
            );
        }

        // The follower region on the datanode.
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(FOLLOWER_PEER_ID, &[(region_id, role)])
                .await
                .unwrap();

            assert!(non_exists.is_empty());
            assert_eq!(
                renewed,
                HashMap::from([(
                    region_id,
                    RegionLeaseInfo::from((region_id, RegionRole::Follower))
                )])
            );
        }

        let opening_region_id = RegionId::new(2048, 1);
        let _guard = keeper
            .memory_region_keeper
            .register(LEADER_PEER_ID, opening_region_id)
            .unwrap();

        // The opening region on the datanode.
        // NOTES: The procedure lock will ensure only one opening leader.
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(LEADER_PEER_ID, &[(opening_region_id, role)])
                .await
                .unwrap();

            assert!(non_exists.is_empty());
            assert_eq!(
                renewed,
                HashMap::from([(
                    opening_region_id,
                    RegionLeaseInfo::operating(opening_region_id, role)
                )])
            );
        }
    }

    #[tokio::test]
    async fn test_renew_unexpected_logic_table() {
        let table_id = 1024;
        let region_id = RegionId::new(table_id, 1);

        let keeper = new_test_keeper();
        create_test_table(
            &keeper,
            table_id,
            TableRouteValue::Logical(LogicalTableRouteValue::new(table_id, vec![region_id])),
        )
        .await;

        // Regions of a logical table are never renewed, whatever role is claimed.
        for region_id in [region_id, RegionId::new(1024, 2)] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(
                    1,
                    &[
                        (region_id, RegionRole::Follower),
                        (region_id, RegionRole::Leader),
                    ],
                )
                .await
                .unwrap();
            assert!(renewed.is_empty());
            assert_eq!(non_exists, HashSet::from([region_id]));
        }
    }

    #[tokio::test]
    async fn test_renew_region_leases_with_downgrade_leader() {
        let table_id = 1024;
        let region_id = RegionId::new(table_id, 1);
        let mut region_route = new_test_region_route(region_id);
        region_route.leader_state = Some(LeaderState::Downgrading);

        let keeper = new_test_keeper();
        create_test_table(
            &keeper,
            table_id,
            TableRouteValue::physical(vec![region_route]),
        )
        .await;

        // The downgrading leader region on the datanode.
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(LEADER_PEER_ID, &[(region_id, role)])
                .await
                .unwrap();

            assert!(non_exists.is_empty());
            assert_eq!(
                renewed,
                HashMap::from([(
                    region_id,
                    RegionLeaseInfo::from((region_id, RegionRole::DowngradingLeader))
                )])
            );
        }

        // The follower region on the datanode is not affected by the downgrading leader.
        for role in [RegionRole::Leader, RegionRole::Follower] {
            let RenewRegionLeasesResponse {
                non_exists,
                renewed,
            } = keeper
                .renew_region_leases(FOLLOWER_PEER_ID, &[(region_id, role)])
                .await
                .unwrap();

            assert!(non_exists.is_empty());
            assert_eq!(
                renewed,
                HashMap::from([(
                    region_id,
                    RegionLeaseInfo::from((region_id, RegionRole::Follower))
                )])
            );
        }
    }
}